Skip to content

Commit 3c111d9

Browse files
committed
grpc: monitor channel status
1 parent a8a61fc commit 3c111d9

17 files changed

Lines changed: 487 additions & 277 deletions

src/StfSender/StfSenderOutput.cxx

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,12 @@ void StfSenderOutput::StfSchedulerThread()
220220
continue;
221221
}
222222

223-
DDLOGF_RL(1000, DataDistSeverity::debug, "StfSchedulerThread: scheduling stf_id={}", lStfId);
223+
if (!mDevice.TfSchedRpcCli().is_ready()) {
224+
EDDLOG_RL(1000, "StfSchedulerThread: TfScheduler gRPC connection is not ready stf_id={}", lStfId);
225+
continue;
226+
}
227+
228+
DDDLOG_RL(5000, "StfSchedulerThread: scheduling stf_id={}", lStfId);
224229

225230
// move the stf into triage map (before notifying the scheduler to avoid races)
226231
{
@@ -248,11 +253,11 @@ void StfSenderOutput::StfSchedulerThread()
248253

249254
mDevice.TfSchedRpcCli().StfSenderStfUpdate(lStfInfo, lSchedResponse);
250255

251-
DDLOGF_RL(3000, DataDistSeverity::debug, "Sent STF announce, stf_id={} stf_size={}", lStfId, lStfInfo.stf_size());
256+
DDDLOG_RL(5000, "Sent STF announce, stf_id={} stf_size={}", lStfId, lStfInfo.stf_size());
252257

253258
// check if the scheduler rejected the data
254259
if (lSchedResponse.status() != SchedulerStfInfoResponse::OK) {
255-
DDLOGF_RL(500, DataDistSeverity::info, "TfScheduler rejected the Stf announce. stf_id={} reason={}",
260+
IDDLOG_RL(1000, "TfScheduler rejected the Stf announce. stf_id={} reason={}",
256261
lStfId, SchedulerStfInfoResponse_StfInfoStatus_Name(lSchedResponse.status()));
257262

258263
// remove from the scheduling map

src/TfScheduler/TfSchedulerConnManager.cxx

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,15 @@ using namespace std::chrono_literals;
3030

3131
std::size_t TfSchedulerConnManager::checkStfSenders()
3232
{
33-
// TODO: monitor and reconnect?
34-
return mStfSenderRpcClients.size();
33+
std::size_t lReadyCnt = 0;
34+
for (const auto &lId : mPartitionInfo.mStfSenderIdList) {
35+
// this will attempt reconnection on existing connections
36+
if (checkStfSenderRpcConn(lId)) {
37+
lReadyCnt++;
38+
}
39+
}
40+
41+
return lReadyCnt;
3542
}
3643

3744
void TfSchedulerConnManager::connectTfBuilder(const TfBuilderConfigStatus &pTfBuilderStatus, TfBuilderConnectionResponse &pResponse /*out*/)
@@ -243,20 +250,26 @@ void TfSchedulerConnManager::StfSenderMonitoringThread()
243250
std::vector<std::uint64_t> lDroppedStfs;
244251

245252
while (mRunning) {
253+
std::chrono::milliseconds lSleep = 1000ms;
254+
246255
// make sure all StfSenders are alive
247256
const std::uint32_t lNumStfSenders = checkStfSenders();
248257
if (lNumStfSenders < mPartitionInfo.mStfSenderIdList.size()) {
249-
IDDLOG("Waiting for StfSenders. connected={} total={}", lNumStfSenders, mPartitionInfo.mStfSenderIdList.size() );
250-
std::this_thread::sleep_for(1000ms);
251-
continue;
258+
259+
mStfSenderState = STF_SENDER_STATE_INCOMPLETE;
260+
261+
WDDLOG_RL(1000, "Waiting for StfSenders. ready={} total={}", lNumStfSenders, mPartitionInfo.mStfSenderIdList.size());
262+
lSleep = 250ms;
252263
}
253264

265+
mStfSenderState = STF_SENDER_STATE_OK;
266+
254267
// wait for drop futures
255268
{
256269
{
257270
std::scoped_lock lLock(mStfDropFuturesLock);
258271
for (auto lFutureIt = mStfDropFutures.begin(); lFutureIt != mStfDropFutures.end(); lFutureIt++) {
259-
if (std::future_status::ready == lFutureIt->wait_for(std::chrono::seconds(0))) {
272+
if (std::future_status::ready == lFutureIt->wait_for(0s)) {
260273
assert (lFutureIt->valid());
261274
lDroppedStfs.push_back(lFutureIt->get());
262275
lFutureIt = mStfDropFutures.erase(lFutureIt);
@@ -267,13 +280,12 @@ void TfSchedulerConnManager::StfSenderMonitoringThread()
267280
sort(lDroppedStfs.begin(), lDroppedStfs.end());
268281
for (auto &lDroppedId : lDroppedStfs) {
269282
lDroppedTotal++;
270-
DDLOGF_RL(1000, DataDistSeverity::info, "Dropped SubTimeFrame (cannot schedule). stf_id={} total={}",
271-
lDroppedId, lDroppedTotal);
283+
IDDLOG_RL(1000, "Dropped SubTimeFrame (cannot schedule). stf_id={} total={}", lDroppedId, lDroppedTotal);
272284
}
273285
lDroppedStfs.clear();
274286
}
275287

276-
std::this_thread::sleep_for(1000ms);
288+
std::this_thread::sleep_for(lSleep);
277289
}
278290

279291
DDDLOG("Exiting StfSender RPC Monitoring thread.");

src/TfScheduler/TfSchedulerConnManager.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ namespace o2
3838
namespace DataDistribution
3939
{
4040

41+
/// Aggregate state of the StfSender connections, as published by TfSchedulerConnManager.
enum StfSenderState {
  /// Set by the monitoring thread on each pass of its loop.
  STF_SENDER_STATE_OK = 1,
  /// Initial value, before the monitoring thread has updated the state.
  STF_SENDER_STATE_INITIALIZING = 2,
  /// Fewer StfSenders are ready than the partition lists.
  STF_SENDER_STATE_INCOMPLETE = 3
};
46+
4147
class TfSchedulerConnManager
4248
{
4349
public:
@@ -116,15 +122,20 @@ class TfSchedulerConnManager
116122
return mTfBuilderRpcClients.get(pId);
117123
}
118124

119-
void checkStfSenderRpcConn(const std::string &lStfSenderId)
125+
bool checkStfSenderRpcConn(const std::string &lStfSenderId)
120126
{
121-
mStfSenderRpcClients.checkStfSenderRpcConn(lStfSenderId);
127+
return mStfSenderRpcClients.checkStfSenderRpcConn(lStfSenderId);
122128
}
123129

130+
StfSenderState getStfSenderState() const { return mStfSenderState; }
131+
124132
private:
125133
/// Partition information
126134
PartitionRequest mPartitionInfo;
127135

136+
/// StfSender state
137+
std::atomic<StfSenderState> mStfSenderState = STF_SENDER_STATE_INITIALIZING;
138+
128139
/// Discovery configuration
129140
std::shared_ptr<ConsulTfSchedulerInstance> mDiscoveryConfig;
130141

src/TfScheduler/TfSchedulerStfInfo.cxx

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,7 @@ void TfSchedulerStfInfo::SchedulingThread()
119119
} else {
120120
// TfBuilder was removed in the meantime, e.g. by housekeeping thread because of stale info
121121
// We drop the current TF as this is not a likely situation
122-
WDDLOG(
123-
"Selected TfBuilder is not currently reachable. TF will be dropped. tfb_id={:s} tf_id={:d}",
122+
WDDLOG("Selected TfBuilder is not currently reachable. TF will be dropped. tfb_id={} tf_id={}",
124123
lTfBuilderId, lTfId);
125124

126125
mConnManager.dropAllStfsAsync(lTfId);
@@ -204,11 +203,23 @@ void TfSchedulerStfInfo::SchedulingThread()
204203

205204
void TfSchedulerStfInfo::addStfInfo(const StfSenderStfInfo &pStfInfo, SchedulerStfInfoResponse &pResponse)
206205
{
206+
static std::uint64_t sLastDropNotRunning = -1;
207+
static std::uint64_t sLastDropIncomplete = -1;
208+
207209
const auto lNumStfSenders = mDiscoveryConfig->status().stf_sender_count();
208210
const auto lStfId = pStfInfo.stf_id();
209211

210-
if (!mRunning) {
212+
// Drop not running
213+
if ((sLastDropNotRunning == lStfId) || !mRunning) {
211214
pResponse.set_status(SchedulerStfInfoResponse::DROP_NOT_RUNNING);
215+
sLastDropNotRunning = lStfId;
216+
return;
217+
}
218+
219+
// DROP When not complete
220+
if ((sLastDropIncomplete == lStfId) || (mConnManager.getStfSenderState() == StfSenderState::STF_SENDER_STATE_INCOMPLETE)) {
221+
pResponse.set_status(SchedulerStfInfoResponse::DROP_STFS_INCOMPLETE);
222+
sLastDropIncomplete = lStfId;
212223
return;
213224
}
214225

src/common/base/DataDistLogger.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,6 @@ class DataDistLogger {
282282
#define WDDLOG(...) DDLOGF(DataDistSeverity::warn, __VA_ARGS__)
283283
#define EDDLOG(...) DDLOGF(DataDistSeverity::error, __VA_ARGS__)
284284

285-
286285
// Log with fmt using ratelimiting (per thread)
287286
#define DDLOGF_RL(intervalMs, severity, ...) \
288287
do { \
@@ -299,6 +298,12 @@ do {
299298
} \
300299
} while(0)
301300

301+
302+
#define DDDLOG_RL(intervalMs, ...) DDLOGF_RL(intervalMs, DataDistSeverity::debug, __VA_ARGS__)
303+
#define IDDLOG_RL(intervalMs, ...) DDLOGF_RL(intervalMs, DataDistSeverity::info, __VA_ARGS__)
304+
#define WDDLOG_RL(intervalMs, ...) DDLOGF_RL(intervalMs, DataDistSeverity::warn, __VA_ARGS__)
305+
#define EDDLOG_RL(intervalMs, ...) DDLOGF_RL(intervalMs, DataDistSeverity::error, __VA_ARGS__)
306+
302307
// Log with fmt using ratelimiting (global)
303308
#define DDLOGF_GRL(intervalMs, severity, ...) \
304309
do { \
@@ -315,6 +320,12 @@ do {
315320
} \
316321
} while(0)
317322

323+
324+
#define DDDLOG_GRL(intervalMs, ...) DDLOGF_GRL(intervalMs, DataDistSeverity::debug, __VA_ARGS__)
325+
#define IDDLOG_GRL(intervalMs, ...) DDLOGF_GRL(intervalMs, DataDistSeverity::info, __VA_ARGS__)
326+
#define WDDLOG_GRL(intervalMs, ...) DDLOGF_GRL(intervalMs, DataDistSeverity::warn, __VA_ARGS__)
327+
#define EDDLOG_GRL(intervalMs, ...) DDLOGF_GRL(intervalMs, DataDistSeverity::error, __VA_ARGS__)
328+
318329
namespace impl {
319330
struct DataDistLoggerCtx {
320331

src/common/base/Utilities.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,33 @@ class RunningSamples
9999
return mCount == 0 ? T(0) : (Sum() / T(mCount));
100100
}
101101

102+
/// Mean difference between consecutive samples, counting only increasing steps.
///
/// Pairs where a sample is less than or equal to its predecessor are skipped.
/// @return T(0) when fewer than two samples are stored or no increasing step exists.
T MeanStep() const
{
  if (mCount < 2) {
    return T(0);
  }

  T lTotal = 0;
  std::size_t lNumSteps = 0;

  for (auto lCur = begin() + 1; lCur < end(); ++lCur) {
    const auto lPrev = lCur - 1;

    // count the step only when the value actually increased
    if (!(*lCur <= *lPrev)) {
      lTotal += (*lCur - *lPrev);
      lNumSteps++;
    }
  }

  if (lNumSteps == 0) {
    return T(0);
  }

  return lTotal / static_cast<T>(lNumSteps);
}
122+
123+
/// Reciprocal of MeanStep().
///
/// @return T(1) / MeanStep(), or T(0) when MeanStep() is zero.
T MeanStepFreq() const
{
  const T lStep = MeanStep();

  if (lStep == T(0)) {
    return T(0);
  }

  return T(1) / lStep;
}
128+
102129
std::pair<T, T> MinMax() const
103130
{
104131
if (mCount == 0)

src/common/discovery/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ set_target_properties(discovery PROPERTIES
5656
target_include_directories(discovery
5757
PUBLIC
5858
${CMAKE_CURRENT_BINARY_DIR} # protobuf puts generated c++ files there
59-
${CMAKE_CURRENT_SOURCE_DIR}/../include/discovery
59+
${CMAKE_CURRENT_SOURCE_DIR}
6060
)
6161

6262
target_link_libraries(discovery
Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@
2424
#include <string>
2525
#include <map>
2626
#include <cassert>
27+
#include <future>
28+
29+
#include <boost/uuid/uuid.hpp>
30+
#include <boost/uuid/uuid_generators.hpp>
31+
#include <boost/uuid/uuid_io.hpp>
32+
#include <boost/lexical_cast.hpp>
33+
#include <boost/asio/ip/host_name.hpp>
2734

2835
namespace o2
2936
{
@@ -52,13 +59,13 @@ struct ProcessType {
5259
mType = -1;
5360
}
5461

55-
constexpr operator int() const { return mType; }
62+
// constexpr operator int() const { return mType; }
5663

5764
constexpr bool operator==(const ProcessType &b) const { return mType == b.mType; }
5865
constexpr bool operator!=(const ProcessType &b) const { return mType != b.mType; }
5966
constexpr bool operator<(const ProcessType &b) const { return mType < b.mType; }
6067

61-
operator std::string() const {
68+
std::string to_string() const {
6269

6370
if (*this == ProcessType::StfBuilder)
6471
return "StfBuilder";
@@ -165,20 +172,72 @@ class Config {
165172
}
166173

167174
static
168-
std::string getIdOption(const FairMQProgOptions& pFMQProgOpt)
175+
std::string getIdOption(const ProcessType pProcType, const FairMQProgOptions& pFMQProgOpt)
169176
{
170-
auto lId = pFMQProgOpt.GetValue<std::string>(OptionKeyDiscoveryId);
171-
if (lId.empty()) {
172-
DDLOG(fair::Severity::ERROR) << "Process must have unique ID for DataDistribution discovery.";
173-
throw std::invalid_argument("Process ID for DataDiscovery must be provided.");
177+
// check cmdline first
178+
{
179+
std::string lId = pFMQProgOpt.GetValue<std::string>(OptionKeyDiscoveryId);
180+
if (!lId.empty()) {
181+
IDDLOG("Parameter <{}> provided on command line. value={}", OptionKeyDiscoveryId, lId);
182+
return lId;
183+
}
184+
185+
if (pProcType == ProcessType::StfSender) {
186+
const auto lErrorMsg = fmt::format("Parameter <{}> for StfSender must be provided on command line.",
187+
OptionKeyDiscoveryId);
188+
EDDLOG( lErrorMsg);
189+
throw std::invalid_argument(lErrorMsg);
190+
}
191+
}
192+
193+
{
194+
// get an unique ID
195+
std::string lUniquePart = boost::lexical_cast<std::string>(boost::uuids::random_generator()());
196+
std::string lUniqueId = pProcType.to_string() + "-" + boost::asio::ip::host_name() + "-" + lUniquePart;
197+
198+
IDDLOG("Parameter {} not provided, using random value={}", Config::OptionKeyDiscoveryId, lUniqueId);
199+
return lUniqueId;
174200
}
175-
return lId;
176201
}
177202

178203
static
179204
std::string getPartitionOption(const FairMQProgOptions& pFMQProgOpt)
180205
{
181-
return pFMQProgOpt.GetValue<std::string>(OptionKeyDiscoveryPartition);
206+
// check cmdline first
207+
{
208+
std::string lPartId = pFMQProgOpt.GetValue<std::string>(OptionKeyDiscoveryPartition);
209+
if (!lPartId.empty()) {
210+
IDDLOG("Parameter <{}> provided on command line. value={}",
211+
OptionKeyDiscoveryPartition, lPartId);
212+
return lPartId;
213+
}
214+
}
215+
216+
// setup for ODC
217+
std::promise<std::string> mPartIdPromise;
218+
std::future<std::string> mPartIdFuture = mPartIdPromise.get_future();
219+
220+
pFMQProgOpt.Subscribe<std::string>("o2.datadist", [&](const std::string& pKey, std::string pValue) {
221+
IDDLOG("Config::Subscribe received key-value pair <{}>=<{}>", pKey, pValue);
222+
223+
// partition key for tf builder
224+
if (pKey == OptionKeyDiscoveryPartition) {
225+
mPartIdPromise.set_value(pValue);
226+
} else {
227+
EDDLOG("Config::Subscribe unrecognized key value pushed {}={}", pKey, pValue);
228+
}
229+
});
230+
231+
// wait for an ODC value
232+
mPartIdFuture.wait();
233+
const auto lDiscId = mPartIdFuture.get();
234+
if (lDiscId.length() == 0) {
235+
EDDLOG("Parameter {} provided from ODC. zero length", Config::OptionKeyDiscoveryPartition);
236+
throw std::invalid_argument(fmt::format("Invalid ODC parameter <{}>: zero length", Config::OptionKeyDiscoveryPartition));
237+
}
238+
239+
IDDLOG("Parameter {} provided from ODC. value={}", Config::OptionKeyDiscoveryPartition, lDiscId);
240+
return lDiscId;
182241
}
183242

184243
Config() = delete;
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)