Skip to content

Commit 47b1076

Browse files
committed
shm: use shm transport to construct shm regions and messages
1 parent 3414ad0 commit 47b1076

10 files changed

Lines changed: 81 additions & 140 deletions

src/StfBuilder/StfBuilderDevice.cxx

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void StfBuilderDevice::Init()
6565
{
6666
DDLOGF(fair::Severity::DEBUG, "StfBuilderDevice::Init()");
6767
mI = std::make_unique<StfBuilderInstance>();
68-
mMemI = std::make_unique<MemoryResources>();
68+
mMemI = std::make_unique<MemoryResources>(this->AddTransport(fair::mq::Transport::SHM));
6969

7070
I().mFileSource = std::make_unique<SubTimeFrameFileSource>(*this, eStfFileSourceOut);
7171
I().mReadoutInterface = std::make_unique<StfInputInterface>(*this);
@@ -189,31 +189,6 @@ void StfBuilderDevice::InitTask()
189189
DDLOGF(fair::Severity::WARNING, "Running in standalone mode and with STF file sink disabled. Data will be lost.");
190190
}
191191

192-
// channel for FileSource: stf or dpl, or generic one in case of standalone
193-
if (isStandalone()) {
194-
// create default FMQ shm channel
195-
auto lTransportFactory = FairMQTransportFactory::CreateTransportFactory("shmem", "", GetConfig());
196-
if (!lTransportFactory) {
197-
DDLOGF(fair::Severity::ERROR, "Creating transport factory failed!");
198-
exit(-1);
199-
}
200-
201-
I().mStandaloneChannel = std::make_unique<FairMQChannel>(
202-
"standalone-chan[0]" , // name
203-
"pair", // type
204-
"bind", // method
205-
"ipc:///tmp/standalone-chan-stfb", // address
206-
lTransportFactory
207-
);
208-
209-
// mStandaloneChannel.Init();
210-
I().mStandaloneChannel->Init();
211-
// mStandaloneChannel->BindEndpoint("ipc:///tmp/standalone-chan");
212-
I().mStandaloneChannel->Validate();
213-
}
214-
215-
DDLOGF(fair::Severity::info, "Sending data to channel: {}", getOutputChannel().GetName());
216-
217192
// try to see if channels have been configured
218193
{
219194
if (!I().mFileSource->enabled()) {
@@ -241,8 +216,7 @@ void StfBuilderDevice::InitTask()
241216
I().mFileSink->start();
242217

243218
// start file source
244-
// channel for FileSource: stf or dpl, or generic one in case of standalone
245-
I().mFileSource->start(getOutputChannel(), MemI(), I().mDplEnabled);
219+
I().mFileSource->start(MemI(), I().mDplEnabled);
246220

247221
// start a thread for readout process
248222
if (!I().mFileSource->enabled()) {
@@ -305,12 +279,11 @@ void StfBuilderDevice::StfOutputThread()
305279
std::unique_ptr<InterleavedHdrDataSerializer> lStfSerializer;
306280
std::unique_ptr<StfToDplAdapter> lStfDplAdapter;
307281

308-
// cannot get the channels in standalone mode
309-
auto& lOutputChan = getOutputChannel();
310-
311-
DDLOGF(fair::Severity::info, "StfOutputThread: sending data to channel: {}", lOutputChan.GetName());
312-
313282
if (!isStandalone()) {
283+
// cannot get the channels in standalone mode
284+
auto& lOutputChan = getOutputChannel();
285+
DDLOGF(fair::Severity::info, "StfOutputThread: sending data to channel: {}", lOutputChan.GetName());
286+
314287
if (!dplEnabled()) {
315288
lStfSerializer = std::make_unique<InterleavedHdrDataSerializer>(lOutputChan);
316289
} else {
@@ -329,8 +302,8 @@ void StfBuilderDevice::StfOutputThread()
329302
// decrement the stf counter
330303
I().mNumStfs--;
331304

332-
DDLOGF_RL(2000, fair::Severity::DEBUG, "Sending an STF out. stf_id={} channel={} stf_size={} unique_equipments={}",
333-
lStf->header().mId, lOutputChan.GetName(), lStf->getDataSize(), lStf->getEquipmentIdentifiers().size());
305+
DDLOGF_RL(2000, fair::Severity::DEBUG, "Sending an STF out. stf_id={} stf_size={} unique_equipments={}",
306+
lStf->header().mId, lStf->getDataSize(), lStf->getEquipmentIdentifiers().size());
334307

335308
// get data size sample
336309
I().mStfSizeSamples.Fill(lStf->getDataSize());
@@ -394,6 +367,7 @@ void StfBuilderDevice::StfOutputThread()
394367
);
395368

396369
// Send a multipart
370+
auto& lOutputChan = getOutputChannel();
397371
FairMQParts lCompletedMsg;
398372
auto lNoFree = [](void*, void*) { /* stack */ };
399373
lCompletedMsg.AddPart(lOutputChan.NewMessage(lDoneStack.data(), lDoneStack.size(), lNoFree));

src/StfBuilder/StfBuilderDevice.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,6 @@ class StfBuilderDevice : public DataDistDevice,
9191
const std::string& getDplChannelName() const { return I().mDplChannelName; }
9292

9393
auto& getOutputChannel() {
94-
if (isStandalone()) {
95-
return *I().mStandaloneChannel;
96-
}
97-
9894
if (dplEnabled()) {
9995
return this->GetChannel(I().mDplChannelName);
10096
}
@@ -218,7 +214,6 @@ class StfBuilderDevice : public DataDistDevice,
218214
std::unique_ptr<SubTimeFrameFileSink> mFileSink;
219215

220216
/// File source
221-
std::unique_ptr<FairMQChannel> mStandaloneChannel;
222217
std::unique_ptr<SubTimeFrameFileSource> mFileSource;
223218

224219
/// Info thread
@@ -233,10 +228,10 @@ class StfBuilderDevice : public DataDistDevice,
233228

234229
std::unique_ptr<StfBuilderInstance> mI;
235230
std::unique_ptr<MemoryResources> mMemI;
236-
231+
const StfBuilderInstance& I() const { return *mI; }
232+
public:
237233
StfBuilderInstance& I() { return *mI; }
238234
MemoryResources& MemI() { return *mMemI; }
239-
const StfBuilderInstance& I() const { return *mI; }
240235
};
241236

242237
}

src/StfBuilder/StfBuilderInput.cxx

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,9 @@ void StfInputInterface::start(const std::size_t pNumBuilders)
3939
mBuilderInputQueues.clear();
4040
mBuilderInputQueues.resize(mNumBuilders);
4141

42-
// Reference to the output or DPL channel
43-
// const auto &lOutChanName = mDevice.getOutputChannelName();
44-
auto& lOutputChan = mDevice.getOutputChannel();
45-
4642
// NOTE: create the mStfBuilders first to avid resizing the vector; then threads
4743
for (std::size_t i = 0; i < mNumBuilders; i++) {
48-
mStfBuilders.emplace_back(lOutputChan, mDevice.dplEnabled());
44+
mStfBuilders.emplace_back(mDevice.MemI(), mDevice.dplEnabled());
4945
}
5046

5147
for (std::size_t i = 0; i < mNumBuilders; i++) {

src/TfBuilder/TfBuilderDevice.cxx

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ TfBuilderDevice::~TfBuilderDevice()
4646

4747
void TfBuilderDevice::Init()
4848
{
49-
mMemI = std::make_unique<MemoryResources>();
49+
mMemI = std::make_unique<MemoryResources>(this->AddTransport(fair::mq::Transport::SHM));
5050
}
5151

5252
void TfBuilderDevice::Reset()
@@ -111,27 +111,6 @@ void TfBuilderDevice::InitTask()
111111
DDLOGF(fair::Severity::INFO, "Not sending to DPL.");
112112
}
113113

114-
// channel for FileSource: stf or dpl, or generic one in case of standalone
115-
if (mStandalone) {
116-
// create default FMQ shm channel
117-
auto lTransportFactory = FairMQTransportFactory::CreateTransportFactory("shmem", "", GetConfig());
118-
if (!lTransportFactory) {
119-
DDLOG(fair::Severity::ERROR) << "Creating transport factory failed!";
120-
throw "Transport factory";
121-
return;
122-
}
123-
mStandaloneChannel = std::make_unique<FairMQChannel>(
124-
"standalone-chan[0]" , // name
125-
"pair", // type
126-
"bind", // method
127-
"ipc:///tmp/standalone-chan-tfb", // address
128-
lTransportFactory
129-
);
130-
131-
mStandaloneChannel->Init();
132-
mStandaloneChannel->Validate();
133-
}
134-
135114
// start the info thread
136115
mInfoThread = create_thread_member("tfb_info", &TfBuilderDevice::InfoThread, this);
137116
}
@@ -157,10 +136,12 @@ bool TfBuilderDevice::start()
157136

158137
// we reached the scheduler instance, initialize everything else
159138
mRunning = true;
139+
auto lShmTransport = this->AddTransport(fair::mq::Transport::SHM);
160140

161141
if (!mStandalone && dplEnabled()) {
162142
auto& lOutputChan = GetChannel(getDplChannelName(), 0);
163-
mTfBuilder = std::make_unique<TimeFrameBuilder>(lOutputChan, mTfBufferSize, dplEnabled());
143+
144+
mTfBuilder = std::make_unique<TimeFrameBuilder>(MemI(), mTfBufferSize, 512 << 20 /* config */, dplEnabled());
164145
mTfDplAdapter = std::make_unique<StfToDplAdapter>(lOutputChan);
165146
}
166147

@@ -178,11 +159,7 @@ bool TfBuilderDevice::start()
178159
}
179160

180161
// start file source
181-
if (mStandalone) {
182-
mFileSource.start(*mStandaloneChannel, MemI(), false);
183-
} else {
184-
mFileSource.start(GetChannel(mDplChannelName), MemI(), mDplEnabled);
185-
}
162+
mFileSource.start(MemI(), mStandalone ? false : mDplEnabled);
186163

187164
return true;
188165
}

