Skip to content

Commit 0fe6f66

Browse files
committed
Executor behaviour changed -> it uses stable connection with possibility to restore connection or process.
1 parent a201e32 commit 0fe6f66

2 files changed

Lines changed: 108 additions & 83 deletions

File tree

api/include/apihandler.hpp

Lines changed: 105 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -112,96 +112,90 @@ class Executor {
112112
const std::string& method, const std::vector<std::vector<::general::Variant>>& params, const int64_t executionTime, cs::Sequence sequence);
113113

114114
void getContractMethods(GetContractMethodsResult& _return, const std::vector<::general::ByteCodeObject>& byteCodeObjects) {
115-
if (!connect()) {
116-
_return.status.code = 1;
117-
_return.status.message = "No executor connection!";
118-
return;
119-
}
120115
try {
121116
std::shared_lock lock(sharedErrorMutex_);
122117
origExecutor_->getContractMethods(_return, byteCodeObjects, EXECUTOR_VERSION);
123118
}
124-
catch (::apache::thrift::transport::TTransportException & x) {
119+
catch (const ::apache::thrift::transport::TTransportException& x) {
125120
// sets stop_ flag to true forever, replace with new instance
126121
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
127-
reCreationOriginExecutor();
122+
recreateOriginExecutor();
123+
notifyError();
128124
}
125+
129126
_return.status.code = 1;
130127
_return.status.message = x.what();
131128
}
132-
catch( std::exception & x ) {
129+
catch(const std::exception& x ) {
133130
_return.status.code = 1;
134131
_return.status.message = x.what();
132+
133+
notifyError();
135134
}
136-
disconnect();
137135
}
138136

139137
void getContractVariables(GetContractVariablesResult& _return, const std::vector<::general::ByteCodeObject>& byteCodeObjects, const std::string& contractState) {
140-
if (!connect()) {
141-
_return.status.code = 1;
142-
_return.status.message = "No executor connection!";
143-
return;
144-
}
145138
try {
146139
std::shared_lock lock(sharedErrorMutex_);
147140
origExecutor_->getContractVariables(_return, byteCodeObjects, contractState, EXECUTOR_VERSION);
148141
}
149-
catch (::apache::thrift::transport::TTransportException & x) {
142+
catch (const ::apache::thrift::transport::TTransportException& x) {
150143
// sets stop_ flag to true forever, replace with new instance
151144
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
152-
reCreationOriginExecutor();
145+
recreateOriginExecutor();
146+
notifyError();
153147
}
148+
154149
_return.status.code = 1;
155150
_return.status.message = x.what();
156151
}
157-
catch( std::exception & x ) {
152+
catch(const std::exception& x ) {
158153
_return.status.code = 1;
159154
_return.status.message = x.what();
155+
156+
notifyError();
160157
}
161-
disconnect();
162158
}
163159

164160
void compileSourceCode(CompileSourceCodeResult& _return, const std::string& sourceCode) {
165-
if (!connect()) {
166-
_return.status.code = 1;
167-
_return.status.message = "No executor connection!";
168-
return;
169-
}
170161
try {
171162
std::shared_lock slk(sharedErrorMutex_);
172163
origExecutor_->compileSourceCode(_return, sourceCode, EXECUTOR_VERSION);
173164
}
174-
catch (::apache::thrift::transport::TTransportException & x) {
165+
catch (::apache::thrift::transport::TTransportException& x) {
175166
// sets stop_ flag to true forever, replace with new instance
176167
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
177-
reCreationOriginExecutor();
168+
recreateOriginExecutor();
169+
notifyError();
178170
}
171+
179172
_return.status.code = 1;
180173
_return.status.message = x.what();
181174
}
182-
catch( std::exception & x ) {
175+
catch(const std::exception& x ) {
183176
_return.status.code = 1;
184177
_return.status.message = x.what();
178+
179+
notifyError();
185180
}
186-
disconnect();
187181
}
188182

189183
public:
190-
191-
~Executor() {
192-
stop();
193-
}
194-
195184
static Executor& getInstance(const BlockChain* p_blockchain = nullptr, const cs::SolverCore* solver = nullptr, const int p_exec_port = 0,
196185
const std::string p_exec_ip = std::string{}, const std::string p_exec_cmdline = std::string{}) { // singlton
197186
static Executor executor(*p_blockchain, *solver, p_exec_port, p_exec_ip, p_exec_cmdline);
198187
return executor;
199188
}
200189

190+
bool isConnect() const {
191+
return executorTransport_->isOpen();
192+
}
193+
201194
void stop() {
202195
requestStop_ = true;
196+
203197
// wake up watching thread if it sleeps
204-
cvErrorConnect_.notify_one();
198+
notifyError();
205199
}
206200

207201
std::optional<cs::Sequence> getSequence(const general::AccessID& accessId) {
@@ -353,7 +347,7 @@ class Executor {
353347
csdb::Transaction send_transaction;
354348
const auto source = BlockChain::getAddressFromKey(transaction.source);
355349
const uint64_t WALLET_DENOM = csdb::Amount::AMOUNT_MAX_FRACTION; // 1'000'000'000'000'000'000ull;
356-
send_transaction.set_amount(csdb::Amount(transaction.amount.integral, transaction.amount.fraction, WALLET_DENOM));
350+
send_transaction.set_amount(csdb::Amount(transaction.amount.integral, uint64_t(transaction.amount.fraction), WALLET_DENOM));
357351
BlockChain::WalletData wallData{};
358352
BlockChain::WalletId id{};
359353

@@ -363,7 +357,7 @@ class Executor {
363357
send_transaction.set_currency(csdb::Currency(1));
364358
send_transaction.set_source(source);
365359
send_transaction.set_target(BlockChain::getAddressFromKey(transaction.target));
366-
send_transaction.set_max_fee(csdb::AmountCommission((uint16_t)transaction.fee.commission));
360+
send_transaction.set_max_fee(csdb::AmountCommission(uint16_t(transaction.fee.commission)));
367361
send_transaction.set_innerID(transaction.id & 0x3fffffffffff);
368362

369363
// TODO Change Thrift to avoid copy
@@ -376,10 +370,6 @@ class Executor {
376370
return send_transaction;
377371
}
378372

379-
bool isConnect() {
380-
return isConnect_;
381-
}
382-
383373
void state_update(const csdb::Pool& pool);
384374

385375
void addToLockSmart(const general::Address& address, const general::AccessID& accessId) {
@@ -424,48 +414,80 @@ public slots:
424414
state_update(block);
425415
}
426416

417+
void onExecutorStarted() {
418+
if (!isConnect()) {
419+
connect();
420+
}
421+
}
422+
423+
void onExecutorFinished() {
424+
if (!executorProcess_->isRunning() && !requestStop_) {
425+
executorProcess_->launch(cs::Process::Options::None);
426+
}
427+
}
428+
429+
void onExecutorProcessError(const cs::ProcessException& exception) {
430+
cswarning() << "Executor process error occured " << exception.what() << ", code " << exception.code();
431+
}
432+
427433
private:
428434
std::map<general::Address, general::AccessID> lockSmarts;
429435
explicit Executor(const BlockChain& p_blockchain, const cs::SolverCore& solver, int p_exec_port,
430-
const std::string p_exec_ip, const std::string p_exec_cmdline)
436+
const std::string& p_exec_ip, const std::string& p_exec_cmdline)
431437
: blockchain_(p_blockchain)
432438
, solver_(solver)
433439
, executorTransport_(new ::apache::thrift::transport::TBufferedTransport(
434440
::apache::thrift::stdcxx::make_shared<::apache::thrift::transport::TSocket>(p_exec_ip, p_exec_port)))
435441
, origExecutor_(
436442
std::make_unique<executor::ContractExecutorConcurrentClient>(::apache::thrift::stdcxx::make_shared<apache::thrift::protocol::TBinaryProtocol>(executorTransport_))) {
437-
std::thread th([=]() {
438-
std::string executor_cmdline = p_exec_cmdline;
439-
std::unique_ptr<cs::Process> executor_process;
440-
if(!executor_cmdline.empty()) {
441-
executor_process = std::make_unique<cs::Process>(executor_cmdline);
442-
executor_process->launch(cs::Process::Options::None);
443-
}
444-
while (true) {
445-
if (isConnect_) {
446-
static std::mutex mt;
447-
std::unique_lock ulk(mt);
448-
cvErrorConnect_.wait(ulk, [&] { return !isConnect_ || requestStop_; });
449-
}
443+
std::string executorCmdline = p_exec_cmdline;
450444

451-
if (requestStop_) {
452-
break;
453-
}
454-
static const int RECONNECT_TIME = 10;
455-
std::this_thread::sleep_for(std::chrono::seconds(RECONNECT_TIME));
456-
if (!executor_process || executor_process->isRunning()) {
457-
if (connect())
458-
disconnect();
445+
if (executorCmdline.empty()) {
446+
cswarning() << "Executor command line args are empty, process would not be created";
447+
return;
448+
}
449+
450+
executorProcess_ = std::make_unique<cs::Process>(executorCmdline);
451+
452+
cs::Connector::connect(&executorProcess_->started, this, &Executor::onExecutorStarted);
453+
cs::Connector::connect(&executorProcess_->finished, this, &Executor::onExecutorFinished);
454+
cs::Connector::connect(&executorProcess_->errorOccured, this, &Executor::onExecutorProcessError);
455+
456+
executorProcess_->launch(cs::Process::Options::None);
457+
while (!executorProcess_->isRunning());
458+
459+
std::thread thread([this]() {
460+
while(!requestStop_) {
461+
if (isConnect()) {
462+
static std::mutex mutex;
463+
std::unique_lock lock(mutex);
464+
465+
cvErrorConnect_.wait(lock, [&] {
466+
return !isConnect() || requestStop_;
467+
});
459468
}
460-
else {
461-
executor_process->launch(cs::Process::Options::None);
469+
470+
static const int kReconnectTime = 5;
471+
std::this_thread::sleep_for(std::chrono::seconds(kReconnectTime));
472+
473+
if (!isConnect()) {
474+
connect();
462475
}
463476
}
464-
if (executor_process) {
465-
executor_process->terminate();
466-
}
467477
});
468-
th.detach();
478+
479+
thread.detach();
480+
}
481+
482+
~Executor() {
483+
stop();
484+
485+
if (executorProcess_) {
486+
if (executorProcess_->isRunning()) {
487+
disconnect();
488+
executorProcess_->terminate();
489+
}
490+
}
469491
}
470492

471493
struct OriginExecuteResult {
@@ -480,11 +502,11 @@ public slots:
480502
std::lock_guard lk(mutex_);
481503
++lastAccessId_;
482504
accessSequence_[lastAccessId_] = (explicit_sequence != kUseLastSequence ? explicit_sequence : blockchain_.getLastSeq());
483-
return lastAccessId_;
505+
return static_cast<uint64_t>(lastAccessId_);
484506
}
485507

486508
uint64_t getFutureAccessId() {
487-
return lastAccessId_ + 1;
509+
return static_cast<uint64_t>(lastAccessId_ + 1);
488510
}
489511

490512
void deleteAccessId(const general::AccessID& p_access_id) {
@@ -503,31 +525,35 @@ public slots:
503525
}
504526

505527
executorTransport_->open();
506-
isConnect_ = true;
507528
}
508529
catch (...) {
509-
isConnect_ = false;
510-
cvErrorConnect_.notify_one();
530+
notifyError();
511531
}
512-
return isConnect_;
532+
533+
return executorTransport_->isOpen();
513534
}
514535

515536
void disconnect() {
516537
try {
517538
executorTransport_->close();
518539
}
519540
catch (::apache::thrift::transport::TTransportException&) {
520-
isConnect_ = false;
521-
cvErrorConnect_.notify_one();
541+
notifyError();
522542
}
523543
}
524544

545+
void notifyError() {
546+
cvErrorConnect_.notify_one();
547+
}
548+
525549
//
526550
using OriginExecutor = executor::ContractExecutorConcurrentClient;
527551
using BinaryProtocol = apache::thrift::protocol::TBinaryProtocol;
528552
std::shared_mutex sharedErrorMutex_;
529-
void reCreationOriginExecutor() {
530-
std::lock_guard glk(sharedErrorMutex_);
553+
554+
void recreateOriginExecutor() {
555+
std::lock_guard lock(sharedErrorMutex_);
556+
disconnect();
531557
origExecutor_.reset(new OriginExecutor(::apache::thrift::stdcxx::make_shared<BinaryProtocol>(executorTransport_)));
532558
}
533559
//
@@ -537,6 +563,7 @@ public slots:
537563
const cs::SolverCore& solver_;
538564
::apache::thrift::stdcxx::shared_ptr<::apache::thrift::transport::TTransport> executorTransport_;
539565
std::unique_ptr<executor::ContractExecutorConcurrentClient> origExecutor_;
566+
std::unique_ptr<cs::Process> executorProcess_;
540567

541568
general::AccessID lastAccessId_{};
542569
std::map<general::AccessID, cs::Sequence> accessSequence_;
@@ -549,9 +576,8 @@ public slots:
549576
std::atomic_size_t execCount_{0};
550577

551578
std::condition_variable cvErrorConnect_;
552-
std::atomic_bool isConnect_{ false };
553579
std::atomic_bool requestStop_{ false };
554-
const uint16_t EXECUTOR_VERSION = 2;
580+
const int16_t EXECUTOR_VERSION = 2;
555581

556582
// temporary solution?
557583
std::mutex callExecutorLock_;

api/src/apihandler.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2204,7 +2204,7 @@ namespace executor {
22042204
catch (::apache::thrift::transport::TTransportException& x) {
22052205
// sets stop_ flag to true forever, replace with new instance
22062206
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
2207-
reCreationOriginExecutor();
2207+
recreateOriginExecutor();
22082208
}
22092209
_return.status.code = 1;
22102210
_return.status.message = x.what();
@@ -2215,7 +2215,6 @@ namespace executor {
22152215
}
22162216
--execCount_;
22172217
deleteAccessId(access_id);
2218-
disconnect();
22192218
}
22202219

22212220
std::optional<std::string> Executor::getState(const csdb::Address& p_address) {
@@ -2573,7 +2572,7 @@ namespace executor {
25732572
catch (::apache::thrift::transport::TTransportException& x) {
25742573
// sets stop_ flag to true forever, replace with new instance
25752574
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
2576-
reCreationOriginExecutor();
2575+
recreateOriginExecutor();
25772576
}
25782577
originExecuteRes.resp.status.code = cs::error::ThriftException;
25792578
originExecuteRes.resp.status.message = x.what();
@@ -2586,7 +2585,7 @@ namespace executor {
25862585
--execCount_;
25872586
if (!isGetter)
25882587
deleteAccessId(access_id);
2589-
disconnect();
2588+
25902589
originExecuteRes.acceessId = access_id;
25912590
return std::make_optional<OriginExecuteResult>(std::move(originExecuteRes));
25922591
}

0 commit comments

Comments
 (0)