Skip to content

Commit 7f2db93

Browse files
committed
stfsender: support cancellation of init process
1 parent a05e426 commit 7f2db93

4 files changed

Lines changed: 48 additions & 9 deletions

File tree

src/StfSender/StfSenderDevice.cxx

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,7 @@ void StfSenderDevice::Reset()
153153
DDDLOG("StfBuilderDevice::Reset()");
154154

155155
I().mDeviceRunning = false;
156-
// wait the Info thread, before closing mTfSchedulerRpcClient
157-
if (I().mInfoThread.joinable()) {
158-
I().mInfoThread.join();
159-
}
156+
// stop the receiver thread
160157
if (I().mReceiverThread.joinable()) {
161158
I().mReceiverThread.join();
162159
}
@@ -222,11 +219,20 @@ void StfSenderDevice::InitTask()
222219
if (!standalone()) {
223220
// contact the scheduler on gRPC
224221
while (I().mTfSchedulerRpcClient.should_retry_start() && !I().mTfSchedulerRpcClient.start(I().mDiscoveryConfig)) {
225-
std::this_thread::sleep_for(150ms);
222+
223+
// try to reach the scheduler unless we should exit
224+
if (NewStatePending()) {
225+
IDDLOG("InitTask: The control system requested abort.");
226+
AbortInitTask();
227+
ChangeState(fair::mq::Transition::ErrorFound);
228+
return;
229+
}
230+
231+
std::this_thread::sleep_for(250ms);
226232
}
227233

228234
// Did we fail to connect to the TfScheduler?
229-
if (!I().mTfSchedulerRpcClient.should_retry_start()) {
235+
if (!I().mTfSchedulerRpcClient.started()) {
230236
EDDLOG("InitTask: Failed to connect to TfScheduler. Exiting.");
231237
ChangeState(fair::mq::Transition::ErrorFound);
232238
return;
@@ -298,7 +304,7 @@ void StfSenderDevice::ResetTask()
298304
{
299305
I().mRunning = false;
300306

301-
// Stop the pipeline
307+
// Stop the pipeline
302308
I().stopPipeline();
303309

304310
// stop the receiver thread

src/StfSender/StfSenderDevice.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ class StfSenderDevice : public DataDistDevice
7676
virtual void InitTask() override final;
7777
virtual void ResetTask() override final;
7878

79+
/// Abort an in-progress InitTask.
/// When both mI and the discovery config are set, marks the process state in the
/// discovery status as BasicInfo::ABORTED and calls write() on the config; then
/// always calls ResetTask().
void AbortInitTask() {
80+
DDDLOG("Aborting InitTask...");
81+
// guard: skip the status update when mI or the discovery config is not set
if (mI && I().mDiscoveryConfig) {
82+
auto& lStatus = I().mDiscoveryConfig->status();
83+
lStatus.mutable_info()->set_process_state(BasicInfo::ABORTED);
84+
// NOTE(review): presumably publishes the updated status to the discovery backend -- confirm
I().mDiscoveryConfig->write();
85+
}
86+
// runs regardless of whether the status update above was performed
ResetTask();
87+
}
88+
7989
virtual void PreRun() final;
8090
virtual void PostRun() final;
8191
virtual bool ConditionalRun() final;
@@ -88,6 +98,21 @@ class StfSenderDevice : public DataDistDevice
8898
StfSenderInstance()
8999
: IFifoPipeline(ePipelineSize) {}
90100

101+
/// Destructor: clears the mRunning and mDeviceRunning flags, then joins the info
/// thread and the receiver thread if they are joinable.
~StfSenderInstance() {
102+
// clear the flags before joining
// NOTE(review): presumably the worker threads poll these flags to exit -- confirm
mRunning = false;
103+
mDeviceRunning = false;
104+
105+
// stop the info thread (blocks until it has finished)
if (mInfoThread.joinable()) {
107+
mInfoThread.join();
108+
}
109+
110+
// stop the receiver thread (blocks until it has finished)
if (mReceiverThread.joinable()) {
112+
mReceiverThread.join();
113+
}
114+
}
115+
91116
/// Configuration
92117
std::string mInputChannelName;
93118
bool mStandalone = false;

src/StfSender/StfSenderOutputUCX.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ void StfSenderOutputUCX::stop()
148148
for (auto &lWorker : mDataWorkers) {
149149
ucp_worker_destroy(lWorker.ucp_worker);
150150
}
151-
ucp_cleanup(ucp_context);
152151
}
153152
DDDLOG("StfSenderOutputUCX::stop: closed all connections.");
154153

@@ -159,6 +158,8 @@ void StfSenderOutputUCX::stop()
159158
ucx::util::destroy_rkey_for_region(ucp_context, lMapping.ucp_mem, lMapping.ucp_rkey_buf);
160159
}
161160
}
161+
162+
ucp_cleanup(ucp_context);
162163
DDDLOG("StfSenderOutputUCX::stop: revoked all rkeys.");
163164
}
164165

src/common/rpc/TfSchedulerRpcClient.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class TfSchedulerRpcClient {
5959
return false;
6060
}
6161

62-
IDDLOG_RL(1000, "TfScheduler instance configuration not found. Retrying.");
62+
IDDLOG_RL(5000, "TfScheduler instance configuration not found. Retrying.");
6363
return false;
6464
}
6565

@@ -79,15 +79,20 @@ class TfSchedulerRpcClient {
7979

8080
IDDLOG("Connected to TfScheduler RPC endpoint={}", lEndpoint);
8181

82+
mStarted = true;
83+
8284
return true;
8385
}
8486

8587
/// Tear down the client state: clears mTfSchedulerConf, resets mStub and mChannel,
/// and sets mStarted to false.
void stop() {
8688
mTfSchedulerConf.Clear();
8789
mStub.reset(nullptr);
8890
mChannel.reset();
91+
// the client no longer counts as started once the stub and channel are released
mStarted = false;
8992
}
9093

94+
/// Returns the current value of mStarted (whether the client is marked as started).
bool started() const { return mStarted; }
95+
9196
void updateTimeInformation(BasicInfo &pInfo);
9297

9398
// rpc HeartBeat(BasicInfo) returns (google.protobuf.Empty) { }
@@ -137,6 +142,8 @@ class TfSchedulerRpcClient {
137142

138143
// keep looking for the TfScheduler instance
139144
bool mShouldRetryStart = true;
145+
// true once the connection to the TfScheduler has been established; reset to false by stop()
146+
bool mStarted = false;
140147
};
141148

142149
} /* namespace o2::DataDistribution */

0 commit comments

Comments
 (0)