Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ jobs:
sed -i 's|default_storage_path = "./data/storage"|default_storage_path = "./test-data/storage"|g' server.toml
sed -i 's|logs_path = "./logs"|logs_path = "./test-data/logs"|g' server.toml
sed -i 's|jwt_secret = ".*"|jwt_secret = "pg-native-test-secret-key-minimum-32-characters-long"|g' server.toml
printf '\n[cluster]\ncluster_id = "pg-native-tests"\nnode_id = 1\nrpc_addr = "127.0.0.1:9188"\napi_addr = "http://127.0.0.1:8080"\nuser_shards = 1\nshared_shards = 1\n' >> server.toml

- name: Start server
shell: bash
Expand All @@ -1285,7 +1286,7 @@ jobs:
KALAMDB_ROOT_PASSWORD: "kalamdb123"
KALAMDB_JWT_SECRET: "pg-native-test-secret-key-minimum-32-characters-long"
KALAMDB_NODE_ID: "1"
KALAMDB_CLUSTER_RPC_ADDR: "0.0.0.0:9188"
KALAMDB_CLUSTER_RPC_ADDR: "127.0.0.1:9188"
KALAMDB_CLUSTER_API_ADDR: "http://127.0.0.1:8080"
run: |
set -euo pipefail
Expand Down Expand Up @@ -1319,20 +1320,41 @@ jobs:
set -euo pipefail
./pg/scripts/pgrx-test-setup.sh 2>&1 | tee pg-pgrx-setup-output.txt

- name: Run native PG extension e2e tests
- name: Run native PG extension perf tests (informational)
continue-on-error: true
shell: bash
env:
KALAMDB_SERVER_URL: "http://127.0.0.1:8080"
KALAMDB_ROOT_PASSWORD: "kalamdb123"
run: |
set -euo pipefail
: > pg-native-test-output.txt
cargo nextest run \
-p kalam-pg-extension \
--features e2e \
-E 'test(e2e)' \
--test e2e_perf \
--test-threads 1 \
--no-fail-fast \
2>&1 | tee pg-native-test-output.txt

- name: Run native PG extension e2e tests
shell: bash
env:
KALAMDB_SERVER_URL: "http://127.0.0.1:8080"
KALAMDB_ROOT_PASSWORD: "kalamdb123"
run: |
set -euo pipefail
cargo nextest run \
-p kalam-pg-extension \
--features e2e \
--test e2e_ddl \
--test e2e_dml \
--test e2e_scenarios \
--test extension_metadata \
--test session_settings \
--test-threads 1 \
2>&1 | tee -a pg-native-test-output.txt

- name: Stop pgrx PostgreSQL
if: always()
shell: bash
Expand Down Expand Up @@ -1826,6 +1848,8 @@ jobs:
push: true
platforms: linux/amd64
build-args: |
PG_MAJOR=${{ env.PG_EXTENSION_MAJOR }}
POSTGRES_BASE_IMAGE=public.ecr.aws/docker/library/postgres:${{ env.PG_EXTENSION_MAJOR }}-bookworm
OCI_IMAGE_DESCRIPTION=${{ steps.pg_image_description.outputs.value }}
labels: |
org.opencontainers.image.title=pg-kalam
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,4 @@ ts-sdk-repro/server.toml
/benchv2/logs
/benchv2/logs
link/sdks/typescript/client/.npmrc
/target-pg-bench
40 changes: 23 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ url = "2.5.8"
# WebSocket
tokio-tungstenite = { version = "0.29.0", features = ["rustls-tls-webpki-roots"] }

# Security floor pins for vulnerable transitive TLS/QUIC crates.
aws-lc-rs = { version = "1.16.2", default-features = false }
quinn-proto = { version = "0.11.14", default-features = false }
rustls-webpki = { version = "0.103.10", default-features = false }

# Time handling
chrono = { version = "0.4.44", features = ["serde"] }

Expand Down Expand Up @@ -118,7 +123,7 @@ actix-multipart = "0.7"
uuid = { version = "1.23.0", features = ["v4", "v7", "serde"] }

# NanoID generation (21-char URL-safe unique IDs)
nanoid = "0.4.0"
nanoid = "0.5.0"

# ULID generation
ulid = "1.1"
Expand Down Expand Up @@ -195,7 +200,7 @@ quote = "1.0.44"
syn = { version = "2.0.117", features = ["full", "extra-traits"] }

# Object storage abstraction (S3/GCS/Azure/local)
object_store = { version = "0.13.1" }
object_store = { version = "0.13.2" }

