Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bd-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ bd-network-quality.path = "../bd-network-quality"
bd-pgv.path = "../bd-pgv"
bd-proto.path = "../bd-proto"
bd-runtime.path = "../bd-runtime"
bd-session.path = "../bd-session"
bd-shutdown.path = "../bd-shutdown"
bd-time.path = "../bd-time"
bd-workspace-hack.workspace = true
Expand Down
134 changes: 104 additions & 30 deletions bd-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::network_quality::DISCONNECTED_OFFLINE_GRACE_PERIOD;
use crate::upload::{self, StateTracker};
use crate::{
DataUpload,
OPAQUE_USER_ID_KEY,
PlatformNetworkManager,
PlatformNetworkStream,
RuntimeBackoffPolicy,
Expand Down Expand Up @@ -51,6 +50,7 @@ pub use bd_proto::protos::client::api::sankey_intent_response::{
Drop as SankeyPathUploadDecisionDrop,
UploadImmediately as SankeyPathUploadDecisionImmediately,
};
use bd_proto::protos::client::api::state_update_request::OpaqueEntityUpdate;
pub use bd_proto::protos::client::api::upload_artifact_intent_response::{
Decision as ArtifactIntentDecision,
Drop as ArtifactIntentDecisionDrop,
Expand All @@ -61,6 +61,7 @@ use bd_proto::protos::client::api::{
ClientKillFile,
HandshakeRequest,
PingRequest,
StateUpdateRequest,
handshake_response,
};
use bd_proto::protos::logging::payload::Data as ProtoData;
Expand Down Expand Up @@ -88,6 +89,14 @@ struct StreamClosureInfo {
retry_after: Option<Duration>,
}

//
// InFlightStateUpdate
//

struct InFlightStateUpdate {
session_update: Option<bd_session::PendingStateUpdate>,
}

//
// HandshakeResult
//
Expand Down Expand Up @@ -342,7 +351,9 @@ pub struct Api {
data_upload_rx: Receiver<DataUpload>,
trigger_upload_tx: Sender<TriggerUpload>,
sleep_mode_active: watch::Receiver<bool>,
store: Arc<bd_key_value::Store>,
session_strategy: Arc<bd_session::Strategy>,
session_updates: watch::Receiver<u64>,
opaque_entity_updates: watch::Receiver<Option<String>>,

static_metadata: Arc<dyn Metadata + Send + Sync>,

Expand Down Expand Up @@ -404,6 +415,8 @@ impl Api {
stats: &Scope,
sleep_mode_active: watch::Receiver<bool>,
store: Arc<bd_key_value::Store>,
session_strategy: Arc<bd_session::Strategy>,
opaque_entity_updates: watch::Receiver<Option<String>>,
) -> Self {
let mut backoff_policy = RuntimeBackoffPolicy::new(runtime_loader.as_ref());
let generic_kill_duration = runtime_loader.register_duration_watch();
Expand All @@ -415,16 +428,18 @@ impl Api {

let backoff = backoff_policy.backoff_mark_update();

let reconnect_state =
crate::reconnect::ReconnectState::new(store.clone(), time_provider.clone());
let reconnect_state = crate::reconnect::ReconnectState::new(store, time_provider.clone());
let session_updates = session_strategy.subscribe_updates();
Self {
sdk_directory,
api_key,
manager,
static_metadata,
data_upload_rx,
trigger_upload_tx,
store,
session_strategy,
session_updates,
opaque_entity_updates,
time_provider,
network_quality_monitor,
runtime_loader,
Expand Down Expand Up @@ -475,7 +490,7 @@ impl Api {
fn handshake_metadata(&self) -> HashMap<String, ProtoData> {
let metadata = self.static_metadata.collect();

let mut handshake_metadata: HashMap<_, _> = metadata
metadata
.iter()
.map(|(k, v)| {
(
Expand All @@ -486,21 +501,31 @@ impl Api {
},
)
})
.collect();

if let Some(opaque_user_id) = self.store.get_string(&OPAQUE_USER_ID_KEY)
&& !opaque_user_id.is_empty()
{
handshake_metadata.insert(
"opaque_user_id".to_string(),
ProtoData {
data_type: Some(Data_type::StringData(opaque_user_id)),
..Default::default()
},
);
.collect()
}

fn current_opaque_entity_update(&self) -> OpaqueEntityUpdate {
OpaqueEntityUpdate {
opaque_entity_id: self.opaque_entity_updates.borrow().clone(),
..Default::default()
}
}

fn state_update_request(
&self,
session_update: Option<&StateUpdateRequest>,
include_opaque_entity: bool,
) -> Option<StateUpdateRequest> {
if session_update.is_none() && !include_opaque_entity {
return None;
}

let mut request = session_update.cloned().unwrap_or_default();
if include_opaque_entity {
request.opaque_entity_update = Some(self.current_opaque_entity_update()).into();
}

handshake_metadata
Some(request)
}

fn hash_api_key(&self) -> Vec<u8> {
Expand Down Expand Up @@ -771,13 +796,10 @@ impl Api {
log::debug!("sending handshake");

let last_disconnect_reason = self.last_disconnect_reason.take();
stream_state
.send_request(
self
.handshake_request(handshake_metadata, last_disconnect_reason)
.await,
)
.await?;
let (handshake_request, handshake_state_update) = self
.handshake_request(handshake_metadata, last_disconnect_reason)
.await;
stream_state.send_request(handshake_request).await?;

log::debug!("waiting for handshake");

Expand All @@ -789,10 +811,20 @@ impl Api {
configuration_update_status,
remaining_responses,
} => {
self
.session_strategy
.acknowledge_state_update(&handshake_state_update)
.await;
stream_state.initialize_stream_settings(stream_settings);

let mut in_flight_state_update = None;

if let Some(stream_closure_info) = self
.handle_responses(remaining_responses, &mut stream_state)
.handle_responses(
remaining_responses,
&mut stream_state,
&mut in_flight_state_update,
)
.await?
{
if let Some(retry_after) = stream_closure_info.retry_after {
Expand Down Expand Up @@ -862,6 +894,7 @@ impl Api {
// Consider the time we've processed the handshake or the spurious upload as the last
// time we received data at the start. This is refreshed below whenever we upload data.
let mut last_data_received_at = Instant::now();
let mut in_flight_state_update = None;

// At this point we have established the stream, so we should start the general
// request/response handling.
Expand Down Expand Up @@ -890,6 +923,30 @@ impl Api {
log::trace!("sleep mode state changed, re-evaluating data idle timeout");
continue;
}
_ = self.session_updates.changed(), if in_flight_state_update.is_none() => {
let _ = self.session_updates.borrow_and_update();
let Some(session_update) = self.session_strategy.pending_state_update().await else {
continue;
};
stream_state.send_request(session_update.request().clone()).await?;
in_flight_state_update = Some(InFlightStateUpdate {
session_update: Some(session_update),
});
self.reconnect_state.record_connectivity_event();
continue;
}
_ = self.opaque_entity_updates.changed(), if in_flight_state_update.is_none() => {
let _ = self.opaque_entity_updates.borrow_and_update();
let Some(request) = self.state_update_request(None, true) else {
continue;
};
stream_state.send_request(request).await?;
in_flight_state_update = Some(InFlightStateUpdate {
session_update: None,
});
self.reconnect_state.record_connectivity_event();
continue;
}
Some(data_upload) = self.data_upload_rx.recv() => {
log::trace!("received data upload");
last_data_received_at = Instant::now();
Expand Down Expand Up @@ -920,7 +977,9 @@ impl Api {

let stream_closure_info = match stream_state.handle_upstream_event(upstream_event).await? {
UpstreamEvent::UpstreamMessages(responses) => {
self.handle_responses(responses, &mut stream_state).await?
self
.handle_responses(responses, &mut stream_state, &mut in_flight_state_update)
.await?
},
UpstreamEvent::StreamClosed(reason) => Some(StreamClosureInfo {
reason,
Expand Down Expand Up @@ -975,6 +1034,7 @@ impl Api {
&self,
responses: Vec<ApiResponse>,
stream_state: &mut StreamState,
in_flight_state_update: &mut Option<InFlightStateUpdate>,
) -> anyhow::Result<Option<StreamClosureInfo>> {
for response in responses {
match response.response_type {
Expand Down Expand Up @@ -1089,6 +1149,16 @@ impl Api {
stream_state.send_request(request).await?;
}
},
Some(Response_type::StateUpdate(_)) => {
if let Some(in_flight_state_update) = in_flight_state_update.take()
&& let Some(session_update) = in_flight_state_update.session_update
{
self
.session_strategy
.acknowledge_state_update(&session_update)
.await;
}
},
None => {
debug_assert!(false, "not handled");
},
Expand All @@ -1103,20 +1173,24 @@ impl Api {
&self,
metadata: &HashMap<String, ProtoData>,
previous_disconnect_reason: Option<String>,
) -> HandshakeRequest {
) -> (HandshakeRequest, bd_session::PendingStateUpdate) {
let opaque_client_state = tokio::fs::read(&self.opaque_client_state_path()).await.ok();
let session_update = self.session_strategy.handshake_state_update().await;
let mut handshake = HandshakeRequest {
static_device_metadata: metadata.clone(),
previous_disconnect_reason: previous_disconnect_reason.unwrap_or_default(),
sleep_mode: *self.sleep_mode_active.borrow(),
opaque_client_state,
state_update: self
.state_update_request(Some(session_update.request()), true)
.into(),
..Default::default()
};

self.config_updater.fill_handshake(&mut handshake);
self.runtime_loader.fill_handshake(&mut handshake);

handshake
(handshake, session_update)
}

async fn process_opaque_client_state(&self, state: Option<Vec<u8>>) {
Expand Down
Loading
Loading