Skip to content

Commit 3b3936b

Browse files
committed
sched: implement datadist controller rpcs
1 parent ae8f4f3 commit 3b3936b

17 files changed

Lines changed: 510 additions & 157 deletions

src/DataDistControl/DataDistControl.proto

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ message PartitionInfo {
2525
}
2626

2727
enum PartitionState {
28-
__IGNORE = 0;
28+
IGNORE__ = 0;
29+
2930
PARTITION_UNKNOWN = 1;
3031
// Partition not known to DataDistControl. Initialize by calling PartitionInitialize()
3132

@@ -43,7 +44,10 @@ enum PartitionState {
4344
// All components configured, ready for dataflow commands
4445

4546
PARTITION_TERMINATING = 6;
46-
// Partition is terminating. EPN-FLP connections are cleanly terminated.
47+
// Partition is terminating. EPN-FLP connections will be cleanly closed.
48+
49+
PARTITION_TERMINATED = 7;
50+
// Partition is terminated. TfScheduler does not accept further requests
4751
}
4852

4953

src/TfScheduler/TfSchedulerConnManager.cxx

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,9 @@ void TfSchedulerConnManager::disconnectTfBuilder(const TfBuilderConfigStatus &pT
116116
pResponse.set_status(0);
117117
const std::string &lTfBuilderId = pTfBuilderStatus.info().process_id();
118118

119-
std::scoped_lock lLock(mStfSenderClientsLock);
120-
121-
deleteTfBuilderRpcClient(lTfBuilderId);
122-
123-
if (!stfSendersReady()) {
124-
IDDLOG("TfBuilder Connection error: StfSenders not ready.");
125-
pResponse.set_status(ERROR_STF_SENDERS_NOT_READY);
126-
return;
119+
{
120+
std::scoped_lock lLock(mStfSenderClientsLock);
121+
deleteTfBuilderRpcClient(lTfBuilderId);
127122
}
128123

129124
TfBuilderEndpoint lParam;
@@ -137,31 +132,70 @@ void TfSchedulerConnManager::disconnectTfBuilder(const TfBuilderConfigStatus &pT
137132
continue; // not connected
138133
}
139134

140-
lParam.set_tf_builder_id(lTfBuilderId);
141-
lParam.set_endpoint(lSocketInfo.endpoint());
135+
{ // lock clients
136+
std::scoped_lock lLock(mStfSenderClientsLock);
137+
138+
if (mStfSenderRpcClients.count(lStfSenderId) == 0) {
139+
WDDLOG("disconnectTfBuilder: Unknown StfSender. stfs_id={}", lStfSenderId);
140+
continue;
141+
}
142142

143-
if (mStfSenderRpcClients.count(lStfSenderId) == 0) {
144-
WDDLOG("disconnectTfBuilder: Unknown StfSender. stfs_id={}", lStfSenderId);
145-
continue;
143+
lParam.set_tf_builder_id(lTfBuilderId);
144+
lParam.set_endpoint(lSocketInfo.endpoint());
145+
StatusResponse lResponse;
146+
147+
auto &lRpcClient = mStfSenderRpcClients[lSocketInfo.peer_id()];
148+
if(!lRpcClient->DisconnectTfBuilderRequest(lParam, lResponse).ok()) {
149+
EDDLOG("StfSender Connection error: gRPC error. stfs_id={} tfb_id={}", lStfSenderId, lTfBuilderId);
150+
pResponse.set_status(ERROR_GRPC_STF_SENDER);
151+
continue;
152+
}
153+
// check StfSender status
154+
if (lResponse.status() != 0) {
155+
EDDLOG("TfBuilder Connection error. stfs_id={} tfb_id={} response={}", lStfSenderId, lTfBuilderId, lResponse.status());
156+
pResponse.set_status(ERROR_STF_SENDER_CONNECTING);
157+
continue;
158+
}
146159
}
160+
}
161+
}
147162

148-
auto &lRpcClient = mStfSenderRpcClients[lSocketInfo.peer_id()];
163+
// Partition RPC: keep sending until all TfBuilders are gone
164+
bool TfSchedulerConnManager::requestTfBuildersTerminate() {
165+
std::vector<std::string> lFailedRpcsForDeletion;
149166

150-
StatusResponse lResponse;
151-
if(!lRpcClient->DisconnectTfBuilderRequest(lParam, lResponse).ok()) {
152-
EDDLOG("TfBuilder Connection error: gRPC error. stfs_id={} tfb_id={}", lStfSenderId, lTfBuilderId);
153-
pResponse.set_status(ERROR_GRPC_STF_SENDER);
154-
break;
167+
std::scoped_lock lLock(mStfSenderClientsLock);
168+
169+
for (auto &lTfBuilder : mTfBuilderRpcClients) {
170+
if (!lTfBuilder.second.mClient->TerminatePartition()) {
171+
lFailedRpcsForDeletion.emplace_back(lTfBuilder.first);
155172
}
173+
}
156174

157-
// check StfSender status
158-
if (lResponse.status() != 0) {
159-
EDDLOG("TfBuilder Connection error. stfs_id={} tfb_id={} response={}",
160-
lStfSenderId, lTfBuilderId, lResponse.status());
161-
pResponse.set_status(ERROR_STF_SENDER_CONNECTING);
162-
break;
175+
for (const auto &lId : lFailedRpcsForDeletion) {
176+
deleteTfBuilderRpcClient(lId);
177+
}
178+
179+
return mTfBuilderRpcClients.size() == 0;
180+
}
181+
182+
// Partition RPC: notify all StfSenders and remove rpc clients
183+
bool TfSchedulerConnManager::requestStfSendersTerminate() {
184+
std::vector<std::string> lFailedRpcsForDeletion;
185+
186+
std::scoped_lock lLock(mStfSenderClientsLock);
187+
188+
for (auto &lStfSender : mStfSenderRpcClients) {
189+
if (!lStfSender.second->TerminatePartition()) {
190+
lFailedRpcsForDeletion.emplace_back(lStfSender.first);
163191
}
164192
}
193+
194+
for (const auto &lId : lFailedRpcsForDeletion) {
195+
deleteTfBuilderRpcClient(lId);
196+
}
197+
198+
return mTfBuilderRpcClients.size() == 0;
165199
}
166200

167201

@@ -260,10 +294,10 @@ void TfSchedulerConnManager::StfSenderMonitoringThread()
260294

261295
WDDLOG_RL(1000, "Waiting for StfSenders. ready={} total={}", lNumStfSenders, mPartitionInfo.mStfSenderIdList.size());
262296
lSleep = 250ms;
297+
} else {
298+
mStfSenderState = STF_SENDER_STATE_OK;
263299
}
264300

265-
mStfSenderState = STF_SENDER_STATE_OK;
266-
267301
// wait for drop futures
268302
{
269303
{

src/TfScheduler/TfSchedulerConnManager.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class TfSchedulerConnManager
6262
using namespace std::chrono_literals;
6363

6464
while (!mStfSenderRpcClients.start()) {
65-
std::this_thread::sleep_for(1s);
65+
return false; // we'll be called back
6666
}
6767

6868
mRunning = true;
@@ -98,6 +98,10 @@ class TfSchedulerConnManager
9898

9999
void StfSenderMonitoringThread();
100100

101+
/// Partition RPCs
102+
bool requestTfBuildersTerminate();
103+
bool requestStfSendersTerminate();
104+
101105
/// External requests by TfBuilders
102106
void connectTfBuilder(const TfBuilderConfigStatus &pTfBuilderStatus, TfBuilderConnectionResponse &pResponse /*out*/);
103107
void disconnectTfBuilder(const TfBuilderConfigStatus &pTfBuilderStatus, StatusResponse &pResponse /*out*/);
@@ -127,7 +131,13 @@ class TfSchedulerConnManager
127131
return mStfSenderRpcClients.checkStfSenderRpcConn(lStfSenderId);
128132
}
129133

/// Remove the rpc client entry for the given StfSender process id.
/// NOTE(review): appears intended to be called with mStfSenderClientsLock
/// held, mirroring deleteTfBuilderRpcClient usage — confirm at call sites.
void deleteStfSenderRpcClient(const std::string &pId)
{
  mStfSenderRpcClients.remove(pId);
}
138+
130139
StfSenderState getStfSenderState() const { return mStfSenderState; }
140+
std::size_t getStfSenderCount() const { return mStfSenderRpcClients.size(); }
131141

132142
private:
133143
/// Partition information

src/TfScheduler/TfSchedulerDevice.cxx

Lines changed: 33 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -42,95 +42,67 @@ TfSchedulerDevice::~TfSchedulerDevice()
4242
void TfSchedulerDevice::InitTask()
4343
{
4444
DataDistLogger::SetThreadName("tfs-main");
45-
46-
// Discovery
47-
mDiscoveryConfig = std::make_shared<ConsulTfSchedulerService>(ProcessType::TfSchedulerService, Config::getEndpointOption(*GetConfig()));
48-
49-
auto &lStatus = mDiscoveryConfig->status();
50-
51-
lStatus.mutable_info()->set_type(TfSchedulerService);
52-
lStatus.mutable_info()->set_process_id(Config::getIdOption(*GetConfig()));
53-
lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*GetConfig()));
54-
55-
mDiscoveryConfig->write();
56-
57-
// start the service thread
58-
mServiceThread = create_thread_member("sched_service", &TfSchedulerDevice::TfSchedulerServiceThread, this);
5945
}
6046

6147
void TfSchedulerDevice::PreRun()
6248
{
63-
49+
mDiscoveryConfig = std::make_unique<ConsulTfSchedulerService>(
50+
ProcessType::TfSchedulerService,
51+
Config::getEndpointOption(*GetConfig())
52+
);
6453
}
6554

66-
void TfSchedulerDevice::ResetTask()
55+
void TfSchedulerDevice::PostRun()
6756
{
68-
// stop the scheduler service
69-
if (mServiceThread.joinable()) {
70-
mServiceThread.join();
71-
}
57+
IDDLOG("Stopping the TfScheduler and exiting. partition_id={}", mPartitionId);
7258

73-
// stop all instances
74-
for (auto &lInstIt : mSchedulerInstances) {
75-
lInstIt.second->stop();
59+
// delete everything
60+
if (mSchedInstance) {
61+
mSchedInstance->stop();
7662
}
63+
mSchedInstance.reset();
64+
mPartitionId.clear();
7765

78-
mSchedulerInstances.clear();
79-
66+
throw "Intentional exit";
67+
}
8068

81-
DDDLOG("ResetTask() done.");
69+
void TfSchedulerDevice::ResetTask()
70+
{
8271
}
8372

8473
bool TfSchedulerDevice::ConditionalRun()
8574
{
8675
// nothing to do here sleep for awhile
8776
std::this_thread::sleep_for(500ms);
8877

89-
// NOTE: Not using Run or ConditionalRun lets us put teardown in PostRun()
90-
return true;
91-
}
92-
93-
void TfSchedulerDevice::TfSchedulerServiceThread()
94-
{
95-
// wait for the device to go into RUNNING state
96-
WaitForRunningState();
97-
98-
while (IsRunningState()) {
99-
78+
if (!mSchedInstance) {
10079
// check for new requests
10180
PartitionRequest lNewPartitionRequest;
102-
DDLOGF_RL(2000, DataDistSeverity::debug, "Checking for new partition creation requests.");
81+
DDLOGF_RL(5000, DataDistSeverity::debug, "Checking for new partition creation requests.");
10382
if (mDiscoveryConfig->getNewPartitionRequest(lNewPartitionRequest)) {
10483
// new request
10584
IDDLOG("Request for starting a new partition. partition={}", lNewPartitionRequest.mPartitionId);
10685

107-
// check if we already have instance for the requested partition
108-
if (mSchedulerInstances.count(lNewPartitionRequest.mPartitionId) == 0) {
109-
110-
// Create a new instance for the partition
111-
auto [lNewInstIt, lEmplaced ] = mSchedulerInstances.emplace(
112-
lNewPartitionRequest.mPartitionId,
113-
std::make_unique<TfSchedulerInstanceHandler>(*this,
114-
std::string("0"), // TODO: add multiple schedulers
115-
lNewPartitionRequest
116-
) // value
117-
);
118-
119-
if (lEmplaced) {
120-
auto &lNewInstance = lNewInstIt->second;
121-
lNewInstance->start();
122-
}
123-
IDDLOG("Created new scheduler instance. partition={}", lNewPartitionRequest.mPartitionId);
124-
break; // Only service one partition per process
125-
} else {
126-
EDDLOG("Scheduler instance already exists. partition={}",lNewPartitionRequest.mPartitionId);
127-
}
86+
// Create a new instance for the partition
87+
mPartitionId = lNewPartitionRequest.mPartitionId;
88+
mSchedInstance = std::make_unique<TfSchedulerInstanceHandler>(*this,
89+
std::string("0"), // TODO: add multiple schedulers
90+
lNewPartitionRequest
91+
);
92+
mSchedInstance->start();
93+
94+
IDDLOG("Created new scheduler instance. partition={}", lNewPartitionRequest.mPartitionId);
12895
}
96+
}
12997

130-
std::this_thread::sleep_for(1000ms);
98+
if (mSchedInstance) {
99+
if (mSchedInstance->isTerminated()) {
100+
std::this_thread::sleep_for(2000ms);
101+
return false; // -> PostRun() -> exit
102+
}
131103
}
132104

133-
DDDLOG("Exiting TfSchedulerServiceThread.");
105+
return true;
134106
}
135107

136108
}

src/TfScheduler/TfSchedulerDevice.h

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include <Utilities.h>
2222

2323
#include <thread>
24-
#include <map>
2524

2625

2726
namespace o2
@@ -45,21 +44,14 @@ class TfSchedulerDevice : public DataDistDevice
4544

4645
protected:
4746
void PreRun() final;
48-
void PostRun() final { };
47+
void PostRun() final;
4948
bool ConditionalRun() final;
5049

51-
/// Discovery configuration
52-
std::shared_ptr<ConsulTfSchedulerService> mDiscoveryConfig;
53-
54-
/// Scheduler service thread
55-
void TfSchedulerServiceThread();
56-
std::thread mServiceThread;
57-
5850
/// Scheduler Instances
59-
// NOTE (unique_ptr): TfSchedulerInstance has threads that take *this.
60-
// unique_ptr ensures that *this does not change due to container
61-
std::map<std::string, std::unique_ptr<TfSchedulerInstanceHandler>> mSchedulerInstances;
51+
std::string mPartitionId;
52+
std::unique_ptr<TfSchedulerInstanceHandler> mSchedInstance;
6253

54+
std::unique_ptr<ConsulTfSchedulerService> mDiscoveryConfig;
6355
};
6456
}
6557
} /* namespace o2::DataDistribution */

0 commit comments

Comments
 (0)