src/TfBuilder/TfBuilderDevice.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ class TfBuilderDevice : public DataDistDevice,
144144
SubTimeFrameFileSink mFileSink;
145145

146146
/// File source
147-
std::unique_ptr<FairMQChannel> mStandaloneChannel;
148147
SubTimeFrameFileSource mFileSource;
149148

150149
/// TF forwarding thread

src/common/SubTimeFrameBuilder.cxx

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@ using namespace o2::header;
3434
/// SubTimeFrameReadoutBuilder
3535
////////////////////////////////////////////////////////////////////////////////
3636

37-
SubTimeFrameReadoutBuilder::SubTimeFrameReadoutBuilder(FairMQChannel& pChan, bool pDplEnabled)
38-
: mStf(nullptr),
39-
mDplEnabled(pDplEnabled)
37+
SubTimeFrameReadoutBuilder::SubTimeFrameReadoutBuilder(MemoryResources &pMemRes, bool pDplEnabled)
38+
: mStf(nullptr), mDplEnabled(pDplEnabled),mMemRes(pMemRes)
4039
{
41-
mHeaderMemRes = std::make_unique<RegionAllocatorResource<alignof(o2::header::DataHeader)>>(
40+
mMemRes.mHeaderMemRes = std::make_unique<RegionAllocatorResource<alignof(o2::header::DataHeader)>>(
4241
"O2HeadersRegion",
43-
pChan,
42+
*mMemRes.mShmTransport,
4443
std::size_t(256) << 20, /* make configurable */
4544
mDplEnabled ?
4645
sizeof(DataHeader) + sizeof(o2::framework::DataProcessingHeader) :
@@ -190,14 +189,14 @@ void SubTimeFrameReadoutBuilder::addHbFrames(
190189
o2::framework::DataProcessingHeader{mStf->header().mId}
191190
);
192191

193-
lHdrMsg = mHeaderMemRes->NewFairMQMessage(lStack.size());
192+
lHdrMsg = mMemRes.newHeaderMessage(lStack.size());
194193
if (lHdrMsg) {
195194
std::memcpy(lHdrMsg->GetData(), lStack.data(), lStack.size());
196195
}
197196
} else {
198197
auto lHdrMsgStack = Stack(lDataHdr);
199198

200-
lHdrMsg = mHeaderMemRes->NewFairMQMessage(lHdrMsgStack.size());
199+
lHdrMsg = mMemRes.newHeaderMessage(lHdrMsgStack.size());
201200
if (lHdrMsg) {
202201
std::memcpy(lHdrMsg->GetData(), lHdrMsgStack.data(), lHdrMsgStack.size());
203202
}
@@ -229,20 +228,20 @@ std::unique_ptr<SubTimeFrame> SubTimeFrameReadoutBuilder::getStf()
229228
/// SubTimeFrameFileBuilder
230229
////////////////////////////////////////////////////////////////////////////////
231230

232-
SubTimeFrameFileBuilder::SubTimeFrameFileBuilder(FairMQChannel& pChan, MemoryResources &pMemRes,
231+
SubTimeFrameFileBuilder::SubTimeFrameFileBuilder(MemoryResources &pMemRes,
233232
const std::size_t pDataSegSize, const std::size_t pHdrSegSize, bool pDplEnabled)
234233
: mMemRes(pMemRes), mDplEnabled(pDplEnabled)
235234
{
236235
mMemRes.mHeaderMemRes = std::make_unique<RegionAllocatorResource<alignof(o2::header::DataHeader)>>(
237236
"O2HeadersRegion_FileSource",
238-
pChan,
237+
*mMemRes.mShmTransport,
239238
pHdrSegSize,
240239
0
241240
);
242241

243242
mMemRes.mDataMemRes = std::make_unique<RegionAllocatorResource<>>(
244243
"O2DataRegion_FileSource",
245-
pChan,
244+
*mMemRes.mShmTransport,
246245
pDataSegSize,
247246
0 // TODO: GPU flags
248247
);
@@ -307,20 +306,20 @@ void SubTimeFrameFileBuilder::adaptHeaders(SubTimeFrame *pStf)
307306
/// TimeFrameBuilder
308307
////////////////////////////////////////////////////////////////////////////////
309308

310-
TimeFrameBuilder::TimeFrameBuilder(FairMQChannel& pChan, const std::size_t pDataSegSize, bool pDplEnabled)
311-
: mDplEnabled(pDplEnabled),
312-
mOutputChan(pChan)
309+
TimeFrameBuilder::TimeFrameBuilder(MemoryResources &pMemRes,
310+
const std::size_t pDataSegSize, const std::size_t pHdrSegSize, bool pDplEnabled)
311+
: mDplEnabled(pDplEnabled), mMemRes(pMemRes)
313312
{
314-
mHeaderMemRes = std::make_unique<RegionAllocatorResource<alignof(o2::header::DataHeader)>>(
313+
mMemRes.mHeaderMemRes = std::make_unique<RegionAllocatorResource<alignof(o2::header::DataHeader)>>(
315314
"O2HeadersRegion",
316-
pChan,
317-
std::size_t(256) << 20, /* make configurable */
315+
*mMemRes.mShmTransport,
316+
pHdrSegSize,
318317
0 /* dont need registration flags for headers */
319318
);
320319

321-
mDataMemRes = std::make_unique<RegionAllocatorResource<>>(
320+
mMemRes.mDataMemRes = std::make_unique<RegionAllocatorResource<>>(
322321
"O2DataRegion_FileSource",
323-
pChan,
322+
*mMemRes.mShmTransport,
324323
pDataSegSize,
325324
0 // TODO: GPU flags
326325
);
@@ -332,7 +331,7 @@ void TimeFrameBuilder::adaptHeaders(SubTimeFrame *pStf)
332331
return;
333332
}
334333

335-
const auto lOutChannelType = mOutputChan.GetTransportType();
334+
const auto lOutChannelType = fair::mq::Transport::SHM;
336335

337336
// adapt headers for DPL
338337
for (auto& lDataIdentMapIter : pStf->mData) {
@@ -377,11 +376,7 @@ void TimeFrameBuilder::adaptHeaders(SubTimeFrame *pStf)
377376
o2::framework::DataProcessingHeader{pStf->header().mId}
378377
);
379378

380-
if (lOutChannelType == fair::mq::Transport::SHM) {
381-
lStfDataIter.mHeader = newHeaderMessage(lStack.size());
382-
} else {
383-
lStfDataIter.mHeader = mOutputChan.NewMessage(lStack.size());
384-
}
379+
lStfDataIter.mHeader = newHeaderMessage(lStack.size());
385380

386381
if (lStfDataIter.mHeader) {
387382
assert(lStfDataIter.mHeader->GetSize() >= sizeof (DataHeader));

src/common/SubTimeFrameFileSource.cxx

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,12 @@ using namespace std::chrono_literals;
4040
/// SubTimeFrameFileSource
4141
////////////////////////////////////////////////////////////////////////////////
4242

43-
void SubTimeFrameFileSource::start(FairMQChannel& pDstChan, MemoryResources &pMemRes, const bool pDplEnabled)
43+
void SubTimeFrameFileSource::start(MemoryResources &pMemRes, const bool pDplEnabled)
4444
{
4545
if (enabled()) {
46-
mDstChan = &pDstChan;
4746
mDplEnabled = pDplEnabled;
4847

4948
mFileBuilder = std::make_unique<SubTimeFrameFileBuilder>(
50-
pDstChan,
5149
pMemRes,
5250
mRegionSizeMB << 20,
5351
mHdrRegionSizeMB << 20,

0 commit comments

Comments
 (0)