Skip to content

Commit 2294021

Browse files
committed
tfscheduler: implement tf scheduling throttling based on setting provided from aliecs
1 parent c835304 commit 2294021

6 files changed

Lines changed: 118 additions & 10 deletions

File tree

src/TfScheduler/TfSchedulerInstance.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ TfSchedulerInstanceHandler::TfSchedulerInstanceHandler(DataDistDevice& pDev,
4343
lStatus.mutable_info()->set_process_id(pProcessId);
4444
lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*pDev.GetConfig()));
4545
lStatus.mutable_partition()->set_partition_id(mPartitionInfo.mPartitionId);
46+
lStatus.set_allocated_partition_params(&mPartitionInfo.mParameters);
4647

4748
lStatus.set_stf_sender_count(mPartitionInfo.mStfSenderIdList.size());
4849
for (const auto &lStfSenderId : mPartitionInfo.mStfSenderIdList) {

src/TfScheduler/TfSchedulerStfInfo.cxx

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <set>
2424
#include <tuple>
2525
#include <algorithm>
26+
#include <random>
2627

2728
namespace o2::DataDistribution
2829
{
@@ -456,6 +457,10 @@ void TfSchedulerStfInfo::DropThread()
456457

457458
void TfSchedulerStfInfo::addStfInfo(const StfSenderStfInfo &pStfInfo, SchedulerStfInfoResponse &pResponse)
458459
{
460+
using namespace std::chrono_literals;
461+
static thread_local std::default_random_engine lGen;
462+
static thread_local std::uniform_real_distribution<double> lUniformDist(0, 100.0);
463+
459464
const std::uint64_t lRunNumber = pStfInfo.partition().run_number();
460465
const auto lStfId = pStfInfo.stf_id();
461466

@@ -494,6 +499,32 @@ void TfSchedulerStfInfo::addStfInfo(const StfSenderStfInfo &pStfInfo, SchedulerS
494499
SchedulerStfInfoResponse::DROP_SCHED_DISCARDED);
495500
return;
496501
}
502+
503+
{ // check if this tf is rejected because of throttling
504+
static std::uint64_t lThrottlingRejectedSize = 0;
505+
506+
if (lStfId > mLastThrottledStfId) {
507+
508+
// decide if discarding on throttling
509+
bool lTfDropped = (lUniformDist(lGen) <= mPercentageToBuild) ? false : true;
510+
if (lTfDropped) {
511+
mDroppedThrottlingStfs.SetEvent(lStfId);
512+
mDroppedStfs.SetEvent(lStfId);
513+
mNotScheduledTfsCount++;
514+
// this will only accurately represent the rate
515+
DDMON_RATE("tfscheduler", "tf.rejected.tf", lThrottlingRejectedSize);
516+
lThrottlingRejectedSize = 0;
517+
}
518+
mLastThrottledStfId = lStfId;
519+
}
520+
521+
if (mDroppedThrottlingStfs.GetEvent(lStfId)) {
522+
mTfSizeTotalRejected += pStfInfo.stf_size();
523+
lThrottlingRejectedSize += pStfInfo.stf_size();
524+
pResponse.set_status(SchedulerStfInfoResponse::DROP_STFS_THROTTLING);
525+
return;
526+
}
527+
}
497528
}
498529

499530
// Drop not running

src/TfScheduler/TfSchedulerStfInfo.h

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
#include <Utilities.h>
3030

31+
#include <boost/algorithm/string/trim.hpp>
32+
3133
#include <vector>
3234
#include <map>
3335
#include <thread>
@@ -87,13 +89,37 @@ class TfSchedulerStfInfo
8789
: mDiscoveryConfig(pDiscoveryConfig),
8890
mConnManager(pConnManager),
8991
mTfBuilderInfo(pTfBuilderInfo),
90-
mDroppedStfs(24ULL * 3600 * 88), // 1h of running ~ 1MiB size
91-
mBuiltTfs(24ULL * 3600 * 88)
92+
mDroppedStfs(48ULL * 3600 * 88), // 1h of running ~ 1MiB size
93+
mDroppedThrottlingStfs(48ULL * 3600 * 88),
94+
mBuiltTfs(48ULL * 3600 * 88)
9295
{ }
9396

9497
~TfSchedulerStfInfo() { }
9598

9699
void start() {
100+
// get parameters
101+
102+
// Build TfPercentage from AliECS
103+
if (mDiscoveryConfig->status().partition_params().param_values().count("BuildTfPercentage") > 0) {
104+
double lTfPercent = 100.0;
105+
auto lBuildPercentString = mDiscoveryConfig->status().partition_params().param_values().at("BuildTfPercentage");
106+
boost::algorithm::trim(lBuildPercentString);
107+
108+
try {
109+
if (lBuildPercentString.empty()) {
110+
throw boost::bad_lexical_cast();
111+
}
112+
lTfPercent = boost::lexical_cast<double>(lBuildPercentString);
113+
114+
} catch(boost::bad_lexical_cast const &e) {
115+
EDDLOG("Error while parsing AliECS parameter 'BuildTfPercentage'. str_value={} what={}", lBuildPercentString, e.what());
116+
lTfPercent = 100.0;
117+
}
118+
119+
mPercentageToBuild = std::clamp(lTfPercent, 0.0, 100.0);
120+
IDDLOG("TfScheduler parameters: BuildTfPercentage={:.4}", mPercentageToBuild);
121+
}
122+
97123
mStfInfoMap.clear();
98124

99125
mRunning = true;
@@ -169,6 +195,7 @@ class TfSchedulerStfInfo
169195

170196
/// Discovery configuration
171197
std::shared_ptr<ConsulTfScheduler> mDiscoveryConfig;
198+
double mPercentageToBuild = 100.0;
172199

173200
/// RPC clients to StfSenders and TfBuilders
174201
TfSchedulerConnManager &mConnManager;
@@ -202,14 +229,17 @@ class TfSchedulerStfInfo
202229
std::map<std::string, std::uint64_t> mMaxStfIdPerStfSender;
203230
// total sizes
204231
std::uint64_t mTfSizeTotalScheduled = 0;
205-
std::uint64_t mTfSizeTotalRejected = 0;
232+
std::atomic_uint64_t mTfSizeTotalRejected = 0;
206233

207234
std::uint64_t mStaleTfCount = 0;
208235
std::uint64_t mScheduledTfs = 0;
209236

210237
EventRecorder mDroppedStfs;
238+
EventRecorder mDroppedThrottlingStfs;
239+
std::uint64_t mLastThrottledStfId = 0;
211240
EventRecorder mBuiltTfs;
212241

242+
213243
void reset() {
214244
// NOTE: only call when holding mGlobalStfInfoLock
215245
mLastStfId = 0;
@@ -227,6 +257,8 @@ class TfSchedulerStfInfo
227257
mTfSizeTotalRejected = 0;
228258

229259
mDroppedStfs.reset();
260+
mDroppedThrottlingStfs.reset();
261+
mLastThrottledStfId = 0;
230262
mBuiltTfs.reset();
231263

232264
if (!mStfInfoMap.empty()) {

src/common/discovery/ConfigConsul.h

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#pragma GCC diagnostic push
2121
#pragma GCC diagnostic ignored "-Wunused-parameter"
22+
#include <google/protobuf/util/json_util.h>
2223
#include "discovery.pb.h"
2324
#pragma GCC diagnostic pop
2425

@@ -464,6 +465,7 @@ class ConsulConfig : public Config {
464465
static const std::string sPartitionIdSubKey = "/partition-id"s;
465466
static const std::string sRequestCreateTimeKey = "/request-create-time"s;
466467
static const std::string sStfSenderListSubKey = "/stf-sender-id-list"s;
468+
static const std::string sPartParamsKey = "/parameters"s;
467469

468470
static_assert(std::is_same_v<T, TfSchedulerConfigStatus>, "Only TfScheduler can call this method.");
469471

@@ -475,6 +477,7 @@ class ConsulConfig : public Config {
475477
const auto lReqPartitionIdKey = lPartReqKey + sPartitionIdSubKey;
476478
const auto lReqCreateTimeKey = lPartReqKey + sRequestCreateTimeKey;
477479
const auto lReqStfSenderListKey = lPartReqKey + sStfSenderListSubKey;
480+
const auto lReqParametersKey = lPartReqKey + sPartParamsKey;
478481

479482
std::scoped_lock lLock(mConsulLock);
480483

@@ -514,14 +517,40 @@ class ConsulConfig : public Config {
514517
}
515518

516519
// list of all FLPs
517-
// ../<part-id>/stf-sender-id-list: <str date-time>
520+
// ../<part-id>/stf-sender-id-list: <str>
518521
auto lFlpIdList = std::find_if(std::begin(lReqItems), std::end(lReqItems),
519522
[&] (KeyValue const& p) { return p.key == lReqStfSenderListKey; });
520523
if (lFlpIdList == std::end(lReqItems)) {
521524
EDDLOG("Invalid new partition request. Missing key: {}", lReqStfSenderListKey);
522525
break;
523526
}
524527

528+
// [optional] parameters from AliECS
529+
// ../<part-id>/parameters: <protobuf PartitionParameters as json>
530+
auto lPartParams = std::find_if(std::begin(lReqItems), std::end(lReqItems),
531+
[&] (KeyValue const& p) { return p.key == lReqParametersKey; });
532+
if (lPartParams != std::end(lReqItems)) {
533+
534+
// just validate the parameters can be decoded
535+
PartitionParameters lPartParamsProto;
536+
537+
// Parse the json_string
538+
google::protobuf::util::JsonParseOptions lJsonOptions;
539+
lJsonOptions.ignore_unknown_fields = true;
540+
lJsonOptions.case_insensitive_enum_parsing = true;
541+
542+
if (JsonStringToMessage(lPartParams->value, &lPartParamsProto, lJsonOptions).ok()) {
543+
pNewPartitionRequest.mParameters = std::move(lPartParamsProto);
544+
545+
for (const auto lParamKV : pNewPartitionRequest.mParameters.param_values()) {
546+
IDDLOG("Partition request parameters: {:30} : {}", lParamKV.first, lParamKV.second);
547+
}
548+
549+
} else {
550+
EDDLOG("Cannot parse partition parameter json object. json_str={}", lPartParams->value);
551+
}
552+
}
553+
525554
// validate the request fields
526555
// partition name, check if already exist
527556
const std::string lPartitionId = boost::trim_copy(lPartitionIdIt->value);

src/common/discovery/ConfigParameters.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
#ifndef ALICEO2_DATADIST_CONFIG_PARAMETERS_H_
1515
#define ALICEO2_DATADIST_CONFIG_PARAMETERS_H_
1616

17+
#pragma GCC diagnostic push
18+
#pragma GCC diagnostic ignored "-Wunused-parameter"
19+
#include "discovery.pb.h"
20+
#pragma GCC diagnostic pop
21+
22+
1723
#include <string>
1824
#include <vector>
1925
#include <cassert>
@@ -45,6 +51,8 @@ struct PartitionRequest {
4551
std::string mPartitionId;
4652
std::string mReqCreatedTime;
4753
std::vector<std::string> mStfSenderIdList;
54+
55+
PartitionParameters mParameters;
4856
};
4957

5058

src/common/discovery/discovery.proto

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,21 @@ message PartitionInfo {
5252
uint64 run_number = 3;
5353
}
5454

55+
message PartitionParameters {
56+
map<string, string> param_values = 1;
57+
}
58+
5559
message TfSchedulerConfigStatus {
56-
BasicInfo info = 1;
57-
PartitionInfo partition = 2;
60+
BasicInfo info = 1;
61+
PartitionInfo partition = 2;
62+
63+
string rpc_endpoint = 3;
5864

59-
string rpc_endpoint = 3;
65+
uint32 stf_sender_count = 4;
66+
repeated string stf_sender_id_list = 5;
67+
PartitionState partition_state = 6;
6068

61-
uint32 stf_sender_count = 4;
62-
repeated string stf_sender_id_list = 5;
63-
PartitionState partition_state = 6;
69+
PartitionParameters partition_params = 10; // Parameters from AliECS
6470
}
6571

6672
message StfBuilderConfigStatus {
@@ -256,6 +262,7 @@ message SchedulerStfInfoResponse {
256262
DROP_SCHED_DISCARDED = 4; // scheduler already processed that tf
257263
DROP_STFS_INCOMPLETE = 5; // all StfSenders are not reachable
258264
DROP_STFS_BUFFER_FULL = 6; // buffers on StfSender are full
265+
DROP_STFS_THROTTLING = 7; // Not scheduling because of throttling settings
259266
}
260267

261268
StfInfoStatus status = 1;

0 commit comments

Comments
 (0)