diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 4531092..6f28da4 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -40,6 +40,7 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC mon_merge_period_ms = 1000; multicast_send_period_ms = 5000; alarm_cooldown_ms = 1000; + config_devicename = ""; m_variables.Get("alerts_send", alerts_send); m_variables.Get("alert_send_port", alert_send_port); @@ -64,7 +65,9 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC if(m_verbose) std::cerr<<"device names cannot start with '('"<*)nullptr, &timeout, &err); + bool ok = m_backend_client.SendCommand("W_ALARM", cmd_string, (std::vector*)nullptr, timeout, &err); if(!ok){ if(m_verbose) std::cerr<<"SendAlarm error: "<& resp // Since we don't know what a user-provided query string may be, prepend with a space to ensure this. std::string sanitized_query = std::string{" "}+query; - if(!m_backend_client.SendCommand("W_QUERY", sanitized_query, &responses, &timeout, &err)){ + if(!m_backend_client.SendCommand("W_QUERY", sanitized_query, &responses, timeout, &err)){ if(m_verbose) std::cerr<<"SQLQuery error: "<(std::chrono::system_clock::now().time_since_epoch()).count(); } + timestamp_ms = timestamp%1000; + timestamp_sec = timestamp/1000; struct tm timestruct; gmtime_r(×tamp_sec, ×truct); // FIXME error checking? char timestring[24]; @@ -1168,23 +1170,21 @@ bool Services::LoadConfigAlertFunc(const char* alert, const char* payload){ if(run_mode_config_id!=m_run_mode_config_id || base_config_id!=m_base_config_id){ while(count<5){ - if(!GetCachedDeviceConfig(m_local_config, base_config_id, run_mode_config_id)){ - usleep(100000); - count++; + if(!GetCachedDeviceConfig(m_local_config, base_config_id, run_mode_config_id, config_devicename)){ + usleep(100000); + count++; } else count=99; } if(count==5) return false; - - (*sc_vars)["NewConfig"]->SetValue(1); - m_base_config_id = base_config_id; - m_run_mode_config_id = run_mode_config_id; - } - - - + + (*sc_vars)["NewConfig"]->SetValue(1); + m_base_config_id = base_config_id; + m_run_mode_config_id = run_mode_config_id; + } + return true; - + } std::string Services::LoadConfigSlowControlFunc(const char* control){ @@ -1194,10 +1194,10 @@ std::string Services::LoadConfigSlowControlFunc(const char* control){ tmp.JsonParser(payload); uint64_t base_config_id=0; uint64_t run_mode_config_id=0; - + short count = 0; std::stringstream ret; - + tmp.Get("Base",base_config_id); tmp.Get("RunMode",run_mode_config_id); @@ -1205,7 +1205,7 @@ std::string Services::LoadConfigSlowControlFunc(const char* control){ if(run_mode_config_id!=m_run_mode_config_id || base_config_id!=m_base_config_id){ while(count<5){ - if(!GetCachedDeviceConfig(m_local_config, base_config_id, run_mode_config_id)){ + if(!GetCachedDeviceConfig(m_local_config, base_config_id, run_mode_config_id, config_devicename)){ usleep(100000); count++; } @@ -1223,8 +1223,8 @@ std::string Services::LoadConfigSlowControlFunc(const char* control){ ret <<"Loaded config "< #include -#define SERVICES_DEFAULT_TIMEOUT 1800 +#define SERVICES_DEFAULT_TIMEOUT 0 namespace ToolFramework { enum class LogLevel { Error=0, Warning=1, Message=2, Debug=3, Debug1=4, Debug2=5, Debug3=6 }; struct LogMsg { - LogMsg(const std::string& i_message, LogLevel i_severity=LogLevel::Message, const std::string& i_device="", const uint64_t i_timestamp=0) : message{i_message}, severity{i_severity}, device{i_device}, timestamp{i_timestamp} {}; + LogMsg(const std::string& i_message, LogLevel i_severity=LogLevel::Message, const std::string& i_device="", const uint64_t i_timestamp=0) : message{i_message}, severity{i_severity}, device{i_device}, timestamp{i_timestamp}, repeats{0} {}; std::string message; LogLevel severity; std::string device; @@ -136,6 +136,7 @@ namespace ToolFramework { static void BufferThread(Thread_args* args); std::string m_name; + std::string config_devicename; bool m_verbose; zmq::context_t* m_context; ServicesBackend m_backend_client; diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index c0ad933..ac76756 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -179,7 +179,7 @@ bool ServicesBackend::InitZMQ(){ inpoll_timeout=500; // total timeout on how long we wait for response from a command - command_timeout=2000; + command_timeout=300; // Update with user-specified values. m_variables.Get("clt_pub_port",clt_pub_port); @@ -215,23 +215,32 @@ bool ServicesBackend::InitZMQ(){ // socket to publish write commands // ------------------------------- - clt_pub_socket = new zmq::socket_t(*context, ZMQ_PUB); - clt_pub_socket->setsockopt(ZMQ_LINGER,10); - clt_pub_socket->setsockopt(ZMQ_SNDTIMEO, clt_pub_socket_timeout); - clt_pub_socket->setsockopt(ZMQ_LINGER, 10); - clt_pub_socket->bind(std::string("tcp://*:")+std::to_string(clt_pub_port)); - + try{ + clt_pub_socket = new zmq::socket_t(*context, ZMQ_PUB); + clt_pub_socket->setsockopt(ZMQ_LINGER,10); + clt_pub_socket->setsockopt(ZMQ_SNDTIMEO, clt_pub_socket_timeout); + clt_pub_socket->setsockopt(ZMQ_LINGER, 10); + clt_pub_socket->bind(std::string("tcp://*:")+std::to_string(clt_pub_port)); + } catch(zmq::error_t& e){ + std::cerr<<"ServicesBackend caught "<setsockopt(ZMQ_LINGER,10); - clt_dlr_socket->setsockopt(ZMQ_SNDTIMEO, clt_dlr_socket_timeout); - clt_dlr_socket->setsockopt(ZMQ_RCVTIMEO, clt_dlr_socket_timeout); - clt_dlr_socket->setsockopt(ZMQ_IDENTITY, clt_ID.c_str(), clt_ID.length()); - clt_dlr_socket->setsockopt(ZMQ_IMMEDIATE,1); - clt_dlr_socket->setsockopt(ZMQ_LINGER, 10); - clt_dlr_socket->bind(std::string("tcp://*:")+std::to_string(clt_dlr_port)); + try{ + clt_dlr_socket = new zmq::socket_t(*context, ZMQ_DEALER); + clt_dlr_socket->setsockopt(ZMQ_LINGER,10); + clt_dlr_socket->setsockopt(ZMQ_SNDTIMEO, clt_dlr_socket_timeout); + clt_dlr_socket->setsockopt(ZMQ_RCVTIMEO, clt_dlr_socket_timeout); + clt_dlr_socket->setsockopt(ZMQ_IDENTITY, clt_ID.c_str(), clt_ID.length()); + clt_dlr_socket->setsockopt(ZMQ_IMMEDIATE,1); + clt_dlr_socket->setsockopt(ZMQ_LINGER, 10); + clt_dlr_socket->bind(std::string("tcp://*:")+std::to_string(clt_dlr_port)); + } catch(zmq::error_t& e){ + std::cerr<<"ServicesBackend caught "<* results, const uint32_t* timeout_ms, std::string* err){ +bool ServicesBackend::SendCommand(const std::string& topic, const std::string& command, std::vector* results, const uint32_t timeout_ms, std::string* err){ // send a command and receive response. // This is a wrapper that ensures we always return within the requested timeout. if(m_verbosity>10) std::cout<<"ServicesBackend::SendCommand invoked with command '"< resultsvec; @@ -669,9 +678,16 @@ bool ServicesBackend::GetNextResponse(){ int poll_timeout = (waiting_senders.empty()) ? inpoll_timeout : 0; std::vector response; - dlr_socket_mutex.lock(); - int ret = PollAndReceive(clt_dlr_socket, in_polls.at(0), poll_timeout, response); - dlr_socket_mutex.unlock(); + int ret; + try{ + dlr_socket_mutex.lock(); + ret = PollAndReceive(clt_dlr_socket, in_polls.at(0), poll_timeout, response); + dlr_socket_mutex.unlock(); + }catch(zmq::error_t& e){ + dlr_socket_mutex.unlock(); + std::cerr<<"ServicesBackend caught "<10) std::cout<<"ServicesBackend::SendNextCommand calling PollAndSend" <<", message type: "<10) std::cout<<"ServicesBackend::SendNextCommand send returned "<RemoveService("slowcontrol_write"); @@ -1037,6 +1061,7 @@ bool ServicesBackend::Ready(int timeout){ ret = zmq::poll(&out_polls.at(1), 1, timeout); dlr_socket_mutex.unlock(); } catch (zmq::error_t& err){ + dlr_socket_mutex.unlock(); std::cerr<<"ServicesBackend::Ready caught "<* results=nullptr, const uint32_t* timeout_ms=nullptr, std::string* err=nullptr); - bool SendCommand(const std::string& topic, const std::string& command, std::string* results=nullptr, const uint32_t* timeout_ms=nullptr, std::string* err=nullptr); + bool SendCommand(const std::string& topic, const std::string& command, std::vector* results=nullptr, const uint32_t timeout_ms=0, std::string* err=nullptr); + bool SendCommand(const std::string& topic, const std::string& command, std::string* results=nullptr, const uint32_t timeout_ms=0, std::string* err=nullptr); // multicasts bool SendMulticast(MulticastType type, std::string command, std::string* err=nullptr);