Skip to content

Commit 67066d8

Browse files
committed
try
1 parent 088e296 commit 67066d8

1 file changed

Lines changed: 40 additions & 0 deletions

File tree

booster_sdk/src/dds/rpc.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,15 @@ fn rpc_debug_enabled() -> bool {
112112
.unwrap_or(false)
113113
}
114114

115+
fn rpc_match_timeout() -> Duration {
116+
let default_ms = 8000_u64;
117+
let ms = std::env::var("BOOSTER_RPC_MATCH_TIMEOUT_MS")
118+
.ok()
119+
.and_then(|value| value.parse::<u64>().ok())
120+
.unwrap_or(default_ms);
121+
Duration::from_millis(ms)
122+
}
123+
115124
fn preview_for_log(value: &str, max_chars: usize) -> String {
116125
let mut preview = String::new();
117126
let mut chars = value.chars();
@@ -128,6 +137,19 @@ fn preview_for_log(value: &str, max_chars: usize) -> String {
128137
}
129138

130139
impl RpcClient {
140+
fn wait_for_request_match(&self, max_wait: Duration) -> bool {
141+
let deadline = Instant::now() + max_wait;
142+
loop {
143+
if !self.request_writer.get_matched_subscriptions().is_empty() {
144+
return true;
145+
}
146+
if Instant::now() >= deadline {
147+
return false;
148+
}
149+
std::thread::sleep(Duration::from_millis(50));
150+
}
151+
}
152+
131153
pub fn for_topic(options: RpcClientOptions, service_topic: impl Into<String>) -> Result<Self> {
132154
Self::new(options.with_service_topic(service_topic))
133155
}
@@ -229,6 +251,24 @@ impl RpcClient {
229251
R: DeserializeOwned + Send + 'static,
230252
{
231253
let debug_enabled = rpc_debug_enabled();
254+
let match_timeout = rpc_match_timeout();
255+
let matched = self.wait_for_request_match(match_timeout);
256+
if debug_enabled {
257+
tracing::debug!(
258+
target: "booster_sdk::rpc",
259+
service_topic = %self.service_topic,
260+
matched,
261+
match_wait_ms = match_timeout.as_millis(),
262+
"rpc request writer match status"
263+
);
264+
}
265+
if !matched {
266+
return Err(RpcError::Timeout {
267+
timeout: match_timeout,
268+
}
269+
.into());
270+
}
271+
232272
let request_id = Uuid::new_v4().to_string();
233273
let body = body.into();
234274
let header = serde_json::json!({ "api_id": api_id }).to_string();

0 commit comments

Comments
 (0)