Skip to content

Commit 0e875bb

Browse files
committed
Added support for a new "on_close_connection" hook.
1 parent 6122a3a commit 0e875bb

1 file changed

Lines changed: 34 additions & 0 deletions

File tree

  • rust/loro-websocket-server/src

rust/loro-websocket-server/src/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,18 @@ pub struct HandshakeAuthArgs<'a> {
127127

128128
type HandshakeAuthFn = dyn Fn(HandshakeAuthArgs) -> bool + Send + Sync;
129129

130+
/// Arguments provided to `on_close_connection`.
131+
pub struct CloseConnectionArgs {
132+
pub workspace: String,
133+
pub conn_id: u64,
134+
pub rooms: Vec<(CrdtType, String)>,
135+
}
136+
137+
type CloseConnectionFuture =
138+
Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'static>>;
139+
type CloseConnectionFn =
140+
Arc<dyn Fn(CloseConnectionArgs) -> CloseConnectionFuture + Send + Sync>;
141+
130142
#[derive(Clone)]
131143
pub struct ServerConfig<DocCtx = ()> {
132144
pub on_load_document: Option<LoadFn<DocCtx>>,
@@ -144,6 +156,9 @@ pub struct ServerConfig<DocCtx = ()> {
144156
///
145157
/// Return true to accept, false to reject with 401.
146158
pub handshake_auth: Option<Arc<HandshakeAuthFn>>,
159+
/// Optional hook invoked after a connection fully closes.
160+
/// Receives the workspace id, connection id, and rooms the client had joined.
161+
pub on_close_connection: Option<CloseConnectionFn>,
147162
}
148163

149164
// CRDT document abstraction to reduce match-based branching
@@ -459,6 +474,7 @@ impl<DocCtx> Default for ServerConfig<DocCtx> {
459474
default_permission: Permission::Write,
460475
authenticate: None,
461476
handshake_auth: None,
477+
on_close_connection: None,
462478
}
463479
}
464480
}
@@ -909,6 +925,7 @@ where
909925

910926
// Capture config outside of non-async closure
911927
let handshake_auth = registry.config.handshake_auth.clone();
928+
let close_connection = registry.config.on_close_connection.clone();
912929
let workspace_holder: Arc<std::sync::Mutex<Option<String>>> =
913930
Arc::new(std::sync::Mutex::new(None));
914931
let workspace_holder_c = workspace_holder.clone();
@@ -1422,6 +1439,11 @@ where
14221439
}
14231440
}
14241441

1442+
let rooms_for_hook: Vec<(CrdtType, String)> = joined_rooms
1443+
.into_iter()
1444+
.map(|RoomKey { crdt, room }| (crdt, room))
1445+
.collect();
1446+
14251447
// cleanup
14261448
{
14271449
let mut h = hub.lock().await;
@@ -1430,6 +1452,18 @@ where
14301452
// drop tx to stop writer
14311453
drop(tx);
14321454
let _ = sink_task.await;
1455+
1456+
if let Some(hook) = close_connection {
1457+
let args = CloseConnectionArgs {
1458+
workspace: workspace_id.clone(),
1459+
conn_id,
1460+
rooms: rooms_for_hook,
1461+
};
1462+
if let Err(e) = (hook)(args).await {
1463+
warn!(conn_id, %e, "on_close_connection hook failed");
1464+
}
1465+
}
1466+
14331467
debug!(conn_id, "connection closed and cleaned up");
14341468
Ok(())
14351469
}

0 commit comments

Comments
 (0)