# Vector ANN engines
usearch = "2.24.0"
Expand Down
20 changes: 7 additions & 13 deletions backend/crates/kalamdb-api/src/http/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,8 @@ pub async fn execute_sql_v1(
// 4. Build execution context
let default_namespace = namespace_id.clone().unwrap_or_else(|| NamespaceId::new("default"));
let base_session = app_context.base_session_context();
let mut exec_ctx =
ExecutionContext::from_session(session, Arc::clone(&base_session)).with_namespace_id(
default_namespace.clone(),
);
let mut exec_ctx = ExecutionContext::from_session(session, Arc::clone(&base_session))
.with_namespace_id(default_namespace.clone());
let is_meta_leader = app_context.executor().is_leader(GroupId::Meta).await;

// 5. File uploads must go to the leader
Expand Down Expand Up @@ -188,15 +186,11 @@ pub async fn execute_sql_v1(
};

// 8. Split, parse, and classify SQL statements
let prepared_statements = match split_and_prepare_statements(
&sql,
&exec_ctx,
sql_executor.get_ref(),
start_time,
) {
Ok(stmts) => stmts,
Err(resp) => return resp,
};
let prepared_statements =
match split_and_prepare_statements(&sql, &exec_ctx, sql_executor.get_ref(), start_time) {
Ok(stmts) => stmts,
Err(resp) => return resp,
};

if exec_ctx.request_id().is_none() && batch_requires_request_id(&prepared_statements) {
exec_ctx = exec_ctx.with_request_id(Uuid::now_v7().to_string());
Expand Down
57 changes: 24 additions & 33 deletions backend/crates/kalamdb-api/src/http/sql/execution_paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use kalamdb_commons::schemas::TableType;
use kalamdb_core::app_context::AppContext;
use kalamdb_core::schema_registry::SchemaRegistry;
use kalamdb_core::sql::context::ExecutionContext;
use kalamdb_core::sql::executor::{PreparedExecutionStatement, ScalarValue, SqlExecutor};
use kalamdb_core::sql::executor::request_transaction_state::RequestTransactionState;
use kalamdb_core::sql::executor::{PreparedExecutionStatement, ScalarValue, SqlExecutor};
use kalamdb_core::sql::SqlImpersonationService;
use kalamdb_sql::classifier::SqlStatementKind;
use kalamdb_system::FileSubfolderState;
Expand Down Expand Up @@ -256,17 +256,17 @@ pub(super) async fn execute_batch_path(
let mut total_updated = 0usize;
let mut total_deleted = 0usize;
let mut params_remaining = Some(params);
let mut request_transaction_state = match RequestTransactionState::from_execution_context(exec_ctx)
{
Ok(state) => state,
Err(err) => {
return HttpResponse::BadRequest().json(SqlResponse::error(
ErrorCode::SqlExecutionError,
&err.to_string(),
took_ms(start_time),
));
},
};
let mut request_transaction_state =
match RequestTransactionState::from_execution_context(exec_ctx) {
Ok(state) => state,
Err(err) => {
return HttpResponse::BadRequest().json(SqlResponse::error(
ErrorCode::SqlExecutionError,
&err.to_string(),
took_ms(start_time),
));
},
};
if let Some(state) = request_transaction_state.as_mut() {
state.sync_from_coordinator(app_context);
}
Expand Down Expand Up @@ -294,20 +294,18 @@ pub(super) async fn execute_batch_path(
let batch_len = batch_end - idx;

if batch_len > 1 {
let batch_stmts: Vec<&PreparedExecutionStatement> =
prepared_statements[idx..batch_end]
.iter()
.map(|s| &s.prepared_statement)
.collect();
let batch_stmts: Vec<&PreparedExecutionStatement> = prepared_statements
[idx..batch_end]
.iter()
.map(|s| &s.prepared_statement)
.collect();
let batch_start = Instant::now();

match sql_executor
.try_batch_insert_in_transaction(
&batch_stmts,
exec_ctx,
transaction_id,
)
{
match sql_executor.try_batch_insert_in_transaction(
&batch_stmts,
exec_ctx,
transaction_id,
) {
Ok(Some(results)) => {
let batch_rows: usize =
results.iter().map(|r| r.affected_rows()).sum();
Expand All @@ -334,11 +332,7 @@ pub(super) async fn execute_batch_path(
return HttpResponse::BadRequest().json(
SqlResponse::error_with_details(
ErrorCode::SqlExecutionError,
&format!(
"Statement {} failed: {}",
idx + 1,
err
),
&format!("Statement {} failed: {}", idx + 1, err),
&prepared_statements[idx].prepared_statement.sql,
took_ms(start_time),
),
Expand Down Expand Up @@ -595,10 +589,7 @@ fn is_batchable_insert(stmt: &PreparedApiExecutionStatement) -> bool {
return false;
}
matches!(
stmt.prepared_statement
.classified_statement
.as_ref()
.map(|c| c.kind()),
stmt.prepared_statement.classified_statement.as_ref().map(|c| c.kind()),
Some(SqlStatementKind::Insert(_))
)
}
11 changes: 2 additions & 9 deletions backend/crates/kalamdb-api/src/http/sql/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,6 @@ pub async fn handle_not_leader_error(
);
}

forward_sql_grpc(
ForwardTarget::Leader,
http_req,
req,
app_context,
request_id,
start_time,
)
.await
forward_sql_grpc(ForwardTarget::Leader, http_req, req, app_context, request_id, start_time)
.await
}
4 changes: 1 addition & 3 deletions backend/crates/kalamdb-api/src/http/sql/helpers/converter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
//! Arrow to JSON conversion helpers

use arrow::record_batch::RecordBatch;
use kalamdb_commons::conversions::{
mask_sensitive_rows_for_role, schema_fields_from_arrow_schema,
};
use kalamdb_commons::conversions::{mask_sensitive_rows_for_role, schema_fields_from_arrow_schema};
use kalamdb_commons::models::Role;
use kalamdb_commons::models::Username;
use kalamdb_commons::schemas::SchemaField;
Expand Down
Loading
Loading