Skip to content

Commit 3414ad0

Browse files
committed
stfb: add --max-built-stfs paramter; send DPL SourceInfo::Completed on exit
1 parent 7ccc81c commit 3414ad0

5 files changed

Lines changed: 86 additions & 10 deletions

File tree

src/StfBuilder/StfBuilderDevice.cxx

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include "StfBuilderDevice.h"
1515

16+
#include <DataDistLogger.h>
1617
#include <SubTimeFrameUtils.h>
1718
#include <SubTimeFrameVisitors.h>
1819
#include <ReadoutDataModel.h>
@@ -21,14 +22,13 @@
2122
#include <Utilities.h>
2223

2324
#include <options/FairMQProgOptions.h>
25+
#include <Framework/SourceInfoHeader.h>
2426

2527
#include <chrono>
2628
#include <thread>
2729
#include <exception>
2830
#include <boost/algorithm/string.hpp>
2931

30-
#include <DataDistLogger.h>
31-
3232
namespace o2
3333
{
3434
namespace DataDistribution
@@ -94,6 +94,7 @@ void StfBuilderDevice::InitTask()
9494
I().mDplChannelName = GetConfig()->GetValue<std::string>(OptionKeyDplChannelName);
9595
I().mStandalone = GetConfig()->GetValue<bool>(OptionKeyStandalone);
9696
I().mMaxStfsInPipeline = GetConfig()->GetValue<std::int64_t>(OptionKeyMaxBufferedStfs);
97+
I().mMaxBuiltStfs = GetConfig()->GetValue<std::uint64_t>(OptionKeyMaxBuiltStfs);
9798

9899
// input data handling
99100
ReadoutDataUtils::sSpecifiedDataOrigin = getDataOriginFromOption(
@@ -127,6 +128,10 @@ void StfBuilderDevice::InitTask()
127128
"Possibility of creating back-pressure.");
128129
}
129130

131+
// Limited number of STF?
132+
DDLOGF(fair::Severity::INFO, "Configuration: Number of built SubTimeFrames is {}",
133+
I().mMaxBuiltStfs == 0 ? "not limited" : ("limited to " + std::to_string(I().mMaxBuiltStfs)));
134+
130135
// File sink
131136
if (!I().mFileSink->loadVerifyConfig(*(this->GetConfig()))) {
132137
exit(-1);
@@ -254,6 +259,9 @@ void StfBuilderDevice::ResetTask()
254259
{
255260
DDLOGF(fair::Severity::DEBUG, "StfBuilderDevice::ResetTask()");
256261

262+
// Signal ConditionalRun() and other threads to stop
263+
I().mRunning = false;
264+
257265
// stop the memory resources
258266
MemI().stop();
259267

@@ -364,11 +372,41 @@ void StfBuilderDevice::StfOutputThread()
364372
const double lTimeMs = std::max(1e-6, std::chrono::duration<double, std::milli>(lNow - lSendStartTime).count());
365373
I().mSentOutRate = double(I().mSentOutStfs) / std::chrono::duration<double>(lNow - sStartOfStfSending).count();
366374
I().mStfDataTimeSamples.Fill(lTimeMs);
367-
} else {
368-
// DDLOGF(fair::Severity::ERROR, "Dropping stf size={}", lStf->getDataSize());
375+
}
376+
377+
// check if we should exit:
378+
// 1. max number of stf set, or
379+
// 2. file reply used without loop parameter
380+
if ( (I().mMaxBuiltStfs > 0) && (I().mSentOutStfsTotal == I().mMaxBuiltStfs) ) {
381+
DDLOGF(fair::Severity::INFO, "Maximum number of sent SubTimeFrames reached. Exiting.");
382+
break;
369383
}
370384
}
371385

386+
// leaving the output thread, send end of the stream info
387+
if (dplEnabled()) {
388+
o2::framework::SourceInfoHeader lDplExitHdr;
389+
lDplExitHdr.state = o2::framework::InputChannelState::Completed;
390+
auto lDoneStack = Stack(
391+
DataHeader(gDataDescriptionInfo, gDataOriginAny, 0, 0),
392+
o2::framework::DataProcessingHeader(),
393+
lDplExitHdr
394+
);
395+
396+
// Send a multipart
397+
FairMQParts lCompletedMsg;
398+
auto lNoFree = [](void*, void*) { /* stack */ };
399+
lCompletedMsg.AddPart(lOutputChan.NewMessage(lDoneStack.data(), lDoneStack.size(), lNoFree));
400+
lCompletedMsg.AddPart(lOutputChan.NewMessage());
401+
lOutputChan.Send(lCompletedMsg);
402+
403+
DDLOGF(fair::Severity::INFO, "Sent Source Completed message to DPL.");
404+
// NOTE: no guarantees this will be sent out
405+
std::this_thread::sleep_for(2s);
406+
}
407+
408+
I().mRunning = false; // trigger stop via CondRun()
409+
372410
DDLOGF(fair::Severity::info, "Exiting StfOutputThread...");
373411
}
374412

@@ -397,7 +435,12 @@ bool StfBuilderDevice::ConditionalRun()
397435
{
398436
// nothing to do here sleep for awhile
399437
std::this_thread::sleep_for(500ms);
400-
return true;
438+
439+
if (!I().mRunning) {
440+
DDLOGF(fair::Severity::DEBUG, "ConditionalRun() returning false.");
441+
}
442+
443+
return I().mRunning;
401444
}
402445

