Skip to content

Commit aca4d85

Browse files
committed
Merge branch 'dapsnet' of https://gitlab.com/credits_bc/core/node into dapsnet_fresh
# Conflicts: # api/src/apihandler.cpp
2 parents ea33b59 + 18ba935 commit aca4d85

28 files changed

Lines changed: 678 additions & 912 deletions

api/include/apihandler.hpp

Lines changed: 118 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -112,96 +112,90 @@ class Executor {
112112
const std::string& method, const std::vector<std::vector<::general::Variant>>& params, const int64_t executionTime, cs::Sequence sequence);
113113

114114
void getContractMethods(GetContractMethodsResult& _return, const std::vector<::general::ByteCodeObject>& byteCodeObjects) {
115-
if (!connect()) {
116-
_return.status.code = 1;
117-
_return.status.message = "No executor connection!";
118-
return;
119-
}
120115
try {
121116
std::shared_lock lock(sharedErrorMutex_);
122117
origExecutor_->getContractMethods(_return, byteCodeObjects, EXECUTOR_VERSION);
123118
}
124-
catch (::apache::thrift::transport::TTransportException & x) {
119+
catch (const ::apache::thrift::transport::TTransportException& x) {
125120
// sets stop_ flag to true forever, replace with new instance
126121
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
127-
reCreationOriginExecutor();
122+
recreateOriginExecutor();
123+
notifyError();
128124
}
125+
129126
_return.status.code = 1;
130127
_return.status.message = x.what();
131128
}
132-
catch( std::exception & x ) {
129+
catch(const std::exception& x ) {
133130
_return.status.code = 1;
134131
_return.status.message = x.what();
132+
133+
notifyError();
135134
}
136-
disconnect();
137135
}
138136

139137
void getContractVariables(GetContractVariablesResult& _return, const std::vector<::general::ByteCodeObject>& byteCodeObjects, const std::string& contractState) {
140-
if (!connect()) {
141-
_return.status.code = 1;
142-
_return.status.message = "No executor connection!";
143-
return;
144-
}
145138
try {
146139
std::shared_lock lock(sharedErrorMutex_);
147140
origExecutor_->getContractVariables(_return, byteCodeObjects, contractState, EXECUTOR_VERSION);
148141
}
149-
catch (::apache::thrift::transport::TTransportException & x) {
142+
catch (const ::apache::thrift::transport::TTransportException& x) {
150143
// sets stop_ flag to true forever, replace with new instance
151144
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
152-
reCreationOriginExecutor();
145+
recreateOriginExecutor();
146+
notifyError();
153147
}
148+
154149
_return.status.code = 1;
155150
_return.status.message = x.what();
156151
}
157-
catch( std::exception & x ) {
152+
catch(const std::exception& x ) {
158153
_return.status.code = 1;
159154
_return.status.message = x.what();
155+
156+
notifyError();
160157
}
161-
disconnect();
162158
}
163159

164160
void compileSourceCode(CompileSourceCodeResult& _return, const std::string& sourceCode) {
165-
if (!connect()) {
166-
_return.status.code = 1;
167-
_return.status.message = "No executor connection!";
168-
return;
169-
}
170161
try {
171162
std::shared_lock slk(sharedErrorMutex_);
172163
origExecutor_->compileSourceCode(_return, sourceCode, EXECUTOR_VERSION);
173164
}
174-
catch (::apache::thrift::transport::TTransportException & x) {
165+
catch (::apache::thrift::transport::TTransportException& x) {
175166
// sets stop_ flag to true forever, replace with new instance
176167
if (x.getType() == ::apache::thrift::transport::TTransportException::NOT_OPEN) {
177-
reCreationOriginExecutor();
168+
recreateOriginExecutor();
169+
notifyError();
178170
}
171+
179172
_return.status.code = 1;
180173
_return.status.message = x.what();
181174
}
182-
catch( std::exception & x ) {
175+
catch(const std::exception& x ) {
183176
_return.status.code = 1;
184177
_return.status.message = x.what();
178+
179+
notifyError();
185180
}
186-
disconnect();
187181
}
188182

189183
public:
190-
191-
~Executor() {
192-
stop();
193-
}
194-
195184
static Executor& getInstance(const BlockChain* p_blockchain = nullptr, const cs::SolverCore* solver = nullptr, const int p_exec_port = 0,
196185
const std::string p_exec_ip = std::string{}, const std::string p_exec_cmdline = std::string{}) { // singlton
197186
static Executor executor(*p_blockchain, *solver, p_exec_port, p_exec_ip, p_exec_cmdline);
198187
return executor;
199188
}
200189

190+
bool isConnect() const {
191+
return executorTransport_->isOpen();
192+
}
193+
201194
void stop() {
202195
requestStop_ = true;
196+
203197
// wake up watching thread if it sleeps
204-
cvErrorConnect_.notify_one();
198+
notifyError();
205199
}
206200

207201
std::optional<cs::Sequence> getSequence(const general::AccessID& accessId) {
@@ -353,7 +347,7 @@ class Executor {
353347
csdb::Transaction send_transaction;
354348
const auto source = BlockChain::getAddressFromKey(transaction.source);
355349
const uint64_t WALLET_DENOM = csdb::Amount::AMOUNT_MAX_FRACTION; // 1'000'000'000'000'000'000ull;
356-
send_transaction.set_amount(csdb::Amount(transaction.amount.integral, transaction.amount.fraction, WALLET_DENOM));
350+
send_transaction.set_amount(csdb::Amount(transaction.amount.integral, uint64_t(transaction.amount.fraction), WALLET_DENOM));
357351
BlockChain::WalletData wallData{};
358352
BlockChain::WalletId id{};
359353

@@ -363,7 +357,7 @@ class Executor {
363357
send_transaction.set_currency(csdb::Currency(1));
364358
send_transaction.set_source(source);
365359
send_transaction.set_target(BlockChain::getAddressFromKey(transaction.target));
366-
send_transaction.set_max_fee(csdb::AmountCommission((uint16_t)transaction.fee.commission));
360+
send_transaction.set_max_fee(csdb::AmountCommission(uint16_t(transaction.fee.commission)));
367361
send_transaction.set_innerID(transaction.id & 0x3fffffffffff);
368362

369363
// TODO Change Thrift to avoid copy
@@ -376,10 +370,6 @@ class Executor {
376370
return send_transaction;
377371
}
378372

379-
bool isConnect() {
380-
return isConnect_;
381-
}
382-
383373
void state_update(const csdb::Pool& pool);
384374

385375
void addToLockSmart(const general::Address& address, const general::AccessID& accessId) {
@@ -424,48 +414,83 @@ public slots:
424414
state_update(block);
425415
}
426416

417+
void onExecutorStarted() {
418+
if (!isConnect()) {
419+
connect();
420+
}
421+
}
422+
423+
void onExecutorFinished() {
424+
if (!executorProcess_->isRunning() && !requestStop_) {
425+
executorProcess_->launch(cs::Process::Options::None);
426+
}
427+
}
428+
429+
void onExecutorProcessError(const cs::ProcessException& exception) {
430+
cswarning() << "Executor process error occured " << exception.what() << ", code " << exception.code();
431+
}
432+
427433
private:
428434
std::map<general::Address, general::AccessID> lockSmarts;
429435
explicit Executor(const BlockChain& p_blockchain, const cs::SolverCore& solver, int p_exec_port,
430-
const std::string p_exec_ip, const std::string p_exec_cmdline)
436+
const std::string& p_exec_ip, const std::string& p_exec_cmdline)
431437
: blockchain_(p_blockchain)
432438
, solver_(solver)
433-
, executorTransport_(new ::apache::thrift::transport::TBufferedTransport(
434-
::apache::thrift::stdcxx::make_shared<::apache::thrift::transport::TSocket>(p_exec_ip, p_exec_port)))
439+
, socket_(::apache::thrift::stdcxx::make_shared<::apache::thrift::transport::TSocket>(p_exec_ip, p_exec_port))
440+
, executorTransport_(new ::apache::thrift::transport::TBufferedTransport(socket_))
435441
, origExecutor_(
436442
std::make_unique<executor::ContractExecutorConcurrentClient>(::apache::thrift::stdcxx::make_shared<apache::thrift::protocol::TBinaryProtocol>(executorTransport_))) {
437-
std::thread th([=]() {
438-
std::string executor_cmdline = p_exec_cmdline;
439-
std::unique_ptr<cs::Process> executor_process;
440-
if(!executor_cmdline.empty()) {
441-
executor_process = std::make_unique<cs::Process>(executor_cmdline);
442-
executor_process->launch(cs::Process::Options::None);
443-
}
444-
while (true) {
445-
if (isConnect_) {
446-
static std::mutex mt;
447-
std::unique_lock ulk(mt);
448-
cvErrorConnect_.wait(ulk, [&] { return !isConnect_ || requestStop_; });
449-
}
443+
std::string executorCmdline = p_exec_cmdline;
450444

451-
if (requestStop_) {
452-
break;
453-
}
454-
static const int RECONNECT_TIME = 10;
455-
std::this_thread::sleep_for(std::chrono::seconds(RECONNECT_TIME));
456-
if (!executor_process || executor_process->isRunning()) {
457-
if (connect())
458-
disconnect();
445+
socket_->setSendTimeout(kSendTimeout);
446+
socket_->setRecvTimeout(kReceiveTimeout);
447+
448+
if (executorCmdline.empty()) {
449+
cswarning() << "Executor command line args are empty, process would not be created";
450+
return;
451+
}
452+
453+
executorProcess_ = std::make_unique<cs::Process>(executorCmdline);
454+
455+
cs::Connector::connect(&executorProcess_->started, this, &Executor::onExecutorStarted);
456+
cs::Connector::connect(&executorProcess_->finished, this, &Executor::onExecutorFinished);
457+
cs::Connector::connect(&executorProcess_->errorOccured, this, &Executor::onExecutorProcessError);
458+
459+
executorProcess_->launch(cs::Process::Options::None);
460+
while (!executorProcess_->isRunning());
461+
462+
std::thread thread([this]() {
463+
while(!requestStop_) {
464+
if (isConnect()) {
465+
static std::mutex mutex;
466+
std::unique_lock lock(mutex);
467+
468+
cvErrorConnect_.wait(lock, [&] {
469+
return !isConnect() || requestStop_;
470+
});
459471
}
460-
else {
461-
executor_process->launch(cs::Process::Options::None);
472+
473+
static const int kReconnectTime = 2;
474+
std::this_thread::sleep_for(std::chrono::seconds(kReconnectTime));
475+
476+
if (!isConnect()) {
477+
connect();
462478
}
463479
}
464-
if (executor_process) {
465-
executor_process->terminate();
466-
}
467480
});
468-
th.detach();
481+
482+
thread.detach();
483+
}
484+
485+
~Executor() {
486+
stop();
487+
488+
if (executorProcess_) {
489+
if (executorProcess_->isRunning()) {
490+
disconnect();
491+
executorProcess_->terminate();
492+
}
493+
}
469494
}
470495

471496
struct OriginExecuteResult {
@@ -480,11 +505,11 @@ public slots:
480505
std::lock_guard lk(mutex_);
481506
++lastAccessId_;
482507
accessSequence_[lastAccessId_] = (explicit_sequence != kUseLastSequence ? explicit_sequence : blockchain_.getLastSeq());
483-
return lastAccessId_;
508+
return static_cast<uint64_t>(lastAccessId_);
484509
}
485510

486511
uint64_t getFutureAccessId() {
487-
return lastAccessId_ + 1;
512+
return static_cast<uint64_t>(lastAccessId_ + 1);
488513
}
489514

490515
void deleteAccessId(const general::AccessID& p_access_id) {
@@ -498,45 +523,49 @@ public slots:
498523

499524
bool connect() {
500525
try {
501-
if (executorTransport_->isOpen()) {
502-
executorTransport_->close();
503-
}
504-
505526
executorTransport_->open();
506-
isConnect_ = true;
507527
}
508528
catch (...) {
509-
isConnect_ = false;
510-
cvErrorConnect_.notify_one();
529+
notifyError();
511530
}
512-
return isConnect_;
531+
532+
return executorTransport_->isOpen();
513533
}
514534

515535
void disconnect() {
516536
try {
517537
executorTransport_->close();
518538
}
519539
catch (::apache::thrift::transport::TTransportException&) {
520-
isConnect_ = false;
521-
cvErrorConnect_.notify_one();
540+
notifyError();
522541
}
523542
}
524543

544+
void notifyError() {
545+
cvErrorConnect_.notify_one();
546+
}
547+
525548
//
526549
using OriginExecutor = executor::ContractExecutorConcurrentClient;
527550
using BinaryProtocol = apache::thrift::protocol::TBinaryProtocol;
528551
std::shared_mutex sharedErrorMutex_;
529-
void reCreationOriginExecutor() {
530-
std::lock_guard glk(sharedErrorMutex_);
552+
553+
void recreateOriginExecutor() {
554+
std::lock_guard lock(sharedErrorMutex_);
555+
disconnect();
531556
origExecutor_.reset(new OriginExecutor(::apache::thrift::stdcxx::make_shared<BinaryProtocol>(executorTransport_)));
532557
}
533558
//
534559

535560
private:
536561
const BlockChain& blockchain_;
537562
const cs::SolverCore& solver_;
563+
564+
::apache::thrift::stdcxx::shared_ptr<::apache::thrift::transport::TSocket> socket_;
538565
::apache::thrift::stdcxx::shared_ptr<::apache::thrift::transport::TTransport> executorTransport_;
566+
539567
std::unique_ptr<executor::ContractExecutorConcurrentClient> origExecutor_;
568+
std::unique_ptr<cs::Process> executorProcess_;
540569

541570
general::AccessID lastAccessId_{};
542571
std::map<general::AccessID, cs::Sequence> accessSequence_;
@@ -549,9 +578,13 @@ public slots:
549578
std::atomic_size_t execCount_{0};
550579

551580
std::condition_variable cvErrorConnect_;
552-
std::atomic_bool isConnect_{ false };
553581
std::atomic_bool requestStop_{ false };
554-
const uint16_t EXECUTOR_VERSION = 2;
582+
583+
const int16_t EXECUTOR_VERSION = 2;
584+
585+
// timeout in ms
586+
const int kSendTimeout = 4000;
587+
const int kReceiveTimeout = 4000;
555588

556589
// temporary solution?
557590
std::mutex callExecutorLock_;

0 commit comments

Comments
 (0)