Skip to content

Commit 6e5738f

Browse files
committed
Rename subscription option 'from_seq_id' to 'from'
Replace SubscriptionOptions.from_seq_id with from across backend, CLI tests, and client SDKs (Dart/TS/WASM) and update generated bindings. Add serde alias to keep backwards compatibility for incoming JSON. Introduce SQL parser helpers: normalize_context_keyword_calls_for_sqlparser and rewrite_context_functions_for_datafusion, and wire them into SqlExecutor so sqlparser/DataFusion accept and execute CURRENT_USER()/CURRENT_ROLE()/CURRENT_USER_ID() style calls. Update tests and docs to reflect the new option name and add tests for the context-function handling.
1 parent a101b75 commit 6e5738f

43 files changed

Lines changed: 486 additions & 294 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/crates/kalamdb-api/src/handlers/ws/events/subscription.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ pub async fn handle_subscribe(
6363
let batch_size = subscription.options.batch_size.unwrap_or(MAX_ROWS_PER_BATCH);
6464

6565
// Create initial data options respecting all three options:
66-
// - from_seq_id: Resume from a specific sequence ID
66+
// - from: Resume from a specific sequence ID
6767
// - last_rows: Fetch the last N rows
6868
// - batch_size: Hint for server-side batch sizing
69-
let initial_opts = if let Some(from_seq) = subscription.options.from_seq_id {
69+
let initial_opts = if let Some(from_seq) = subscription.options.from {
7070
// Resume from specific sequence ID - use since_seq for filtering
7171
InitialDataOptions::batch(Some(from_seq), None, batch_size)
7272
} else if let Some(n) = subscription.options.last_rows {

backend/crates/kalamdb-commons/src/websocket.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ pub struct SubscriptionRequest {
281281
///
282282
/// These options control individual subscription behavior including:
283283
/// - Initial data loading (batch_size, last_rows)
284-
/// - Data resumption after reconnection (from_seq_id)
284+
/// - Data resumption after reconnection (from)
285285
///
286286
/// Used by both SQL SUBSCRIBE TO command and WebSocket subscribe messages.
287287
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
@@ -299,8 +299,8 @@ pub struct SubscriptionOptions {
299299
/// Resume subscription from a specific sequence ID
300300
/// When set, the server will only send changes after this seq_id
301301
/// Typically set automatically during reconnection to resume from last received event
302-
#[serde(skip_serializing_if = "Option::is_none")]
303-
pub from_seq_id: Option<SeqId>,
302+
#[serde(skip_serializing_if = "Option::is_none", alias = "from_seq_id")]
303+
pub from: Option<SeqId>,
304304
}
305305

306306
/// Batch control metadata for paginated initial data loading

backend/crates/kalamdb-core/src/live/manager/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn create_test_subscription_request(
3131
options: SubscriptionOptions {
3232
batch_size: None,
3333
last_rows,
34-
from_seq_id: None,
34+
from: None,
3535
},
3636
}
3737
}

backend/crates/kalamdb-core/src/sql/executor/sql_executor.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,11 @@ impl SqlExecutor {
146146
// custom DDL (CREATE NAMESPACE, CREATE USER, SHOW TABLES, etc.).
147147
// When it fails we fall through with None — the classifier and
148148
// executor handle these statements via their own tokeniser.
149-
let parsed_statement = kalamdb_sql::parse_single_statement(sql).ok().flatten();
149+
let sqlparser_compatible_sql =
150+
kalamdb_sql::normalize_context_keyword_calls_for_sqlparser(sql);
151+
let parsed_statement = kalamdb_sql::parse_single_statement(&sqlparser_compatible_sql)
152+
.ok()
153+
.flatten();
150154
let table_id = parsed_statement.as_ref().and_then(|stmt| {
151155
kalamdb_sql::extract_dml_table_id_from_statement(
152156
stmt,
@@ -300,6 +304,8 @@ impl SqlExecutor {
300304
exec_ctx: &ExecutionContext,
301305
dml_kind: DmlKind,
302306
) -> Result<ExecutionResult, KalamDbError> {
307+
let execution_sql = kalamdb_sql::rewrite_context_functions_for_datafusion(sql);
308+
let execution_sql = execution_sql.as_str();
303309
let parsed_statement = metadata.parsed_statement.as_ref();
304310
self.block_system_namespace_dml(metadata.table_id.as_ref(), dml_kind)?;
305311

@@ -340,7 +346,7 @@ impl SqlExecutor {
340346
let df = if params.is_empty() {
341347
let session = exec_ctx.create_session_with_user();
342348
let plan_start = std::time::Instant::now();
343-
match session.sql(sql).await {
349+
match session.sql(execution_sql).await {
344350
Ok(df) => {
345351
tracing::debug!(plan_ms = %plan_start.elapsed().as_micros() as f64 / 1000.0, "sql.dml_plan");
346352
df
@@ -357,7 +363,7 @@ impl SqlExecutor {
357363
}
358364
let retry_session = exec_ctx.create_session_with_user();
359365
retry_session
360-
.sql(sql)
366+
.sql(execution_sql)
361367
.await
362368
.map_err(|e2| self.log_sql_error(sql, exec_ctx, e2))?
363369
} else {
@@ -366,8 +372,11 @@ impl SqlExecutor {
366372
},
367373
}
368374
} else {
369-
let cache_key =
370-
PlanCacheKey::new(exec_ctx.default_namespace().clone(), exec_ctx.user_role(), sql);
375+
let cache_key = PlanCacheKey::new(
376+
exec_ctx.default_namespace().clone(),
377+
exec_ctx.user_role(),
378+
execution_sql,
379+
);
371380
let session = exec_ctx.create_session_with_user();
372381

373382
if let Some(template_plan) = self.plan_cache.get(&cache_key) {
@@ -381,7 +390,7 @@ impl SqlExecutor {
381390
e
382391
);
383392

384-
match session.sql(sql).await {
393+
match session.sql(execution_sql).await {
385394
Ok(planned_df) => {
386395
let template_plan = planned_df.logical_plan().clone();
387396
self.plan_cache.insert(cache_key.clone(), template_plan.clone());
@@ -404,7 +413,7 @@ impl SqlExecutor {
404413
}
405414
let retry_session = exec_ctx.create_session_with_user();
406415
let retry_df = retry_session
407-
.sql(sql)
416+
.sql(execution_sql)
408417
.await
409418
.map_err(|e2| self.log_sql_error(sql, exec_ctx, e2))?;
410419
let template_plan = retry_df.logical_plan().clone();
@@ -424,7 +433,7 @@ impl SqlExecutor {
424433
},
425434
}
426435
} else {
427-
match session.sql(sql).await {
436+
match session.sql(execution_sql).await {
428437
Ok(planned_df) => {
429438
let template_plan = planned_df.logical_plan().clone();
430439
self.plan_cache.insert(cache_key.clone(), template_plan.clone());
@@ -446,7 +455,7 @@ impl SqlExecutor {
446455
}
447456
let retry_session = exec_ctx.create_session_with_user();
448457
let retry_df = retry_session
449-
.sql(sql)
458+
.sql(execution_sql)
450459
.await
451460
.map_err(|e2| self.log_sql_error(sql, exec_ctx, e2))?;
452461

@@ -497,6 +506,8 @@ impl SqlExecutor {
497506
params: Vec<ScalarValue>,
498507
exec_ctx: &ExecutionContext,
499508
) -> Result<ExecutionResult, KalamDbError> {
509+
let execution_sql = kalamdb_sql::rewrite_context_functions_for_datafusion(sql);
510+
let execution_sql = execution_sql.as_str();
500511
use crate::sql::executor::default_ordering::apply_default_order_by;
501512
use crate::sql::executor::parameter_binding::{
502513
replace_placeholders_in_plan, validate_params,
@@ -511,8 +522,11 @@ impl SqlExecutor {
511522

512523
// Try cached template plan first (works for both plain and parameterized SQL).
513524
// Key excludes user_id because LogicalPlan is user-agnostic - filtering happens at scan time.
514-
let cache_key =
515-
PlanCacheKey::new(exec_ctx.default_namespace().clone(), exec_ctx.user_role(), sql);
525+
let cache_key = PlanCacheKey::new(
526+
exec_ctx.default_namespace().clone(),
527+
exec_ctx.user_role(),
528+
execution_sql,
529+
);
516530

517531
let df = if let Some(template_plan) = self.plan_cache.get(&cache_key) {
518532
let executable_plan = if params.is_empty() {
@@ -525,7 +539,7 @@ impl SqlExecutor {
525539
Ok(df) => df,
526540
Err(e) => {
527541
log::warn!("Failed to execute cached plan, reparsing SQL: {}", e);
528-
let planned_df = match session.sql(sql).await {
542+
let planned_df = match session.sql(execution_sql).await {
529543
Ok(df) => df,
530544
Err(e) => {
531545
if Self::is_table_not_found_error(&e) {
@@ -543,7 +557,7 @@ impl SqlExecutor {
543557
);
544558
}
545559
let retry_session = exec_ctx.create_session_with_user();
546-
match retry_session.sql(sql).await {
560+
match retry_session.sql(execution_sql).await {
547561
Ok(df) => df,
548562
Err(e2) => {
549563
return Err(self.log_sql_error(sql, exec_ctx, e2));
@@ -584,7 +598,7 @@ impl SqlExecutor {
584598
},
585599
}
586600
} else {
587-
let planned_df = match session.sql(sql).await {
601+
let planned_df = match session.sql(execution_sql).await {
588602
Ok(df) => df,
589603
Err(e) => {
590604
if Self::is_table_not_found_error(&e) {
@@ -602,7 +616,7 @@ impl SqlExecutor {
602616
);
603617
}
604618
let retry_session = exec_ctx.create_session_with_user();
605-
match retry_session.sql(sql).await {
619+
match retry_session.sql(execution_sql).await {
606620
Ok(df) => df,
607621
Err(e2) => {
608622
return Err(self.log_sql_error(sql, exec_ctx, e2));
@@ -694,11 +708,13 @@ impl SqlExecutor {
694708
sql: &str,
695709
exec_ctx: &ExecutionContext,
696710
) -> Result<ExecutionResult, KalamDbError> {
711+
let execution_sql = kalamdb_sql::rewrite_context_functions_for_datafusion(sql);
712+
let execution_sql = execution_sql.as_str();
697713
// Create per-request SessionContext with user_id injected
698714
let session = exec_ctx.create_session_with_user();
699715

700716
// Execute the command directly via DataFusion
701-
let df = match session.sql(sql).await {
717+
let df = match session.sql(execution_sql).await {
702718
Ok(df) => df,
703719
Err(e) => {
704720
log::error!(

backend/crates/kalamdb-core/tests/test_context_functions.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,55 @@ async fn test_all_three_functions_together() {
347347
assert_eq!(role_array.value(0), "dba");
348348
}
349349

350+
#[tokio::test]
351+
async fn test_sql_standard_context_function_aliases() {
352+
let username = UserName::new("admin");
353+
let user_id = UserId::new("u_admin");
354+
let role = Role::Dba;
355+
356+
let auth_session = AuthSession::with_username_and_auth_details(
357+
user_id,
358+
username,
359+
role,
360+
kalamdb_commons::models::ConnectionInfo::new(None),
361+
kalamdb_session::AuthMethod::Bearer,
362+
);
363+
364+
let exec_ctx = ExecutionContext::from_session(auth_session, create_test_session());
365+
let session = exec_ctx.create_session_with_user();
366+
367+
let result = session
368+
.sql(
369+
"SELECT CURRENT_USER() AS username, CURRENT_USER_ID() AS user_id, CURRENT_ROLE() AS role",
370+
)
371+
.await;
372+
assert!(result.is_ok(), "Query failed: {:?}", result.err());
373+
374+
let batches = result.unwrap().collect().await.unwrap();
375+
assert_eq!(batches.len(), 1);
376+
assert_eq!(batches[0].num_rows(), 1);
377+
378+
let username_col = batches[0]
379+
.column(0)
380+
.as_any()
381+
.downcast_ref::<datafusion::arrow::array::StringArray>()
382+
.unwrap();
383+
let user_id_col = batches[0]
384+
.column(1)
385+
.as_any()
386+
.downcast_ref::<datafusion::arrow::array::StringArray>()
387+
.unwrap();
388+
let role_col = batches[0]
389+
.column(2)
390+
.as_any()
391+
.downcast_ref::<datafusion::arrow::array::StringArray>()
392+
.unwrap();
393+
394+
assert_eq!(username_col.value(0), "admin");
395+
assert_eq!(user_id_col.value(0), "u_admin");
396+
assert_eq!(role_col.value(0), "dba");
397+
}
398+
350399
#[tokio::test]
351400
async fn test_functions_in_where_clause() {
352401
let username = UserName::new("admin");

0 commit comments

Comments
 (0)