403446
bpo::options_description StfBuilderDevice::getDetectorProgramOptions() {

src/StfBuilder/StfBuilderDevice.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class StfBuilderDevice : public DataDistDevice,
6565
static constexpr const char* OptionKeyDplChannelName = "dpl-channel-name";
6666
static constexpr const char* OptionKeyStandalone = "stand-alone";
6767
static constexpr const char* OptionKeyMaxBufferedStfs = "max-buffered-stfs";
68+
static constexpr const char* OptionKeyMaxBuiltStfs = "max-built-stfs";
6869

6970
static constexpr const char* OptionKeyStfDetector = "detector";
7071
static constexpr const char* OptionKeyRhdVer = "detector-rdh";
@@ -201,6 +202,7 @@ class StfBuilderDevice : public DataDistDevice,
201202
bool mStandalone;
202203
bool mDplEnabled;
203204
std::int64_t mMaxStfsInPipeline;
205+
std::uint64_t mMaxBuiltStfs;
204206
bool mPipelineLimit;
205207

206208
/// Input Interface handler
@@ -209,6 +211,7 @@ class StfBuilderDevice : public DataDistDevice,
209211

210212
/// Internal threads
211213
std::thread mOutputThread;
214+
std::atomic_bool mRunning = true;
212215
std::atomic_bool mPaused = false;
213216

214217
/// File sink

src/StfBuilder/runStfBuilderDevice.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ void addCustomOptions(bpo::options_description& options)
4545
bpo::value<std::int64_t>()->default_value(-1),
4646
"Maximum number of buffered SubTimeFrames before starting to drop data (unlimited: -1)."
4747
)
48+
(
49+
o2::DataDistribution::StfBuilderDevice::OptionKeyMaxBuiltStfs,
50+
bpo::value<std::uint64_t>()->default_value(0),
51+
"Maximum number of built and forwarded (Sub)TimeFrames before closing (unlimited: 0, default)."
52+
)
4853
(
4954
o2::DataDistribution::StfBuilderDevice::OptionKeyOutputChannelName,
5055
bpo::value<std::string>()->default_value("builder-stf-channel"),

src/common/SubTimeFrameFileSource.cxx

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ void SubTimeFrameFileSource::DataInjectThread()
244244
mReadStfQueue.size(), sNumSentStfs / getElapsedTime());
245245
}
246246

247+
mPipelineI.close(mPipelineStageOut);
248+
247249
DDLOGF(fair::Severity::INFO, "Exiting file source inject thread...");
248250
}
249251

@@ -257,13 +259,13 @@ void SubTimeFrameFileSource::DataHandlerThread()
257259
mFilesVector = getDataFileList();
258260
}
259261

260-
if (mFilesVector.empty()) {
261-
DDLOGF(fair::Severity::ERROR, "(Sub)TimeFrame directory contains no data files.");
262-
return;
263-
}
264-
265262
while (mRunning) {
266263

264+
if (mFilesVector.empty()) {
265+
DDLOGF(fair::Severity::ERROR, "(Sub)TimeFrame directory contains no data files.");
266+
break;
267+
}
268+
267269
for (const auto &lFileName : mFilesVector) {
268270
if (!mRunning) {
269271
break; // stop looping over files
@@ -300,6 +302,10 @@ void SubTimeFrameFileSource::DataHandlerThread()
300302
break;
301303
}
302304
}
305+
306+
// notify the injection thread to stop
307+
mReadStfQueue.stop();
308+
303309
DDLOGF(fair::Severity::INFO, "Exiting file source data load thread...");
304310
}
305311

src/common/base/ConcurrentQueue.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,14 @@ class ConcurrentContainerImpl
6868
mImpl->mCond.notify_all();
6969
}
7070

71+
// push a new element to the queue, while in the running state
72+
// return false (fail) if not running
7173
template <typename... Args>
7274
bool push(Args&&... args)
7375
{
7476
std::unique_lock<std::mutex> lLock(mImpl->mLock);
7577
if (!mImpl->mRunning) {
78+
mImpl->mCond.notify_all(); // just in case someone is waiting
7679
return false;
7780
}
7881

@@ -89,6 +92,8 @@ class ConcurrentContainerImpl
8992
return true;
9093
}
9194

95+
// pop an element from the queue. Caller will block while the queue is running
96+
// returns true on success
9297
bool pop(T& d)
9398
{
9499
std::unique_lock<std::mutex> lLock(mImpl->mLock);
@@ -173,6 +178,8 @@ class ConcurrentContainerImpl
173178
return mImpl->mContainer.size();
174179
}
175180

181+
bool empty() const { return size() == 0; }
182+
176183
bool is_running() const { return mImpl->mRunning; }
177184

178185
private:
@@ -260,6 +267,18 @@ class IFifoPipeline
260267
return false;
261268
}
262269

270+
// notify the receiver the queue is closed
271+
void close(unsigned pStage)
272+
{
273+
assert(pStage < mPipelineQueues.size());
274+
auto lNextStage = getNextPipelineStage(pStage);
275+
assert((lNextStage <= mPipelineQueues.size()) && "next stage larger than expected");
276+
277+
if (lNextStage < mPipelineQueues.size()) {
278+
mPipelineQueues[lNextStage].stop();
279+
}
280+
}
281+
263282
T dequeue(unsigned pStage)
264283
{
265284
T t;

0 commit comments

Comments
 (0)