From 9d19ebfbfa1f70db9083618f753abdfe42e8ac66 Mon Sep 17 00:00:00 2001 From: Allen Cheng Date: Thu, 11 Jun 2026 19:45:27 -0700 Subject: [PATCH] feat: expose record parity APIs over REST --- Cargo.lock | 1 + crates/lance-context-api/src/lib.rs | 30 ++ crates/lance-context-client/src/lib.rs | 126 ++++++++ crates/lance-context-core/src/api_impl.rs | 50 ++- crates/lance-context-server/Cargo.toml | 3 + crates/lance-context-server/src/routes/mod.rs | 26 ++ .../src/routes/records.rs | 290 +++++++++++++++++- crates/lance-context/src/lib.rs | 4 +- crates/lance-context/src/unified.rs | 38 ++- 9 files changed, 559 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 229bb7e..a59c25c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5444,6 +5444,7 @@ dependencies = [ "lance-context-core", "serde", "serde_json", + "tempfile", "tokio", "tower-http", "tracing", diff --git a/crates/lance-context-api/src/lib.rs b/crates/lance-context-api/src/lib.rs index 89ae788..f391b7f 100644 --- a/crates/lance-context-api/src/lib.rs +++ b/crates/lance-context-api/src/lib.rs @@ -36,12 +36,36 @@ pub trait ContextStoreApi { fn get(&self, id: &str) -> impl Future>> + Send; + fn get_by_external_id( + &self, + external_id: &str, + ) -> impl Future>> + Send; + + fn delete_by_id( + &mut self, + id: &str, + ) -> impl Future> + Send; + + fn delete_by_external_id( + &mut self, + external_id: &str, + ) -> impl Future> + Send; + fn list( &self, limit: Option, offset: Option, ) -> impl Future>> + Send; + fn related( + &self, + target_id: &str, + relation: Option<&str>, + limit: Option, + include_expired: bool, + include_retired: bool, + ) -> impl Future>> + Send; + fn search( &self, query: &[f32], @@ -225,6 +249,12 @@ pub struct GetRecordResponse { pub record: Option, } +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteRecordResponse { + pub deleted: bool, + pub version: u64, +} + // --------------------------------------------------------------------------- // Search // --------------------------------------------------------------------------- diff --git a/crates/lance-context-client/src/lib.rs b/crates/lance-context-client/src/lib.rs index ce8ab59..3c52600 100644 --- a/crates/lance-context-client/src/lib.rs +++ b/crates/lance-context-client/src/lib.rs @@ -67,6 +67,38 @@ impl ContextStoreApi for RemoteContextStore { Ok(resp.record) } + async fn get_by_external_id(&self, external_id: &str) -> ContextResult> { + let resp = self + .client + .get_record_by_external_id(&self.context_name, external_id) + .await + .map_err(to_ctx_err)?; + Ok(resp.record) + } + + async fn delete_by_id(&mut self, id: &str) -> ContextResult { + let resp = self + .client + .delete_record(&self.context_name, id) + .await + .map_err(to_ctx_err)?; + self.cached_version = resp.version; + Ok(resp) + } + + async fn delete_by_external_id( + &mut self, + external_id: &str, + ) -> ContextResult { + let resp = self + .client + .delete_record_by_external_id(&self.context_name, external_id) + .await + .map_err(to_ctx_err)?; + self.cached_version = resp.version; + Ok(resp) + } + async fn list( &self, limit: Option, @@ -80,6 +112,29 @@ impl ContextStoreApi for RemoteContextStore { Ok(resp.records) } + async fn related( + &self, + target_id: &str, + relation: Option<&str>, + limit: Option, + include_expired: bool, + include_retired: bool, + ) -> ContextResult> { + let resp = self + .client + .related_records( + &self.context_name, + target_id, + relation, + limit, + include_expired, + include_retired, + ) + .await + .map_err(to_ctx_err)?; + Ok(resp.records) + } + async fn search( &self, query: &[f32], @@ -246,6 +301,47 @@ impl ContextClient { Self::handle_response(resp).await } + pub async fn get_record_by_external_id( + &self, + name: &str, + external_id: &str, + ) -> Result { + let resp = self + .http + .get(self.url(&format!("/contexts/{}/records/by-external-id", name))) + .query(&[("external_id", external_id)]) + .send() + .await?; + Self::handle_response(resp).await + } + + pub async fn delete_record( + &self, + name: &str, + id: &str, + ) -> Result { + let resp = self + .http + .delete(self.url(&format!("/contexts/{}/records/{}", name, id))) + .send() + .await?; + Self::handle_response(resp).await + } + + pub async fn delete_record_by_external_id( + &self, + name: &str, + external_id: &str, + ) -> Result { + let resp = self + .http + .delete(self.url(&format!("/contexts/{}/records", name))) + .query(&[("external_id", external_id)]) + .send() + .await?; + Self::handle_response(resp).await + } + pub async fn list_records( &self, name: &str, @@ -268,6 +364,36 @@ impl ContextClient { Self::handle_response(resp).await } + pub async fn related_records( + &self, + name: &str, + target_id: &str, + relation: Option<&str>, + limit: Option, + include_expired: bool, + include_retired: bool, + ) -> Result { + let mut request = self + .http + .get(self.url(&format!("/contexts/{}/records/related", name))) + .query(&[("target_id", target_id)]); + if let Some(relation) = relation { + request = request.query(&[("relation", relation)]); + } + if let Some(limit) = limit { + request = request.query(&[("limit", limit)]); + } + if include_expired { + request = request.query(&[("include_expired", include_expired)]); + } + if include_retired { + request = request.query(&[("include_retired", include_retired)]); + } + + let resp = request.send().await?; + Self::handle_response(resp).await + } + pub async fn search( &self, name: &str, diff --git a/crates/lance-context-core/src/api_impl.rs b/crates/lance-context-core/src/api_impl.rs index 2875494..a3896ce 100644 --- a/crates/lance-context-core/src/api_impl.rs +++ b/crates/lance-context-core/src/api_impl.rs @@ -3,8 +3,8 @@ use uuid::Uuid; use lance_context_api::{ AddRecordRequest, AddRecordsResponse, CompactRequest, CompactResponse, CompactStatsResponse, - ContextError, ContextResult, ContextStoreApi, RecordDto, RelationshipDto, RetrieveRequest, - RetrieveResultDto, SearchResultDto, StateMetadataDto, + ContextError, ContextResult, ContextStoreApi, DeleteRecordResponse, RecordDto, RelationshipDto, + RetrieveRequest, RetrieveResultDto, SearchResultDto, StateMetadataDto, }; use crate::record::{ @@ -71,6 +71,36 @@ impl ContextStoreApi for ContextStore { Ok(record.map(record_to_dto)) } + async fn get_by_external_id(&self, external_id: &str) -> ContextResult> { + let record = ContextStore::get_by_external_id(self, external_id) + .await + .map_err(to_ctx_err)?; + Ok(record.map(record_to_dto)) + } + + async fn delete_by_id(&mut self, id: &str) -> ContextResult { + let deleted = ContextStore::delete_by_id(self, id) + .await + .map_err(to_ctx_err)?; + Ok(DeleteRecordResponse { + deleted, + version: ContextStore::version(self), + }) + } + + async fn delete_by_external_id( + &mut self, + external_id: &str, + ) -> ContextResult { + let deleted = ContextStore::delete_by_external_id(self, external_id) + .await + .map_err(to_ctx_err)?; + Ok(DeleteRecordResponse { + deleted, + version: ContextStore::version(self), + }) + } + async fn list( &self, limit: Option, @@ -82,6 +112,22 @@ impl ContextStoreApi for ContextStore { Ok(records.into_iter().map(record_to_dto).collect()) } + async fn related( + &self, + target_id: &str, + relation: Option<&str>, + limit: Option, + include_expired: bool, + include_retired: bool, + ) -> ContextResult> { + let options = LifecycleQueryOptions::new(include_expired, include_retired); + let records = + ContextStore::list_related_with_options(self, target_id, relation, limit, options) + .await + .map_err(to_ctx_err)?; + Ok(records.into_iter().map(record_to_dto).collect()) + } + async fn search( &self, query: &[f32], diff --git a/crates/lance-context-server/Cargo.toml b/crates/lance-context-server/Cargo.toml index d54c7f5..15148d5 100644 --- a/crates/lance-context-server/Cargo.toml +++ b/crates/lance-context-server/Cargo.toml @@ -25,3 +25,6 @@ tower-http = { version = "0.6", features = ["cors", "trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1", features = ["v4"] } + +[dev-dependencies] +tempfile = "3" diff --git a/crates/lance-context-server/src/routes/mod.rs b/crates/lance-context-server/src/routes/mod.rs index 8e353fe..a5d6596 100644 --- a/crates/lance-context-server/src/routes/mod.rs +++ b/crates/lance-context-server/src/routes/mod.rs @@ -27,10 +27,26 @@ pub fn router() -> Router> { "/api/v1/contexts/{name}/records", get(records::list_records), ) + .route( + "/api/v1/contexts/{name}/records", + delete(records::delete_record_by_external_id), + ) + .route( + "/api/v1/contexts/{name}/records/by-external-id", + get(records::get_record_by_external_id), + ) + .route( + "/api/v1/contexts/{name}/records/related", + get(records::related_records), + ) .route( "/api/v1/contexts/{name}/records/{id}", get(records::get_record), ) + .route( + "/api/v1/contexts/{name}/records/{id}", + delete(records::delete_record), + ) .route("/api/v1/contexts/{name}/search", post(search::search)) .route("/api/v1/contexts/{name}/retrieve", post(search::retrieve)) .route( @@ -44,3 +60,13 @@ pub fn router() -> Router> { get(compact::compact_stats), ) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn router_builds_with_record_parity_routes() { + let _ = router(); + } +} diff --git a/crates/lance-context-server/src/routes/records.rs b/crates/lance-context-server/src/routes/records.rs index c7a062f..a37b07a 100644 --- a/crates/lance-context-server/src/routes/records.rs +++ b/crates/lance-context-server/src/routes/records.rs @@ -4,10 +4,12 @@ use axum::extract::{Path, Query, State}; use axum::Json; use chrono::Utc; use lance_context_api::{ - AddRecordsRequest, AddRecordsResponse, GetRecordResponse, ListRecordsResponse, RecordDto, - RelationshipDto, StateMetadataDto, + AddRecordsRequest, AddRecordsResponse, DeleteRecordResponse, GetRecordResponse, + ListRecordsResponse, RecordDto, RelationshipDto, StateMetadataDto, +}; +use lance_context_core::{ + ContextRecord, LifecycleQueryOptions, Relationship, StateMetadata, LIFECYCLE_ACTIVE, }; -use lance_context_core::{ContextRecord, Relationship, StateMetadata, LIFECYCLE_ACTIVE}; use uuid::Uuid; use crate::error::AppError; @@ -109,6 +111,77 @@ pub async fn get_record( })) } +#[derive(serde::Deserialize)] +pub struct ExternalIdParams { + pub external_id: String, +} + +pub async fn get_record_by_external_id( + State(state): State>, + Path(name): Path, + Query(params): Query, +) -> Result, AppError> { + let stores = state.stores.read().await; + let store_lock = stores + .get(&name) + .ok_or_else(|| AppError::NotFound(format!("Context '{}' does not exist", name)))? + .clone(); + drop(stores); + + let store = store_lock.read().await; + let record = store + .get_by_external_id(¶ms.external_id) + .await + .map_err(AppError::from_lance)?; + + Ok(Json(GetRecordResponse { + record: record.map(record_to_dto), + })) +} + +pub async fn delete_record( + State(state): State>, + Path((name, id)): Path<(String, String)>, +) -> Result, AppError> { + let stores = state.stores.read().await; + let store_lock = stores + .get(&name) + .ok_or_else(|| AppError::NotFound(format!("Context '{}' does not exist", name)))? + .clone(); + drop(stores); + + let mut store = store_lock.write().await; + let deleted = store + .delete_by_id(&id) + .await + .map_err(AppError::from_lance)?; + let version = store.version(); + + Ok(Json(DeleteRecordResponse { deleted, version })) +} + +pub async fn delete_record_by_external_id( + State(state): State>, + Path(name): Path, + Query(params): Query, +) -> Result, AppError> { + let stores = state.stores.read().await; + let store_lock = stores + .get(&name) + .ok_or_else(|| AppError::NotFound(format!("Context '{}' does not exist", name)))? + .clone(); + drop(stores); + + let mut store = store_lock.write().await; + let deleted = store + .delete_by_external_id(¶ms.external_id) + .await + .map_err(AppError::from_lance)?; + let version = store.version(); + + Ok(Json(DeleteRecordResponse { deleted, version })) +} + #[derive(serde::Deserialize)] pub struct ListParams { pub limit: Option, @@ -138,6 +211,45 @@ pub async fn list_records( Ok(Json(ListRecordsResponse { records: dtos })) } +#[derive(serde::Deserialize)] +pub struct RelatedParams { + pub target_id: String, + pub relation: Option, + pub limit: Option, + #[serde(default)] + pub include_expired: bool, + #[serde(default)] + pub include_retired: bool, +} + +pub async fn related_records( + State(state): State>, + Path(name): Path, + Query(params): Query, +) -> Result, AppError> { + let stores = state.stores.read().await; + let store_lock = stores + .get(&name) + .ok_or_else(|| AppError::NotFound(format!("Context '{}' does not exist", name)))? + .clone(); + drop(stores); + + let store = store_lock.read().await; + let records = store + .list_related_with_options( + ¶ms.target_id, + params.relation.as_deref(), + params.limit, + LifecycleQueryOptions::new(params.include_expired, params.include_retired), + ) + .await + .map_err(AppError::from_lance)?; + + let dtos: Vec = records.into_iter().map(record_to_dto).collect(); + + Ok(Json(ListRecordsResponse { records: dtos })) +} + pub fn record_to_dto(r: ContextRecord) -> RecordDto { RecordDto { id: r.id, @@ -188,3 +300,175 @@ fn relationship_to_dto(r: Relationship) -> RelationshipDto { weight: r.weight, } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use axum::extract::{Path, Query, State}; + use axum::Json; + use lance_context_api::{AddRecordRequest, AddRecordsRequest}; + use lance_context_core::ContextStore; + use tempfile::TempDir; + use tokio::sync::RwLock; + + use super::*; + use crate::state::AppState; + + async fn test_state(context_name: &str) -> (Arc, TempDir) { + let dir = TempDir::new().unwrap(); + let uri = dir + .path() + .join(format!("{context_name}.lance")) + .to_string_lossy() + .to_string(); + let store = ContextStore::open(&uri).await.unwrap(); + let mut stores = HashMap::new(); + stores.insert(context_name.to_string(), Arc::new(RwLock::new(store))); + let state = Arc::new(AppState { + stores: RwLock::new(stores), + base_path: dir.path().to_path_buf(), + }); + (state, dir) + } + + fn text_record(text: &str) -> AddRecordRequest { + AddRecordRequest { + role: "user".to_string(), + content_type: "text/plain".to_string(), + text_payload: Some(text.to_string()), + ..Default::default() + } + } + + #[tokio::test] + async fn get_and_delete_by_external_id() { + let context_name = "ctx"; + let (state, _dir) = test_state(context_name).await; + let external_id = "s3://bucket/path/doc.md#chunk?index=1"; + let mut record = text_record("stable source chunk"); + record.external_id = Some(external_id.to_string()); + + let (_, Json(add_response)) = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![record], + }), + ) + .await + .unwrap(); + + let Json(get_response) = get_record_by_external_id( + State(state.clone()), + Path(context_name.to_string()), + Query(ExternalIdParams { + external_id: external_id.to_string(), + }), + ) + .await + .unwrap(); + assert_eq!( + get_response.record.unwrap().text_payload.as_deref(), + Some("stable source chunk") + ); + + let Json(delete_response) = delete_record_by_external_id( + State(state.clone()), + Path(context_name.to_string()), + Query(ExternalIdParams { + external_id: external_id.to_string(), + }), + ) + .await + .unwrap(); + assert!(delete_response.deleted); + assert!(delete_response.version >= add_response.version); + + let Json(missing_response) = get_record_by_external_id( + State(state), + Path(context_name.to_string()), + Query(ExternalIdParams { + external_id: external_id.to_string(), + }), + ) + .await + .unwrap(); + assert!(missing_response.record.is_none()); + } + + #[tokio::test] + async fn delete_by_internal_id_returns_false_when_already_absent() { + let context_name = "ctx"; + let (state, _dir) = test_state(context_name).await; + + let (_, Json(add_response)) = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![text_record("temporary note")], + }), + ) + .await + .unwrap(); + let id = add_response.ids[0].clone(); + + let Json(delete_response) = delete_record( + State(state.clone()), + Path((context_name.to_string(), id.clone())), + ) + .await + .unwrap(); + assert!(delete_response.deleted); + + let Json(second_response) = + delete_record(State(state), Path((context_name.to_string(), id))) + .await + .unwrap(); + assert!(!second_response.deleted); + } + + #[tokio::test] + async fn related_records_filters_by_target_and_relation() { + let context_name = "ctx"; + let (state, _dir) = test_state(context_name).await; + let mut related = text_record("record that cites the runbook"); + related.relationships = vec![RelationshipDto { + target_id: "doc://runbook#chunk-1".to_string(), + relation: "cites".to_string(), + weight: Some(0.75), + }]; + + let _ = add_records( + State(state.clone()), + Path(context_name.to_string()), + Json(AddRecordsRequest { + records: vec![related, text_record("unrelated record")], + }), + ) + .await + .unwrap(); + + let Json(response) = related_records( + State(state), + Path(context_name.to_string()), + Query(RelatedParams { + target_id: "doc://runbook#chunk-1".to_string(), + relation: Some("cites".to_string()), + limit: Some(10), + include_expired: false, + include_retired: false, + }), + ) + .await + .unwrap(); + + assert_eq!(response.records.len(), 1); + assert_eq!( + response.records[0].text_payload.as_deref(), + Some("record that cites the runbook") + ); + assert_eq!(response.records[0].relationships.len(), 1); + } +} diff --git a/crates/lance-context/src/lib.rs b/crates/lance-context/src/lib.rs index 44f9ed4..00fc48f 100644 --- a/crates/lance-context/src/lib.rs +++ b/crates/lance-context/src/lib.rs @@ -11,8 +11,8 @@ pub use lance_context_core::{ pub use lance_context_api::{ AddRecordRequest, AddRecordsResponse, CompactRequest, CompactResponse, CompactStatsResponse, - ContextError, ContextResult, ContextStoreApi, RecordDto, RelationshipDto, RetrieveRequest, - RetrieveResponse, RetrieveResultDto, SearchResultDto, + ContextError, ContextResult, ContextStoreApi, DeleteRecordResponse, RecordDto, RelationshipDto, + RetrieveRequest, RetrieveResponse, RetrieveResultDto, SearchResultDto, }; #[cfg(feature = "remote")] diff --git a/crates/lance-context/src/unified.rs b/crates/lance-context/src/unified.rs index 95269bf..9751f50 100644 --- a/crates/lance-context/src/unified.rs +++ b/crates/lance-context/src/unified.rs @@ -2,8 +2,8 @@ use std::collections::HashSet; use lance_context_api::{ AddRecordRequest, AddRecordsResponse, CompactRequest, CompactResponse, CompactStatsResponse, - ContextError, ContextResult, ContextStoreApi, RecordDto, RetrieveRequest, RetrieveResultDto, - SearchResultDto, + ContextError, ContextResult, ContextStoreApi, DeleteRecordResponse, RecordDto, RetrieveRequest, + RetrieveResultDto, SearchResultDto, }; use lance_context_core::{ContextStore as LocalStore, ContextStoreOptions, IdIndexType}; @@ -114,6 +114,21 @@ impl ContextStoreApi for ContextStore { dispatch_ref!(self, get, id) } + async fn get_by_external_id(&self, external_id: &str) -> ContextResult> { + dispatch_ref!(self, get_by_external_id, external_id) + } + + async fn delete_by_id(&mut self, id: &str) -> ContextResult { + dispatch_mut!(self, delete_by_id, id) + } + + async fn delete_by_external_id( + &mut self, + external_id: &str, + ) -> ContextResult { + dispatch_mut!(self, delete_by_external_id, external_id) + } + async fn list( &self, limit: Option, @@ -122,6 +137,25 @@ impl ContextStoreApi for ContextStore { dispatch_ref!(self, list, limit, offset) } + async fn related( + &self, + target_id: &str, + relation: Option<&str>, + limit: Option, + include_expired: bool, + include_retired: bool, + ) -> ContextResult> { + dispatch_ref!( + self, + related, + target_id, + relation, + limit, + include_expired, + include_retired + ) + } + async fn search( &self, query: &[f32],