
Commit 93762f4

Merge pull request #219 from kalamstack/028-auth-integration
More improvements to performance of ws and fixes to @kalamdb/orm package
2 parents (ab0abfe + ce0adf9) · commit 93762f4

77 files changed

Lines changed: 7450 additions & 706 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -126,3 +126,4 @@ link/sdks/typescript/client/.npmrc
 /link/kalam-client/sdks/typescript/client/.wasm-cargo-home-size-current2
 /link/kalam-client/sdks/typescript/client/.wasm-target-size-current
 /link/kalam-client/sdks/typescript/client/.wasm-target-size-current2
+/benchv2/logs

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ arrow-ipc = { version = "58.1.0", default-features = false }
 arrow-schema = { version = "58.1.0" }
 datafusion = { version = "53.1.0", default-features = false, features = ["sql", "parquet", "recursive_protection", "nested_expressions"] }
 datafusion-datasource = { version = "53.1.0", default-features = false }
-datafusion-common = { version = "53.1.0" }
+datafusion-common = { version = "53.1.0", default-features = false }
 datafusion-expr = { version = "53.1.0" }
 datafusion-functions-json = { version = "0.53.0" }
 sqlparser = { version = "0.61.0" }

backend/crates/kalamdb-api/src/http/auth/models/login_request.rs

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ const MAX_PASSWORD_LENGTH: usize = 256;
 #[serde(deny_unknown_fields)]
 pub struct LoginRequest {
     /// Canonical user identifier for authentication
-    #[serde(deserialize_with = "validate_user_length")]
+    #[serde(alias = "username", deserialize_with = "validate_user_length")]
     pub user: String,
     /// Password for authentication
     #[serde(deserialize_with = "validate_password_length")]
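For context on the `alias` change above: serde treats an alias as an additional accepted key for the same field, and aliases still count as known fields under `deny_unknown_fields`. A minimal self-contained sketch (the `validate_user_length` deserializer is elided, so this is illustrative rather than the crate's exact type):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct LoginRequest {
    // Accepts both {"user": ...} and the legacy {"username": ...} key.
    #[serde(alias = "username")]
    user: String,
    password: String,
}

fn main() {
    let a: LoginRequest =
        serde_json::from_str(r#"{"user":"alice","password":"hunter2"}"#).unwrap();
    let b: LoginRequest =
        serde_json::from_str(r#"{"username":"alice","password":"hunter2"}"#).unwrap();
    assert_eq!(a.user, b.user);
}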

backend/crates/kalamdb-api/src/ws/compression.rs

Lines changed: 19 additions & 5 deletions
@@ -2,6 +2,12 @@
 //!
 //! Provides gzip compression for WebSocket messages to reduce bandwidth.
 //! Compression is enabled by default for all messages over the threshold.
+//!
+//! Performance: pre-sizes the output buffer to a fraction of the input so
+//! the encoder doesn't grow through multiple doublings on large payloads.
+//! actix-ws consumes owned `Vec<u8>`, so a single allocation per message is
+//! unavoidable without a per-connection write pipeline; this keeps it to
+//! exactly one modestly-sized allocation rather than several doublings.

 use flate2::write::GzEncoder;
 use flate2::Compression;
@@ -11,20 +17,28 @@ use std::io::Write;
 /// Messages smaller than this are sent uncompressed
 pub const COMPRESSION_THRESHOLD: usize = 512;

-/// Compress data using gzip
+/// Heuristic initial capacity for gzip output. Real-world JSON payloads
+/// compress to 20–40% of original; pre-sizing to 1/3 avoids 2–3
+/// `Vec::grow` reallocations inside the encoder on the hot path.
+#[inline]
+fn gzip_initial_capacity(input_len: usize) -> usize {
+    // Minimum gzip header + footer overhead is ~20 bytes.
+    (input_len / 3).max(64)
+}
+
+/// Compress data using gzip.
 ///
-/// Returns compressed bytes on success, or the original data if compression fails
+/// Returns compressed bytes on success, or the original data if compression fails.
 pub fn compress_gzip(data: &[u8]) -> Vec<u8> {
-    let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
+    let buf = Vec::with_capacity(gzip_initial_capacity(data.len()));
+    let mut encoder = GzEncoder::new(buf, Compression::fast());
     if encoder.write_all(data).is_ok() {
         if let Ok(compressed) = encoder.finish() {
-            // Only use compressed if it's actually smaller
             if compressed.len() < data.len() {
                 return compressed;
             }
         }
     }
-    // Fallback to original data
     data.to_vec()
 }
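A standalone sketch of the pre-sizing idea, with a decoder round-trip to show the output is ordinary gzip. The 1/3 ratio and 64-byte floor mirror the constants above; actual savings depend on the payload:

use std::io::{Read, Write};

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;

fn compress_presized(data: &[u8]) -> Vec<u8> {
    // Pre-size to ~1/3 of input so the encoder rarely reallocates.
    let buf = Vec::with_capacity((data.len() / 3).max(64));
    let mut encoder = GzEncoder::new(buf, Compression::fast());
    if encoder.write_all(data).is_ok() {
        if let Ok(compressed) = encoder.finish() {
            if compressed.len() < data.len() {
                return compressed;
            }
        }
    }
    // Incompressible or failed: fall back to the original bytes.
    data.to_vec()
}

fn main() {
    let payload = br#"{"type":"change","subscription_id":"sub_1"}"#.repeat(64);
    let compressed = compress_presized(&payload);
    assert!(compressed.len() < payload.len());

    // Round-trip: the output decodes back to the exact input.
    let mut decoded = Vec::new();
    GzDecoder::new(&compressed[..]).read_to_end(&mut decoded).unwrap();
    assert_eq!(decoded, payload);
}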

backend/crates/kalamdb-api/src/ws/events/mod.rs

Lines changed: 21 additions & 6 deletions
@@ -122,9 +122,21 @@ pub async fn send_message<T: serde::Serialize>(
 /// sent as binary frames. When `false`, the raw payload is always sent as a
 /// text frame, which is easier to inspect during development.
 async fn send_data(session: &mut Session, data: &[u8], compress: bool) -> Result<(), ()> {
+    // Fast path: no compression — send as Text frame directly without a
+    // UTF-8 round-trip. Callers only reach this path with bytes that they
+    // just produced from `serde_json`/`rmp_serde`, so they are already valid
+    // UTF-8 when `compress == false` and serialization chose the text branch.
     if !compress {
-        let text = String::from_utf8_lossy(data);
-        return session.text(text.into_owned()).await.map_err(|_| ());
+        // `String::from_utf8_lossy(..).into_owned()` previously allocated a
+        // fresh String and scanned every byte even for known-valid JSON. Use
+        // `from_utf8` and fall back to lossy only on the (never-observed)
+        // error path to stay defensive without paying the cost on the hot
+        // path.
+        let owned = match std::str::from_utf8(data) {
+            Ok(s) => s.to_owned(),
+            Err(_) => String::from_utf8_lossy(data).into_owned(),
+        };
+        return session.text(owned).await.map_err(|_| ());
     }

     let (payload, compressed) = maybe_compress(data);
@@ -133,10 +145,13 @@ async fn send_data(session: &mut Session, data: &[u8], compress: bool) -> Result
         // Send compressed data as binary frame
         session.binary(payload).await.map_err(|_| ())
     } else {
-        // Send uncompressed data as text frame
-        // Safe to convert since original data was valid JSON string
-        let text = String::from_utf8_lossy(&payload);
-        session.text(text.into_owned()).await.map_err(|_| ())
+        // Send uncompressed data as text frame. `maybe_compress` returned the
+        // original bytes unchanged, so they remain valid UTF-8 JSON.
+        let owned = match std::str::from_utf8(&payload) {
+            Ok(s) => s.to_owned(),
+            Err(_) => String::from_utf8_lossy(&payload).into_owned(),
+        };
+        session.text(owned).await.map_err(|_| ())
     }
 }
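The conversion pattern in isolation: strict `from_utf8` first, lossy only as the defensive fallback. A sketch; `Session` and the actix-ws send calls are omitted:

/// Owned text from wire bytes: validate strictly, degrade to U+FFFD
/// replacement characters only if the bytes are somehow not UTF-8.
fn bytes_to_text(data: &[u8]) -> String {
    match std::str::from_utf8(data) {
        Ok(s) => s.to_owned(),
        Err(_) => String::from_utf8_lossy(data).into_owned(),
    }
}

fn main() {
    // Hot path: serializer output is valid UTF-8 JSON.
    assert_eq!(bytes_to_text(br#"{"ok":true}"#), r#"{"ok":true}"#);
    // Defensive path: invalid bytes become replacement characters.
    assert_eq!(bytes_to_text(&[0xFF, 0xFE]), "\u{FFFD}\u{FFFD}");
}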

backend/crates/kalamdb-commons/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ once_cell = { workspace = true }
 hex = { workspace = true }
 sha2 = { workspace = true }
 thiserror = { workspace = true }
-datafusion-common = { workspace = true, optional = true }
+datafusion-common = { workspace = true, optional = true, default-features = false }
 parking_lot = { workspace = true }
 storekey = { workspace = true }
 # Optional dependencies - only enabled via features

backend/crates/kalamdb-commons/src/websocket.rs

Lines changed: 15 additions & 3 deletions
@@ -857,9 +857,21 @@ impl WireNotification {
         let mut buf = Vec::with_capacity(est);

         buf.extend_from_slice(b"{\"type\":\"change\",\"subscription_id\":\"");
-        // Escape the subscription_id JSON-safely (ids are alphanumeric, but be safe).
-        let escaped = self.subscription_id.replace('\\', "\\\\").replace('"', "\\\"");
-        buf.extend_from_slice(escaped.as_bytes());
+        // Fast path: the overwhelming majority of real subscription IDs are
+        // plain ASCII (UUIDs, cuids, alnum+`-_`), so we can splice the raw
+        // bytes without allocating a second String for escaping. Only fall
+        // back to the allocating path when we actually see a byte that would
+        // need JSON escaping.
+        let sid_bytes = self.subscription_id.as_bytes();
+        let needs_escape = sid_bytes
+            .iter()
+            .any(|&b| b == b'\\' || b == b'"' || b < 0x20 || b >= 0x7f);
+        if needs_escape {
+            let escaped = self.subscription_id.replace('\\', "\\\\").replace('"', "\\\"");
+            buf.extend_from_slice(escaped.as_bytes());
+        } else {
+            buf.extend_from_slice(sid_bytes);
+        }
         buf.extend_from_slice(b"\",\"change_type\":\"");
         buf.extend_from_slice(p.change_type.as_str().as_bytes());
         buf.push(b'"');
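The escape-detection fast path, sketched standalone. The escaping here mirrors the diff's two structural characters; a complete JSON writer would also \uXXXX-encode control bytes:

/// Append a JSON string body (no surrounding quotes): raw splice when no
/// byte needs escaping, allocate an escaped copy otherwise.
fn append_json_str(buf: &mut Vec<u8>, s: &str) {
    let bytes = s.as_bytes();
    let needs_escape = bytes
        .iter()
        .any(|&b| b == b'\\' || b == b'"' || b < 0x20 || b >= 0x7f);
    if needs_escape {
        // Slow path: allocate an escaped copy.
        let escaped = s.replace('\\', "\\\\").replace('"', "\\\"");
        buf.extend_from_slice(escaped.as_bytes());
    } else {
        // Fast path: plain-ASCII IDs splice straight into the buffer.
        buf.extend_from_slice(bytes);
    }
}

fn main() {
    let mut buf = b"{\"subscription_id\":\"".to_vec();
    append_json_str(&mut buf, "sub_01HZX5T9QK");
    buf.extend_from_slice(b"\"}");
    assert_eq!(buf, br#"{"subscription_id":"sub_01HZX5T9QK"}"#);
}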

backend/crates/kalamdb-live/src/helpers/initial_data.rs

Lines changed: 35 additions & 3 deletions
@@ -296,10 +296,43 @@ impl InitialDataFetcher {
         })
     }

-    /// Compute snapshot end sequence for a subscription
+    /// Compute snapshot end sequence for a subscription.
     ///
-    /// Uses MAX(_seq) with the same filters as initial data to define a snapshot boundary.
+    /// Fast path: since `_seq` is a Snowflake ID with embedded timestamp, the
+    /// maximum possible `_seq` at the current wall-clock millisecond is an
+    /// upper bound on every row already written. Any write performed *after*
+    /// this boundary is computed will get a strictly larger `_seq` (different
+    /// timestamp component) and therefore flow through the live notification
+    /// path, not the initial snapshot.
+    ///
+    /// This removes an entire DataFusion execution from the subscribe critical
+    /// path (previously ~several ms to tens of ms depending on planning cost),
+    /// which is one of the biggest wins for time-to-first-row.
+    ///
+    /// All arguments are accepted for API compatibility; `role`, `table_id`,
+    /// `table_type`, `options`, and `where_clause` are unused on the fast path.
     pub async fn compute_snapshot_end_seq(
+        &self,
+        _live_id: &kalamdb_commons::models::LiveQueryId,
+        _role: Role,
+        _table_id: &TableId,
+        _table_type: TableType,
+        _options: &InitialDataOptions,
+        _where_clause: Option<&str>,
+    ) -> Result<Option<SeqId>, LiveError> {
+        let now_ms = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .map(|d| d.as_millis() as u64)
+            .unwrap_or(SeqId::EPOCH);
+
+        match SeqId::max_id_for_timestamp(now_ms) {
+            Ok(seq) => Ok(Some(seq)),
+            Err(e) => Err(LiveError::Other(format!("Failed to compute snapshot boundary: {}", e))),
+        }
+    }
+
+    #[allow(dead_code)]
+    async fn compute_snapshot_end_seq_sql_fallback(
         &self,
         live_id: &kalamdb_commons::models::LiveQueryId,
         role: Role,
@@ -308,7 +341,6 @@ impl InitialDataFetcher {
         options: &InitialDataOptions,
         where_clause: Option<&str>,
     ) -> Result<Option<SeqId>, LiveError> {
-        // Extract user_id from LiveId for RLS
        let user_id = live_id.user_id().clone();

        let table_name = table_id.full_name();
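The boundary trick in miniature. This sketch assumes the classic Snowflake split (millisecond timestamp in the high bits above a 22-bit shift); the real `SeqId::max_id_for_timestamp` bit layout lives elsewhere in the crate and may differ:

/// Hypothetical layout: timestamp_ms << 22, with worker and sequence bits
/// below (mirrors the classic Snowflake split; not the crate's actual SeqId).
const TIMESTAMP_SHIFT: u32 = 22;

/// Largest ID any writer could mint during `ts_ms`: that millisecond's
/// timestamp bits with every lower bit set.
fn max_id_for_timestamp(ts_ms: u64) -> u64 {
    (ts_ms << TIMESTAMP_SHIFT) | ((1u64 << TIMESTAMP_SHIFT) - 1)
}

fn main() {
    let boundary = max_id_for_timestamp(1_700_000_000_000);
    // A write in any later millisecond gets a strictly larger ID, so it is
    // guaranteed to surface via live notifications, never the snapshot.
    assert!(max_id_for_timestamp(1_700_000_000_001) > boundary);
}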

backend/crates/kalamdb-live/src/models/connection.rs

Lines changed: 2 additions & 1 deletion
@@ -73,7 +73,8 @@ fn intern_subscription_str(value: &str) -> Arc<str> {
 /// Maximum pending notifications per connection before dropping new ones.
 /// Keep this modest: large snapshot catch-up is handled by per-subscription
 /// flow control, while a smaller live buffer reduces worst-case memory per
-/// slow connection.
+/// slow connection. At 100k concurrent idle connections this directly
+/// governs the per-connection memory floor.
 pub const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;

 /// Maximum pending control events per connection.
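A back-of-envelope for the 100k figure. The 64-byte slot size is an assumption for illustration; the real footprint depends on the notification type and the channel's internal bookkeeping:

fn main() {
    const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
    const CONNECTIONS: usize = 100_000;
    const ASSUMED_SLOT_BYTES: usize = 64; // hypothetical per-message size

    // Worst case, every channel full: 256 slots × 64 B × 100k ≈ 1.5 GiB
    // of buffered notifications alone.
    let worst_case = NOTIFICATION_CHANNEL_CAPACITY * ASSUMED_SLOT_BYTES * CONNECTIONS;
    println!("{:.2} GiB", worst_case as f64 / (1u64 << 30) as f64);
}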

backend/crates/kalamdb-live/src/notification.rs

Lines changed: 55 additions & 31 deletions
@@ -29,14 +29,26 @@ use tokio::sync::mpsc;
 /// Number of sharded notification workers.
 /// Deterministic routing by table_id hash preserves per-table ordering
 /// while achieving parallelism across different tables.
-const NUM_NOTIFY_WORKERS: usize = 4;
+///
+/// Scales with available CPUs (up to a hard cap) so multi-core deployments
+/// can fan out across more tables in parallel. Falls back to 4 on the
+/// (rare) platforms where `available_parallelism` is unavailable.
+fn num_notify_workers() -> usize {
+    // Cap at 16 to bound DashMap contention and worker overhead.
+    // Minimum of 4 preserves previous baseline behavior on small machines.
+    let cpus =
+        std::thread::available_parallelism().map(std::num::NonZeroUsize::get).unwrap_or(4);
+    cpus.clamp(4, 16)
+}

-/// Per-worker queue capacity. Total capacity = NUM_NOTIFY_WORKERS × this value.
+/// Per-worker queue capacity. Total capacity = workers × this value.
 const NOTIFY_QUEUE_PER_WORKER: usize = 4_096;

-/// Number of subscribers per parallel chunk for shared table streaming notification.
-/// Tuned to amortize tokio::spawn overhead while achieving parallelism at scale.
-const SHARED_NOTIFY_CHUNK_SIZE: usize = 256;
+/// Subscriber count above which we break fan-out into spawned chunks.
+/// For single-table fan-out at high subscriber counts (e.g. 100K on one
+/// table all hashing to one worker), spawning per-chunk lets the tokio
+/// runtime parallelise delivery across its thread pool.
+const SHARED_NOTIFY_CHUNK_SIZE: usize = 512;

 struct NotificationTask {
     user_id: Option<UserId>,
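The deterministic routing mentioned in the doc comment, sketched standalone. The hash function and worker count are stand-ins; the crate's actual hasher may differ:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Same table always hashes to the same worker, so per-table ordering is
/// preserved while distinct tables spread across the worker pool.
fn shard_for(table_id: &str, workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    table_id.hash(&mut hasher);
    (hasher.finish() as usize) % workers
}

fn main() {
    let workers = 16; // the clamp ceiling above
    assert_eq!(shard_for("app.orders", workers), shard_for("app.orders", workers));
    // Different tables may (and usually do) land on different shards.
    println!("{} {}", shard_for("app.orders", workers), shard_for("app.users", workers));
}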
@@ -274,10 +286,11 @@ impl NotificationService {
     }

     pub fn new(registry: Arc<ConnectionsManager>) -> Arc<Self> {
-        let mut worker_txs = Vec::with_capacity(NUM_NOTIFY_WORKERS);
-        let mut worker_rxs = Vec::with_capacity(NUM_NOTIFY_WORKERS);
+        let worker_count = num_notify_workers();
+        let mut worker_txs = Vec::with_capacity(worker_count);
+        let mut worker_rxs = Vec::with_capacity(worker_count);

-        for _ in 0..NUM_NOTIFY_WORKERS {
+        for _ in 0..worker_count {
             let (tx, rx) = mpsc::channel(NOTIFY_QUEUE_PER_WORKER);
             worker_txs.push(tx);
             worker_rxs.push(rx);
@@ -424,35 +437,46 @@ impl NotificationService {
             );
         }

-        // Large fan-out: parallel chunked dispatch
-        // Large fan-out: collect handles once, then parallel chunked dispatch
-        let handles: Vec<SubscriptionHandle> =
+        // Large fan-out: spawn a task per chunk so the tokio runtime can
+        // parallelise delivery across its thread pool. When all subscribers
+        // are on the same table they hash to one notification worker —
+        // spawning is the only way to utilise multiple cores for the fan-out.
+        let handles_vec: Vec<SubscriptionHandle> =
             all_handles.iter().map(|entry| entry.value().clone()).collect();

-        let mut join_handles = Vec::with_capacity(
-            (handles.len() + SHARED_NOTIFY_CHUNK_SIZE - 1) / SHARED_NOTIFY_CHUNK_SIZE,
-        );
-
-        for chunk in handles.chunks(SHARED_NOTIFY_CHUNK_SIZE) {
-            let chunk = chunk.to_vec();
-            let nr = Arc::clone(&new_row);
-            let or = old_row.as_ref().map(Arc::clone);
-            let ct = change_type.clone();
-            let pk = Arc::clone(&pk_columns);
-
-            join_handles.push(tokio::spawn(async move {
-                dispatch_chunk(chunk.into_iter(), &nr, or.as_deref(), &ct, &pk, seq_value)
+        let table_id = table_id.clone();
+        let mut tasks = Vec::new();
+
+        for chunk in handles_vec.chunks(SHARED_NOTIFY_CHUNK_SIZE) {
+            let chunk_handles: Vec<SubscriptionHandle> = chunk.to_vec();
+            let new_row = Arc::clone(&new_row);
+            let old_row = old_row.as_ref().map(Arc::clone);
+            let change_type = change_type.clone();
+            let pk_columns = Arc::clone(&pk_columns);
+            let table_id = table_id.clone();
+
+            tasks.push(tokio::spawn(async move {
+                match dispatch_chunk(
+                    chunk_handles.into_iter(),
+                    &new_row,
+                    old_row.as_deref(),
+                    &change_type,
+                    &pk_columns,
+                    seq_value,
+                ) {
+                    Ok(count) => count,
+                    Err(e) => {
+                        log::error!("Notification dispatch error for table {}: {}", table_id, e);
+                        0
+                    },
+                }
             }));
         }

         let mut total = 0usize;
-        for jh in join_handles {
-            match jh.await {
-                Ok(Ok(count)) => total += count,
-                Ok(Err(e)) => {
-                    log::error!("Notification dispatch error for table {}: {}", table_id, e);
-                },
-                Err(e) => log::error!("Notification chunk task panicked: {}", e),
+        for task in tasks {
+            if let Ok(count) = task.await {
+                total += count;
             }
         }
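The fan-out shape in isolation, assuming a tokio runtime. The subscriber handles and per-chunk delivery body are stand-ins for the crate's `SubscriptionHandle` and `dispatch_chunk`:

use std::sync::Arc;

const CHUNK_SIZE: usize = 512;

#[tokio::main]
async fn main() {
    // Stand-in for many subscription handles on one hot table.
    let handles: Vec<u64> = (0..10_000).collect();
    // Shared payload: one Arc clone per chunk, never a deep copy.
    let payload: Arc<str> = Arc::from(r#"{"type":"change"}"#);

    let mut tasks = Vec::with_capacity(handles.len().div_ceil(CHUNK_SIZE));
    for chunk in handles.chunks(CHUNK_SIZE) {
        let chunk = chunk.to_vec(); // owned copy moves into the task
        let payload = Arc::clone(&payload);
        tasks.push(tokio::spawn(async move {
            // Stand-in for per-subscriber delivery; returns delivered count.
            chunk.iter().filter(|_| !payload.is_empty()).count()
        }));
    }

    // Panicked chunks are skipped rather than failing the whole fan-out.
    let mut total = 0usize;
    for task in tasks {
        if let Ok(count) = task.await {
            total += count;
        }
    }
    assert_eq!(total, 10_000);
}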
