From 8ce7c1ac01b94ee0a9aa1a36b427238a66739621 Mon Sep 17 00:00:00 2001 From: Allen Cheng Date: Tue, 9 Jun 2026 11:41:09 -0700 Subject: [PATCH 1/4] feat: add lifecycle retention filters --- README.md | 31 +++ crates/lance-context-core/src/lib.rs | 5 +- crates/lance-context-core/src/record.rs | 74 +++++++ crates/lance-context-core/src/store.rs | 263 ++++++++++++++++++++++-- python/python/lance_context/api.py | 85 +++++++- python/src/lib.rs | 208 ++++++++++++++----- python/tests/test_persistence.py | 87 +++++++- python/tests/test_search.py | 184 ++++++++++++++++- 8 files changed, 852 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index b2ae421..36a378f 100644 --- a/README.md +++ b/README.md @@ -11,12 +11,14 @@ Key motivations inspired by the broader Lance roadmap[1](https://github.com - **Multimodal first** – store text, images, and structured data together, keeping the original bytes plus typed metadata. - **Version aware** – each append creates an immutable snapshot, enabling time-travel, branching, and auditability for long-running agents. - **Searchable semantics** – embeddings are managed alongside content so you can run Lance vector search without leaving the dataset. +- **Lifecycle aware** – records can carry TTL, retention, retirement, and supersession metadata, with expired/retired rows hidden from default recall. - **Columnar performance** – backed by the Lance file format, giving fast analytics, compaction, and cloud-friendly storage. ## Features - Unified schema for agent messages (`ContextRecord`) with optional embeddings and metadata. - Automatic versioning via Lance manifests with `checkout(version)` support. +- Query-time lifecycle filtering for expired, retired, superseded, and revoked memories. - Background compaction to optimize storage and read performance. - Remote persistence on any `object_store` backend (S3, GCS, Azure Blob, ...) via the generic `storage_options` dict, aligned with `lance` and `lance-graph`. @@ -83,6 +85,26 @@ ctx.add_many([ }, ]) +# Lifecycle metadata is stored in first-class columns. Expired and retired rows +# are excluded from default list/search calls, while contradicted records stay +# visible so negative knowledge can remain a guardrail for future recall. +ctx.add( + "assistant", + "temporary session note", + expires_at="2026-07-01T00:00:00Z", + retention_policy="session", +) +ctx.add( + "system", + "older fact kept as history", + lifecycle_status="superseded", + retired_reason="replaced by newer fact", + superseded_by_id="new-record-id", +) + +visible = ctx.list() +with_lifecycle_history = ctx.list(include_expired=True, include_retired=True) + # Time-travel to prior state first_version = ctx.version() ctx.add("assistant", "Let me fetch suggestions…") @@ -161,6 +183,8 @@ let record = ContextRecord { id: "run-1-1".into(), external_id: None, run_id: "run-1".into(), + bot_id: None, + session_id: None, created_at: Utc::now(), role: "user".into(), state_metadata: Some(StateMetadata { @@ -169,6 +193,13 @@ let record = ContextRecord { tokens_used: None, custom: None, }), + expires_at: None, + retention_policy: None, + lifecycle_status: "active".into(), + retired_at: None, + retired_reason: None, + supersedes_id: None, + superseded_by_id: None, content_type: "text/plain".into(), text_payload: Some("hello world".into()), binary_payload: None, diff --git a/crates/lance-context-core/src/lib.rs b/crates/lance-context-core/src/lib.rs index 08b1edf..f2f8148 100644 --- a/crates/lance-context-core/src/lib.rs +++ b/crates/lance-context-core/src/lib.rs @@ -6,7 +6,10 @@ pub mod serde; mod store; pub use context::{Context, ContextEntry, Snapshot}; -pub use record::{ContextRecord, SearchResult, StateMetadata}; +pub use record::{ + ContextRecord, LifecycleQueryOptions, SearchResult, StateMetadata, LIFECYCLE_ACTIVE, + LIFECYCLE_CONTRADICTED, +}; pub use store::{ CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, IdIndexType, }; diff --git a/crates/lance-context-core/src/record.rs b/crates/lance-context-core/src/record.rs index 95cf27b..aba0fbb 100644 --- a/crates/lance-context-core/src/record.rs +++ b/crates/lance-context-core/src/record.rs @@ -1,5 +1,8 @@ use chrono::{DateTime, Utc}; +pub const LIFECYCLE_ACTIVE: &str = "active"; +pub const LIFECYCLE_CONTRADICTED: &str = "contradicted"; + /// Structured metadata captured alongside each context entry. #[derive(Debug, Clone, Default)] pub struct StateMetadata { @@ -20,12 +23,83 @@ pub struct ContextRecord { pub created_at: DateTime, pub role: String, pub state_metadata: Option, + pub expires_at: Option>, + pub retention_policy: Option, + pub lifecycle_status: String, + pub retired_at: Option>, + pub retired_reason: Option, + pub supersedes_id: Option, + pub superseded_by_id: Option, pub content_type: String, pub text_payload: Option, pub binary_payload: Option>, pub embedding: Option>, } +impl ContextRecord { + #[must_use] + pub fn is_expired_at(&self, now: DateTime) -> bool { + self.expires_at.is_some_and(|expires_at| expires_at <= now) + } + + #[must_use] + pub fn is_hidden_by_lifecycle(&self) -> bool { + if self.lifecycle_status == LIFECYCLE_ACTIVE + || self.lifecycle_status == LIFECYCLE_CONTRADICTED + { + return self.retired_at.is_some() || self.superseded_by_id.is_some(); + } + + true + } + + #[must_use] + pub fn has_non_default_lifecycle(&self) -> bool { + self.expires_at.is_some() + || self.retention_policy.is_some() + || self.lifecycle_status != LIFECYCLE_ACTIVE + || self.retired_at.is_some() + || self.retired_reason.is_some() + || self.supersedes_id.is_some() + || self.superseded_by_id.is_some() + } +} + +/// Query-time controls for lifecycle-aware retrieval. +#[derive(Debug, Clone)] +pub struct LifecycleQueryOptions { + pub include_expired: bool, + pub include_retired: bool, + pub reference_time: DateTime, +} + +impl Default for LifecycleQueryOptions { + fn default() -> Self { + Self { + include_expired: false, + include_retired: false, + reference_time: Utc::now(), + } + } +} + +impl LifecycleQueryOptions { + #[must_use] + pub fn new(include_expired: bool, include_retired: bool) -> Self { + Self { + include_expired, + include_retired, + ..Self::default() + } + } + + #[must_use] + pub fn is_visible(&self, record: &ContextRecord) -> bool { + (self.include_expired || !record.is_expired_at(self.reference_time)) + && (self.include_retired || !record.is_hidden_by_lifecycle()) + } +} + /// Result returned from a vector similarity search. #[derive(Debug, Clone)] pub struct SearchResult { diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 5f1f6a1..45f0437 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -31,7 +31,9 @@ use tokio::task::JoinHandle; use tracing::{error, info, warn}; use uuid::Uuid; -use crate::record::{ContextRecord, SearchResult, StateMetadata}; +use crate::record::{ + ContextRecord, LifecycleQueryOptions, SearchResult, StateMetadata, LIFECYCLE_ACTIVE, +}; /// Embedding length used for the semantic index column. const DEFAULT_EMBEDDING_DIM: i32 = 1536; @@ -274,7 +276,10 @@ impl ContextStore { } } - for record in self.list(None, None).await? { + for record in self + .list_with_options(None, None, LifecycleQueryOptions::new(true, true)) + .await? + { if ids.contains(record.id.as_str()) { return Err(ArrowError::InvalidArgumentError(format!( "id '{}' already exists", @@ -328,12 +333,27 @@ impl ContextStore { &self, limit: Option, offset: Option, + ) -> LanceResult> { + self.list_with_options(limit, offset, LifecycleQueryOptions::default()) + .await + } + + /// List records, applying lifecycle visibility before offset/limit. + pub async fn list_with_options( + &self, + limit: Option, + offset: Option, + options: LifecycleQueryOptions, ) -> LanceResult> { let scanner = self.lsm_scanner().await?; let mut stream = scanner.try_into_stream().await?; let mut results = Vec::new(); while let Some(batch) = stream.try_next().await? { - results.extend(batch_to_records(&batch)?); + results.extend( + batch_to_records(&batch)? + .into_iter() + .filter(|record| options.is_visible(record)), + ); } if let Some(offset) = offset { @@ -371,6 +391,17 @@ impl ContextStore { &self, query: &[f32], limit: Option, + ) -> LanceResult> { + self.search_with_options(query, limit, LifecycleQueryOptions::default()) + .await + } + + /// Perform nearest-neighbor search after applying lifecycle visibility. + pub async fn search_with_options( + &self, + query: &[f32], + limit: Option, + options: LifecycleQueryOptions, ) -> LanceResult> { if query.len() != DEFAULT_EMBEDDING_DIM as usize { return Err(ArrowError::InvalidArgumentError(format!( @@ -387,7 +418,7 @@ impl ContextStore { } let mut results: Vec = self - .list(None, None) + .list_with_options(None, None, options) .await? .into_iter() .filter_map(|record| { @@ -650,10 +681,14 @@ impl ContextStore { /// Lance V1 blob encoding (out-of-line binary buffers). For `text_payload`, /// this also changes the Arrow type from `LargeUtf8` to `LargeBinary`. pub fn schema(blob_columns: &HashSet) -> Schema { - Self::schema_with_options(blob_columns, true) + Self::schema_with_options(blob_columns, true, true) } - fn schema_with_options(blob_columns: &HashSet, include_external_id: bool) -> Schema { + fn schema_with_options( + blob_columns: &HashSet, + include_external_id: bool, + include_lifecycle: bool, + ) -> Schema { let mut id_metadata = HashMap::new(); id_metadata.insert( "lance-schema:unenforced-primary-key".to_string(), @@ -707,6 +742,27 @@ impl ContextStore { ), true, ), + ]); + if include_lifecycle { + fields.extend([ + Field::new( + "expires_at", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + Field::new("retention_policy", DataType::Utf8, true), + Field::new("lifecycle_status", DataType::Utf8, false), + Field::new( + "retired_at", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + Field::new("retired_reason", DataType::Utf8, true), + Field::new("supersedes_id", DataType::Utf8, true), + Field::new("superseded_by_id", DataType::Utf8, true), + ]); + } + fields.extend([ Field::new("content_type", DataType::Utf8, false), text_field, binary_field, @@ -774,6 +830,12 @@ impl ContextStore { .field_paths() .iter() .any(|path| path == "external_id"); + let include_lifecycle = self + .dataset + .schema() + .field_paths() + .iter() + .any(|path| path == "expires_at"); if !include_external_id && entries.iter().any(|entry| entry.external_id.is_some()) { return Err(ArrowError::InvalidArgumentError( "external_id requires a context dataset created with external_id support" @@ -781,6 +843,13 @@ impl ContextStore { ) .into()); } + if !include_lifecycle && entries.iter().any(ContextRecord::has_non_default_lifecycle) { + return Err(ArrowError::InvalidArgumentError( + "lifecycle fields require a context dataset created with lifecycle support" + .to_string(), + ) + .into()); + } let mut id_builder = StringBuilder::new(); let mut external_id_builder = StringBuilder::new(); @@ -789,6 +858,13 @@ impl ContextStore { let mut session_id_builder = StringBuilder::new(); let mut created_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len()); let mut role_builder = StringDictionaryBuilder::::new(); + let mut expires_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len()); + let mut retention_policy_builder = StringBuilder::new(); + let mut lifecycle_status_builder = StringBuilder::new(); + let mut retired_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len()); + let mut retired_reason_builder = StringBuilder::new(); + let mut supersedes_id_builder = StringBuilder::new(); + let mut superseded_by_id_builder = StringBuilder::new(); let mut content_type_builder = StringBuilder::new(); let mut binary_builder = LargeBinaryBuilder::new(); @@ -831,6 +907,15 @@ impl ContextStore { session_id_builder.append_option(entry.session_id.as_deref()); created_at_builder.append_value(entry.created_at.timestamp_micros()); role_builder.append(&entry.role)?; + expires_at_builder + .append_option(entry.expires_at.map(|value| value.timestamp_micros())); + retention_policy_builder.append_option(entry.retention_policy.as_deref()); + lifecycle_status_builder.append_value(&entry.lifecycle_status); + retired_at_builder + .append_option(entry.retired_at.map(|value| value.timestamp_micros())); + retired_reason_builder.append_option(entry.retired_reason.as_deref()); + supersedes_id_builder.append_option(entry.supersedes_id.as_deref()); + superseded_by_id_builder.append_option(entry.superseded_by_id.as_deref()); content_type_builder.append_value(&entry.content_type); if text_is_blob { @@ -924,6 +1009,13 @@ impl ContextStore { let session_id_array: ArrayRef = Arc::new(session_id_builder.finish()); let created_at_array: ArrayRef = Arc::new(created_at_builder.finish()); let role_array: ArrayRef = Arc::new(role_builder.finish()); + let expires_at_array: ArrayRef = Arc::new(expires_at_builder.finish()); + let retention_policy_array: ArrayRef = Arc::new(retention_policy_builder.finish()); + let lifecycle_status_array: ArrayRef = Arc::new(lifecycle_status_builder.finish()); + let retired_at_array: ArrayRef = Arc::new(retired_at_builder.finish()); + let retired_reason_array: ArrayRef = Arc::new(retired_reason_builder.finish()); + let supersedes_id_array: ArrayRef = Arc::new(supersedes_id_builder.finish()); + let superseded_by_id_array: ArrayRef = Arc::new(superseded_by_id_builder.finish()); let content_type_array: ArrayRef = Arc::new(content_type_builder.finish()); let text_array: ArrayRef = if text_is_blob { Arc::new(text_binary_builder.unwrap().finish()) @@ -937,6 +1029,7 @@ impl ContextStore { let schema = Arc::new(Self::schema_with_options( &self.blob_columns, include_external_id, + include_lifecycle, )); let mut arrays = vec![id_array]; if include_external_id { @@ -949,6 +1042,19 @@ impl ContextStore { created_at_array, role_array, state_array, + ]); + if include_lifecycle { + arrays.extend([ + expires_at_array, + retention_policy_array, + lifecycle_status_array, + retired_at_array, + retired_reason_array, + supersedes_id_array, + superseded_by_id_array, + ]); + } + arrays.extend([ content_type_array, text_array, binary_array, @@ -981,6 +1087,13 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { let created_at_array = column_as::(batch, "created_at")?; let role_array = column_as::>(batch, "role")?; let state_array = column_as::(batch, "state_metadata")?; + let expires_at_array = column_as_optional::(batch, "expires_at"); + let retention_policy_array = column_as_optional::(batch, "retention_policy"); + let lifecycle_status_array = column_as_optional::(batch, "lifecycle_status"); + let retired_at_array = column_as_optional::(batch, "retired_at"); + let retired_reason_array = column_as_optional::(batch, "retired_reason"); + let supersedes_id_array = column_as_optional::(batch, "supersedes_id"); + let superseded_by_id_array = column_as_optional::(batch, "superseded_by_id"); let content_type_array = column_as::(batch, "content_type")?; let binary_array = column_as::(batch, "binary_payload")?; let embedding_array = column_as::(batch, "embedding")?; @@ -1045,13 +1158,7 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { let mut results = Vec::with_capacity(batch.num_rows()); for row in 0..batch.num_rows() { - let created_at = - DateTime::from_timestamp_micros(created_at_array.value(row)).ok_or_else(|| { - LanceError::from(ArrowError::InvalidArgumentError(format!( - "invalid timestamp value {}", - created_at_array.value(row) - ))) - })?; + let created_at = timestamp_from_micros(created_at_array.value(row), "created_at")?; let state_metadata = if state_array.is_null(row) { None @@ -1142,6 +1249,15 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { } }); + let expires_at = optional_timestamp_from_array(expires_at_array, row, "expires_at")?; + let retention_policy = optional_string_from_array(retention_policy_array, row); + let lifecycle_status = optional_string_from_array(lifecycle_status_array, row) + .unwrap_or_else(|| LIFECYCLE_ACTIVE.to_string()); + let retired_at = optional_timestamp_from_array(retired_at_array, row, "retired_at")?; + let retired_reason = optional_string_from_array(retired_reason_array, row); + let supersedes_id = optional_string_from_array(supersedes_id_array, row); + let superseded_by_id = optional_string_from_array(superseded_by_id_array, row); + results.push(ContextRecord { id: id_array.value(row).to_string(), external_id: external_id_array.and_then(|arr| { @@ -1157,6 +1273,13 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { created_at, role, state_metadata, + expires_at, + retention_policy, + lifecycle_status, + retired_at, + retired_reason, + supersedes_id, + superseded_by_id, content_type: content_type_array.value(row).to_string(), text_payload, binary_payload, @@ -1185,6 +1308,39 @@ fn embedding_from_list(list: &FixedSizeListArray, row: usize) -> LanceResult LanceResult> { + DateTime::from_timestamp_micros(value).ok_or_else(|| { + LanceError::from(ArrowError::InvalidArgumentError(format!( + "invalid timestamp value {value} in column '{column}'" + ))) + }) +} + +fn optional_timestamp_from_array( + array: Option<&TimestampMicrosecondArray>, + row: usize, + column: &str, +) -> LanceResult>> { + let Some(array) = array else { + return Ok(None); + }; + if array.is_null(row) { + Ok(None) + } else { + timestamp_from_micros(array.value(row), column).map(Some) + } +} + +fn optional_string_from_array(array: Option<&StringArray>, row: usize) -> Option { + array.and_then(|arr| { + if arr.is_null(row) { + None + } else { + Some(arr.value(row).to_string()) + } + }) +} + fn l2_distance(left: &[f32], right: &[f32]) -> f32 { left.iter() .zip(right) @@ -1225,7 +1381,7 @@ where mod tests { use super::*; use crate::serde::CONTENT_TYPE_TEXT; - use chrono::Utc; + use chrono::{Duration as ChronoDuration, Utc}; use tempfile::TempDir; fn make_embedding(pivot: f32) -> Vec { @@ -1251,6 +1407,13 @@ mod tests { tokens_used: Some(10), custom: None, }), + expires_at: None, + retention_policy: None, + lifecycle_status: LIFECYCLE_ACTIVE.to_string(), + retired_at: None, + retired_reason: None, + supersedes_id: None, + superseded_by_id: None, content_type: CONTENT_TYPE_TEXT.to_string(), text_payload: Some(format!("payload-{id}")), binary_payload: None, @@ -1298,6 +1461,78 @@ mod tests { }); } + #[test] + fn list_hides_expired_and_retired_records_by_default() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + let active = text_record("active", 0.0); + let mut expired = text_record("expired", 0.0); + expired.expires_at = Some(Utc::now() - ChronoDuration::minutes(1)); + let mut superseded = text_record("superseded", 0.0); + superseded.lifecycle_status = "superseded".to_string(); + superseded.retired_reason = Some("replaced by newer fact".to_string()); + superseded.superseded_by_id = Some("active".to_string()); + + store + .add(&[active.clone(), expired.clone(), superseded.clone()]) + .await + .unwrap(); + + let visible = store.list(None, None).await.unwrap(); + assert_eq!(visible.len(), 1); + assert_eq!(visible[0].id, active.id); + + let all = store + .list_with_options(None, None, LifecycleQueryOptions::new(true, true)) + .await + .unwrap(); + assert_eq!(all.len(), 3); + let expired_roundtrip = all.iter().find(|record| record.id == expired.id).unwrap(); + assert_eq!(expired_roundtrip.expires_at, expired.expires_at); + let superseded_roundtrip = all + .iter() + .find(|record| record.id == superseded.id) + .unwrap(); + assert_eq!(superseded_roundtrip.lifecycle_status, "superseded"); + assert_eq!( + superseded_roundtrip.superseded_by_id.as_deref(), + Some("active") + ); + }); + } + + #[test] + fn search_filters_lifecycle_before_ranking() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + let active = text_record("active", 1.0); + let mut expired_better_match = text_record("expired", 0.0); + expired_better_match.expires_at = Some(Utc::now() - ChronoDuration::minutes(1)); + store + .add(&[active.clone(), expired_better_match.clone()]) + .await + .unwrap(); + + let query = make_embedding(0.0); + let visible = store.search(&query, Some(1)).await.unwrap(); + assert_eq!(visible.len(), 1); + assert_eq!(visible[0].record.id, active.id); + + let all = store + .search_with_options(&query, Some(1), LifecycleQueryOptions::new(true, false)) + .await + .unwrap(); + assert_eq!(all.len(), 1); + assert_eq!(all[0].record.id, expired_better_match.id); + }); + } + #[test] fn external_id_roundtrips_and_supports_lookup() { let dir = TempDir::new().unwrap(); diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index 7a01d01..5e3523c 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -109,11 +109,26 @@ def _coerce_vector(query: Any) -> list[float]: raise TypeError("search query must be a sequence of floats") +def _coerce_timestamp(value: datetime | str | None, *, field_name: str) -> str | None: + if value is None: + return None + if isinstance(value, datetime): + if value.tzinfo is None: + raise ValueError(f"{field_name} must include timezone information") + return value.isoformat().replace("+00:00", "Z") + if isinstance(value, str): + return value + raise TypeError(f"{field_name} must be a datetime, RFC3339 string, or None") + + +def _normalize_timestamp(value: Any) -> Any: + if isinstance(value, str): + return datetime.fromisoformat(value.replace("Z", "+00:00")) + return value + + def _normalize_record(raw: dict[str, Any]) -> dict[str, Any]: """Normalize a raw record dict from the Rust layer.""" - created_at = raw.get("created_at") - if isinstance(created_at, str): - created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) return { "id": raw.get("id"), "external_id": raw.get("external_id"), @@ -125,8 +140,15 @@ def _normalize_record(raw: dict[str, Any]) -> dict[str, Any]: "text": raw.get("text_payload"), "binary": raw.get("binary_payload"), "embedding": raw.get("embedding"), - "created_at": created_at, + "created_at": _normalize_timestamp(raw.get("created_at")), "state_metadata": raw.get("state_metadata"), + "expires_at": _normalize_timestamp(raw.get("expires_at")), + "retention_policy": raw.get("retention_policy"), + "lifecycle_status": raw.get("lifecycle_status"), + "retired_at": _normalize_timestamp(raw.get("retired_at")), + "retired_reason": raw.get("retired_reason"), + "supersedes_id": raw.get("supersedes_id"), + "superseded_by_id": raw.get("superseded_by_id"), } @@ -353,6 +375,13 @@ def add( bot_id: str | None = None, session_id: str | None = None, external_id: str | None = None, + expires_at: datetime | str | None = None, + retention_policy: str | None = None, + lifecycle_status: str | None = None, + retired_at: datetime | str | None = None, + retired_reason: str | None = None, + supersedes_id: str | None = None, + superseded_by_id: str | None = None, ) -> None: if content_type is not None and data_type is not None: raise ValueError("Specify only one of content_type or data_type") @@ -367,6 +396,13 @@ def add( bot_id, session_id, external_id, + _coerce_timestamp(expires_at, field_name="expires_at"), + retention_policy, + lifecycle_status, + _coerce_timestamp(retired_at, field_name="retired_at"), + retired_reason, + supersedes_id, + superseded_by_id, ) def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: @@ -374,7 +410,8 @@ def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: Each record accepts the same fields as :meth:`add`: ``role``, ``content``, optional ``content_type``/``data_type``, ``embedding``, - ``bot_id``, ``session_id``, and ``external_id``. + ``bot_id``, ``session_id``, ``external_id``, and lifecycle fields such + as ``expires_at`` and ``lifecycle_status``. """ normalized: list[dict[str, Any]] = [] for index, record in enumerate(records): @@ -404,6 +441,19 @@ def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: "bot_id": record.get("bot_id"), "session_id": record.get("session_id"), "external_id": record.get("external_id"), + "expires_at": _coerce_timestamp( + record.get("expires_at"), + field_name=f"records[{index}].expires_at", + ), + "retention_policy": record.get("retention_policy"), + "lifecycle_status": record.get("lifecycle_status"), + "retired_at": _coerce_timestamp( + record.get("retired_at"), + field_name=f"records[{index}].retired_at", + ), + "retired_reason": record.get("retired_reason"), + "supersedes_id": record.get("supersedes_id"), + "superseded_by_id": record.get("superseded_by_id"), } ) @@ -419,25 +469,40 @@ def fork(self, branch_name: str) -> Context: def checkout(self, version_id: int | str) -> None: self._inner.checkout(int(version_id)) - def search(self, query: Any, limit: int | None = None) -> list[dict[str, Any]]: + def search( + self, + query: Any, + limit: int | None = None, + *, + include_expired: bool = False, + include_retired: bool = False, + ) -> list[dict[str, Any]]: vector = _coerce_vector(query) - results = self._inner.search(vector, limit) + results = self._inner.search(vector, limit, include_expired, include_retired) return [_normalize_search_hit(item) for item in results] def list( - self, limit: int | None = None, offset: int | None = None + self, + limit: int | None = None, + offset: int | None = None, + *, + include_expired: bool = False, + include_retired: bool = False, ) -> list[dict[str, Any]]: """Return stored entries. Args: limit: Maximum number of entries to return. If None, returns all. offset: Number of entries to skip before returning results. + include_expired: Include records whose ``expires_at`` is in the past. + include_retired: Include retired/superseded/revoked records. Returns: List of entry dicts with keys: id, run_id, role, content_type, - text, binary, embedding, created_at, state_metadata. + text, binary, embedding, created_at, state_metadata, and lifecycle + metadata. """ - results = self._inner.list(limit, offset) + results = self._inner.list(limit, offset, include_expired, include_retired) return [_normalize_record(item) for item in results] def get( diff --git a/python/src/lib.rs b/python/src/lib.rs index 144c4bc..25d313f 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use chrono::{SecondsFormat, Utc}; +use chrono::{DateTime, SecondsFormat, Utc}; use pyo3::exceptions::{PyRuntimeError, PyTypeError}; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyDict, PyType}; @@ -11,7 +11,8 @@ use tokio::runtime::Runtime; use lance_context::serde::CONTENT_TYPE_TEXT; use lance_context::{ CompactionConfig, CompactionMetrics, CompactionStats, Context as RustContext, ContextRecord, - ContextStore, ContextStoreOptions, IdIndexType, SearchResult, + ContextStore, ContextStoreOptions, IdIndexType, LifecycleQueryOptions, SearchResult, + LIFECYCLE_ACTIVE, }; const DEFAULT_BINARY_CONTENT_TYPE: &str = "application/octet-stream"; @@ -24,6 +25,27 @@ struct PreparedRecord { data_type: Option, } +struct RecordOptions { + data_type: Option, + embedding: Option>, + bot_id: Option, + session_id: Option, + external_id: Option, + lifecycle: LifecycleFields, + offset: u64, +} + +#[derive(Default)] +struct LifecycleFields { + expires_at: Option>, + retention_policy: Option, + lifecycle_status: Option, + retired_at: Option>, + retired_reason: Option, + supersedes_id: Option, + superseded_by_id: Option, +} + #[pyfunction] fn version() -> &'static str { env!("CARGO_PKG_VERSION") @@ -179,7 +201,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, supersedes_id = None, superseded_by_id = None))] fn add( &mut self, py: Python<'_>, @@ -190,16 +212,35 @@ impl Context { bot_id: Option, session_id: Option, external_id: Option, + expires_at: Option, + retention_policy: Option, + lifecycle_status: Option, + retired_at: Option, + retired_reason: Option, + supersedes_id: Option, + superseded_by_id: Option, ) -> PyResult<()> { + let lifecycle = LifecycleFields { + expires_at: parse_optional_datetime(expires_at, "expires_at")?, + retention_policy, + lifecycle_status, + retired_at: parse_optional_datetime(retired_at, "retired_at")?, + retired_reason, + supersedes_id, + superseded_by_id, + }; let prepared = self.prepare_record( role.to_string(), content, - data_type.map(str::to_string), - embedding, - bot_id, - session_id, - external_id, - 1, + RecordOptions { + data_type: data_type.map(str::to_string), + embedding, + bot_id, + session_id, + external_id, + lifecycle, + offset: 1, + }, )?; let add_res = py.allow_threads(|| { @@ -220,9 +261,9 @@ impl Context { let mut prepared = Vec::new(); for (index, item) in records.try_iter()?.enumerate() { let item = item?; - let dict = item.downcast::().map_err(|_| { - PyTypeError::new_err(format!("records[{index}] must be a dict")) - })?; + let dict = item + .downcast::() + .map_err(|_| PyTypeError::new_err(format!("records[{index}] must be a dict")))?; prepared.push(self.prepare_record_from_dict(dict, index)?); } @@ -232,8 +273,7 @@ impl Context { let context_records: Vec = prepared.iter().map(|item| item.record.clone()).collect(); - let add_res = - py.allow_threads(|| self.runtime.block_on(self.store.add(&context_records))); + let add_res = py.allow_threads(|| self.runtime.block_on(self.store.add(&context_records))); add_res.map_err(to_py_err)?; for item in prepared { @@ -264,31 +304,40 @@ impl Context { Ok(()) } - #[pyo3(signature = (query, limit = None))] + #[pyo3(signature = (query, limit = None, include_expired = false, include_retired = false))] fn search( &self, py: Python<'_>, query: Vec, limit: Option, + include_expired: bool, + include_retired: bool, ) -> PyResult> { - let hits_res = py.allow_threads(|| self.runtime.block_on(self.store.search(&query, limit))); + let options = LifecycleQueryOptions::new(include_expired, include_retired); + let hits_res = py.allow_threads(|| { + self.runtime + .block_on(self.store.search_with_options(&query, limit, options)) + }); let hits = hits_res.map_err(to_py_err)?; hits.into_iter() .map(|hit| search_hit_to_py(py, hit)) .collect() } - #[pyo3(signature = (limit = None, offset = None))] + #[pyo3(signature = (limit = None, offset = None, include_expired = false, include_retired = false))] fn list( &self, py: Python<'_>, limit: Option, offset: Option, + include_expired: bool, + include_retired: bool, ) -> PyResult> { + let options = LifecycleQueryOptions::new(include_expired, include_retired); // Release GIL during data retrieval let records = py.allow_threads(|| { self.runtime - .block_on(self.store.list(limit, offset)) + .block_on(self.store.list_with_options(limit, offset, options)) .map_err(to_py_err) })?; @@ -382,25 +431,47 @@ impl Context { ) -> PyResult { let role = required_item(dict, "role", index)?.extract::()?; let content = required_item(dict, "content", index)?; - let data_type = - optional_item(dict, "data_type")?.map(|value| value.extract::()); - let embedding = - optional_item(dict, "embedding")?.map(|value| value.extract::>()); + let data_type = optional_item(dict, "data_type")?.map(|value| value.extract::()); + let embedding = optional_item(dict, "embedding")?.map(|value| value.extract::>()); let bot_id = optional_item(dict, "bot_id")?.map(|value| value.extract::()); - let session_id = - optional_item(dict, "session_id")?.map(|value| value.extract::()); + let session_id = optional_item(dict, "session_id")?.map(|value| value.extract::()); let external_id = optional_item(dict, "external_id")?.map(|value| value.extract::()); + let expires_at = optional_item(dict, "expires_at")?.map(|value| value.extract::()); + let retention_policy = + optional_item(dict, "retention_policy")?.map(|value| value.extract::()); + let lifecycle_status = + optional_item(dict, "lifecycle_status")?.map(|value| value.extract::()); + let retired_at = optional_item(dict, "retired_at")?.map(|value| value.extract::()); + let retired_reason = + optional_item(dict, "retired_reason")?.map(|value| value.extract::()); + let supersedes_id = + optional_item(dict, "supersedes_id")?.map(|value| value.extract::()); + let superseded_by_id = + optional_item(dict, "superseded_by_id")?.map(|value| value.extract::()); + + let lifecycle = LifecycleFields { + expires_at: parse_optional_datetime(expires_at.transpose()?, "expires_at")?, + retention_policy: retention_policy.transpose()?, + lifecycle_status: lifecycle_status.transpose()?, + retired_at: parse_optional_datetime(retired_at.transpose()?, "retired_at")?, + retired_reason: retired_reason.transpose()?, + supersedes_id: supersedes_id.transpose()?, + superseded_by_id: superseded_by_id.transpose()?, + }; self.prepare_record( role, &content, - data_type.transpose()?, - embedding.transpose()?, - bot_id.transpose()?, - session_id.transpose()?, - external_id.transpose()?, - index as u64 + 1, + RecordOptions { + data_type: data_type.transpose()?, + embedding: embedding.transpose()?, + bot_id: bot_id.transpose()?, + session_id: session_id.transpose()?, + external_id: external_id.transpose()?, + lifecycle, + offset: index as u64 + 1, + }, ) } @@ -408,17 +479,13 @@ impl Context { &self, role: String, content: &Bound<'_, PyAny>, - data_type: Option, - embedding: Option>, - bot_id: Option, - session_id: Option, - external_id: Option, - offset: u64, + options: RecordOptions, ) -> PyResult { let (content_type, text_payload, binary_payload, inner_content) = match content.extract::<&[u8]>() { Ok(bytes) => ( - data_type + options + .data_type .clone() .unwrap_or_else(|| DEFAULT_BINARY_CONTENT_TYPE.to_string()), None, @@ -428,7 +495,8 @@ impl Context { Err(_) => { let content_str = content.str()?.to_string(); ( - data_type + options + .data_type .clone() .unwrap_or_else(|| CONTENT_TYPE_TEXT.to_string()), Some(content_str.clone()), @@ -438,25 +506,35 @@ impl Context { } }; - let record_id = format!("{}-{}", self.run_id, self.inner.entries() + offset); + let record_id = format!("{}-{}", self.run_id, self.inner.entries() + options.offset); Ok(PreparedRecord { record: ContextRecord { id: record_id, - external_id, + external_id: options.external_id, run_id: self.run_id.clone(), - bot_id, - session_id, + bot_id: options.bot_id, + session_id: options.session_id, created_at: Utc::now(), role: role.clone(), state_metadata: None, + expires_at: options.lifecycle.expires_at, + retention_policy: options.lifecycle.retention_policy, + lifecycle_status: options + .lifecycle + .lifecycle_status + .unwrap_or_else(|| LIFECYCLE_ACTIVE.to_string()), + retired_at: options.lifecycle.retired_at, + retired_reason: options.lifecycle.retired_reason, + supersedes_id: options.lifecycle.supersedes_id, + superseded_by_id: options.lifecycle.superseded_by_id, content_type, text_payload, binary_payload, - embedding, + embedding: options.embedding, }, role, inner_content, - data_type, + data_type: options.data_type, }) } } @@ -471,13 +549,27 @@ fn required_item<'py>( }) } -fn optional_item<'py>( - dict: &Bound<'py, PyDict>, - key: &str, -) -> PyResult>> { +fn optional_item<'py>(dict: &Bound<'py, PyDict>, key: &str) -> PyResult>> { Ok(dict.get_item(key)?.filter(|value| !value.is_none())) } +fn parse_optional_datetime( + value: Option, + field_name: &str, +) -> PyResult>> { + value + .map(|value| { + DateTime::parse_from_rfc3339(&value) + .map(|dt| dt.with_timezone(&Utc)) + .map_err(|err| { + PyTypeError::new_err(format!( + "{field_name} must be an RFC3339 timestamp: {err}" + )) + }) + }) + .transpose() +} + fn compaction_metrics_to_py(py: Python<'_>, metrics: CompactionMetrics) -> PyResult { let dict = PyDict::new(py); dict.set_item("fragments_removed", metrics.fragments_removed)?; @@ -528,6 +620,13 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { created_at, role, state_metadata, + expires_at, + retention_policy, + lifecycle_status, + retired_at, + retired_reason, + supersedes_id, + superseded_by_id, content_type, text_payload, binary_payload, @@ -558,6 +657,19 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { None => py.None().into_pyobject(py)?.unbind(), }; dict.set_item("state_metadata", state_obj)?; + dict.set_item( + "expires_at", + expires_at.map(|dt| dt.to_rfc3339_opts(SecondsFormat::Micros, true)), + )?; + dict.set_item("retention_policy", retention_policy)?; + dict.set_item("lifecycle_status", lifecycle_status)?; + dict.set_item( + "retired_at", + retired_at.map(|dt| dt.to_rfc3339_opts(SecondsFormat::Micros, true)), + )?; + dict.set_item("retired_reason", retired_reason)?; + dict.set_item("supersedes_id", supersedes_id)?; + dict.set_item("superseded_by_id", superseded_by_id)?; dict.set_item("content_type", content_type)?; dict.set_item("text_payload", text_payload)?; match binary_payload { diff --git a/python/tests/test_persistence.py b/python/tests/test_persistence.py index e94bda0..54ac38e 100644 --- a/python/tests/test_persistence.py +++ b/python/tests/test_persistence.py @@ -5,9 +5,10 @@ import sys import time import uuid +from datetime import datetime, timedelta, timezone from io import BytesIO from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import pytest @@ -19,11 +20,20 @@ lance = pytest.importorskip("lance") +if TYPE_CHECKING: + from collections.abc import Iterator + _S3_ACCESS_KEY = "test" _S3_SECRET_KEY = "test" _S3_REGION = "us-east-1" +def _embedding(pivot: float) -> list[float]: + values = [0.0] * 1536 + values[0] = pivot + return values + + def _free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("127.0.0.1", 0)) @@ -55,7 +65,7 @@ def _wait_for_moto_ready(client: Any, timeout: float = 5.0) -> None: @pytest.fixture(scope="module") -def moto_endpoint() -> str: +def moto_endpoint() -> Iterator[str]: pytest.importorskip("moto.server") boto3 = pytest.importorskip("boto3") from botocore.config import Config # type: ignore[import-not-found] @@ -156,6 +166,79 @@ def test_text_round_trip(tmp_path: Path) -> None: assert record["content_type"] == "text/plain" +def test_lifecycle_fields_round_trip_and_default_filtering(tmp_path: Path) -> None: + uri = tmp_path / "context.lance" + ctx = Context.create(str(uri)) + now = datetime.now(timezone.utc) + + ctx.add( + "user", + "active memory", + expires_at=now + timedelta(days=1), + retention_policy="long-term", + ) + ctx.add("assistant", "expired trace", expires_at=now - timedelta(days=1)) + ctx.add( + "system", + "superseded fact", + lifecycle_status="superseded", + retired_at=now, + retired_reason="replaced by newer fact", + superseded_by_id="active-id", + ) + ctx.add( + "system", + "failed approach", + lifecycle_status="contradicted", + retired_reason="negative knowledge", + ) + + visible = ctx.list() + assert [record["text"] for record in visible] == [ + "active memory", + "failed approach", + ] + assert visible[0]["retention_policy"] == "long-term" + assert visible[0]["lifecycle_status"] == "active" + + all_records = ctx.list(include_expired=True, include_retired=True) + assert [record["text"] for record in all_records] == [ + "active memory", + "expired trace", + "superseded fact", + "failed approach", + ] + + expired = all_records[1] + assert expired["expires_at"] is not None + assert expired["expires_at"] < datetime.now(timezone.utc) + + superseded = all_records[2] + assert superseded["lifecycle_status"] == "superseded" + assert superseded["retired_reason"] == "replaced by newer fact" + assert superseded["superseded_by_id"] == "active-id" + + +def test_search_applies_lifecycle_filter_before_limit(tmp_path: Path) -> None: + uri = tmp_path / "context.lance" + ctx = Context.create(str(uri)) + now = datetime.now(timezone.utc) + + ctx.add("user", "active memory", embedding=_embedding(1.0)) + ctx.add( + "assistant", + "expired but closer", + embedding=_embedding(0.0), + expires_at=now - timedelta(minutes=1), + ) + + hits = ctx.search(_embedding(0.0), limit=1) + assert [hit["text"] for hit in hits] == ["active memory"] + + hits_with_expired = ctx.search(_embedding(0.0), limit=1, include_expired=True) + assert [hit["text"] for hit in hits_with_expired] == ["expired but closer"] + + def test_image_round_trip(tmp_path: Path) -> None: Image = pytest.importorskip("PIL.Image") uri = tmp_path / "context.lance" diff --git a/python/tests/test_search.py b/python/tests/test_search.py index f29609e..b0abaa0 100644 --- a/python/tests/test_search.py +++ b/python/tests/test_search.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timezone from typing import Any import pytest @@ -12,8 +12,8 @@ class DummyInner: def __init__(self) -> None: - self.search_calls: list[tuple[list[float], int | None]] = [] - self.list_calls: list[tuple[int | None, int | None]] = [] + self.search_calls: list[tuple[list[float], int | None, bool, bool]] = [] + self.list_calls: list[tuple[int | None, int | None, bool, bool]] = [] self.get_calls: list[tuple[str | None, str | None]] = [] self.add_calls: list[ tuple[ @@ -26,6 +26,7 @@ def __init__(self) -> None: str | None, ] ] = [] + self.lifecycle_add_calls: list[dict[str, Any]] = [] self.add_many_calls: list[list[dict[str, Any]]] = [] def add( @@ -37,10 +38,28 @@ def add( bot_id: str | None, session_id: str | None, external_id: str | None, + expires_at: str | None = None, + retention_policy: str | None = None, + lifecycle_status: str | None = None, + retired_at: str | None = None, + retired_reason: str | None = None, + supersedes_id: str | None = None, + superseded_by_id: str | None = None, ): self.add_calls.append( (role, content, data_type, embedding, bot_id, session_id, external_id) ) + self.lifecycle_add_calls.append( + { + "expires_at": expires_at, + "retention_policy": retention_policy, + "lifecycle_status": lifecycle_status, + "retired_at": retired_at, + "retired_reason": retired_reason, + "supersedes_id": supersedes_id, + "superseded_by_id": superseded_by_id, + } + ) def get(self, id: str | None, external_id: str | None): self.get_calls.append((id, external_id)) @@ -51,8 +70,14 @@ def get(self, id: str | None, external_id: str | None): def add_many(self, records: list[dict[str, Any]]): self.add_many_calls.append(records) - def search(self, vector: list[float], limit: int | None): - self.search_calls.append((vector, limit)) + def search( + self, + vector: list[float], + limit: int | None, + include_expired: bool = False, + include_retired: bool = False, + ): + self.search_calls.append((vector, limit, include_expired, include_retired)) return [ { "id": "rec-1", @@ -71,8 +96,14 @@ def search(self, vector: list[float], limit: int | None): } ] - def list(self, limit: int | None, offset: int | None): - self.list_calls.append((limit, offset)) + def list( + self, + limit: int | None, + offset: int | None, + include_expired: bool = False, + include_retired: bool = False, + ): + self.list_calls.append((limit, offset, include_expired, include_retired)) return [ { "id": "rec-1", @@ -145,13 +176,23 @@ def test_context_search_formats_results(): hits = ctx.search([0.5, 0.4], limit=3) - assert dummy.search_calls == [([0.5, 0.4], 3)] + assert dummy.search_calls == [([0.5, 0.4], 3, False, False)] assert hits[0]["id"] == "rec-1" assert hits[0]["text"] == "hello" assert hits[0]["binary"] is None assert isinstance(hits[0]["created_at"], datetime) +def test_context_search_passes_lifecycle_flags(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.search([0.5, 0.4], include_expired=True, include_retired=True) + + assert dummy.search_calls == [([0.5, 0.4], None, True, True)] + + def test_normalize_record_without_distance(): result = _normalize_record( { @@ -179,7 +220,7 @@ def test_context_list_returns_entries(): entries = ctx.list(limit=10, offset=5) - assert dummy.list_calls == [(10, 5)] + assert dummy.list_calls == [(10, 5, False, False)] assert len(entries) == 2 assert entries[0]["id"] == "rec-1" assert entries[0]["text"] == "hello" @@ -190,6 +231,16 @@ def test_context_list_returns_entries(): assert isinstance(entries[0]["created_at"], datetime) +def test_context_list_passes_lifecycle_flags(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.list(include_expired=True, include_retired=True) + + assert dummy.list_calls == [(None, None, True, True)] + + def test_context_get_by_external_id(): ctx = Context.__new__(Context) dummy = DummyInner() @@ -241,7 +292,7 @@ def test_context_list_default_args(): ctx.list() - assert dummy.list_calls == [(None, None)] + assert dummy.list_calls == [(None, None, False, False)] def test_context_add_with_embedding(): @@ -376,6 +427,46 @@ def test_context_add_with_all_options(): assert external_id == "doc-1#chunk-1" +def test_context_add_with_lifecycle_fields(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + expires_at = datetime(2026, 7, 1, tzinfo=timezone.utc) + ctx.add( + "assistant", + "temporary session note", + expires_at=expires_at, + retention_policy="session", + lifecycle_status="active", + retired_at="2026-08-01T00:00:00Z", + retired_reason="policy-revoked", + supersedes_id="old-id", + superseded_by_id="new-id", + ) + + assert dummy.lifecycle_add_calls == [ + { + "expires_at": "2026-07-01T00:00:00Z", + "retention_policy": "session", + "lifecycle_status": "active", + "retired_at": "2026-08-01T00:00:00Z", + "retired_reason": "policy-revoked", + "supersedes_id": "old-id", + "superseded_by_id": "new-id", + } + ] + + +def test_context_add_rejects_naive_lifecycle_datetime(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + with pytest.raises(ValueError, match="timezone"): + ctx.add("user", "hello", expires_at=datetime(2026, 7, 1)) + + def test_context_add_many_normalizes_records(): ctx = Context.__new__(Context) dummy = DummyInner() @@ -406,6 +497,13 @@ def test_context_add_many_normalizes_records(): "bot_id": None, "session_id": None, "external_id": None, + "expires_at": None, + "retention_policy": None, + "lifecycle_status": None, + "retired_at": None, + "retired_reason": None, + "supersedes_id": None, + "superseded_by_id": None, }, { "role": "assistant", @@ -415,6 +513,13 @@ def test_context_add_many_normalizes_records(): "bot_id": "bot", "session_id": "sess", "external_id": "doc-1#chunk-2", + "expires_at": None, + "retention_policy": None, + "lifecycle_status": None, + "retired_at": None, + "retired_reason": None, + "supersedes_id": None, + "superseded_by_id": None, }, ] ] @@ -430,6 +535,33 @@ def test_context_add_many_accepts_data_type_alias(): assert dummy.add_many_calls[0][0]["data_type"] == "text/plain" +def test_context_add_many_passes_lifecycle_fields(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.add_many( + [ + { + "role": "assistant", + "content": "temporary", + "expires_at": datetime(2026, 7, 1, tzinfo=timezone.utc), + "retention_policy": "session", + "lifecycle_status": "superseded", + "retired_reason": "replaced", + "superseded_by_id": "new-id", + } + ] + ) + + record = dummy.add_many_calls[0][0] + assert record["expires_at"] == "2026-07-01T00:00:00Z" + assert record["retention_policy"] == "session" + assert record["lifecycle_status"] == "superseded" + assert record["retired_reason"] == "replaced" + assert record["superseded_by_id"] == "new-id" + + def test_context_add_many_rejects_invalid_records(): ctx = Context.__new__(Context) dummy = DummyInner() @@ -474,3 +606,35 @@ def test_normalize_record_with_agent_and_session_id(): assert result["bot_id"] == "support_bot" assert result["session_id"] == "user_88" assert result["external_id"] == "source-1" + + +def test_normalize_record_with_lifecycle_fields(): + result = _normalize_record( + { + "id": "rec-1", + "external_id": "source-1", + "created_at": "2024-01-01T00:00:00Z", + "content_type": "text/plain", + "text_payload": "hello", + "binary_payload": None, + "embedding": None, + "run_id": "run-1", + "role": "user", + "state_metadata": None, + "expires_at": "2026-07-01T00:00:00Z", + "retention_policy": "session", + "lifecycle_status": "superseded", + "retired_at": "2026-07-02T00:00:00Z", + "retired_reason": "replaced", + "supersedes_id": "old-id", + "superseded_by_id": "new-id", + } + ) + + assert isinstance(result["expires_at"], datetime) + assert result["retention_policy"] == "session" + assert result["lifecycle_status"] == "superseded" + assert isinstance(result["retired_at"], datetime) + assert result["retired_reason"] == "replaced" + assert result["supersedes_id"] == "old-id" + assert result["superseded_by_id"] == "new-id" From b534cf9e09051ef316730b9199637481838db15e Mon Sep 17 00:00:00 2001 From: Allen Cheng Date: Tue, 9 Jun 2026 14:23:57 -0700 Subject: [PATCH 2/4] fix: honor supersession pointers in lifecycle reads --- README.md | 17 ++++----- crates/lance-context-core/src/store.rs | 53 +++++++++++++++++++++++--- python/tests/test_persistence.py | 24 ++++++++++++ 3 files changed, 78 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 02d54b3..e09b58b 100644 --- a/README.md +++ b/README.md @@ -87,22 +87,19 @@ ctx.add_many([ }, ]) -# Lifecycle metadata is stored in first-class columns. Expired and retired rows -# are excluded from default list/search calls, while contradicted records stay -# visible so negative knowledge can remain a guardrail for future recall. +# Lifecycle metadata is stored in first-class columns. Expired, retired, and +# superseded rows are excluded from default list/search calls, while contradicted +# records stay visible so negative knowledge can remain a guardrail for recall. ctx.add( "assistant", "temporary session note", expires_at="2026-07-01T00:00:00Z", retention_policy="session", ) -ctx.add( - "system", - "older fact kept as history", - lifecycle_status="superseded", - retired_reason="replaced by newer fact", - superseded_by_id="new-record-id", -) +ctx.add("system", "deployment endpoint is /v1") +old_endpoint = ctx.list()[-1] +ctx.add("system", "deployment endpoint is /v2", supersedes_id=old_endpoint["id"]) +ctx.add("system", "do not use the legacy parser", lifecycle_status="contradicted") visible = ctx.list() with_lifecycle_history = ctx.list(include_expired=True, include_retired=True) diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 4271735..9af111e 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -399,7 +399,7 @@ impl ContextStore { .await } - /// List records, applying lifecycle visibility before offset/limit. + /// List records, applying lifecycle visibility and supersession before offset/limit. pub async fn list_with_options( &self, limit: Option, @@ -410,13 +410,25 @@ impl ContextStore { let mut stream = scanner.try_into_stream().await?; let mut results = Vec::new(); while let Some(batch) = stream.try_next().await? { - results.extend( - batch_to_records(&batch)? - .into_iter() - .filter(|record| options.is_visible(record)), - ); + results.extend(batch_to_records(&batch)?); } + let superseded_ids: HashSet = results + .iter() + .filter_map(|record| { + let supersedes_id = record.supersedes_id.as_ref()?; + if supersedes_id == &record.id { + None + } else { + Some(supersedes_id.clone()) + } + }) + .collect(); + results.retain(|record| { + options.is_visible(record) + && (options.include_retired || !superseded_ids.contains(&record.id)) + }); + if let Some(offset) = offset { results = results.into_iter().skip(offset).collect(); } @@ -1565,6 +1577,35 @@ mod tests { }); } + #[test] + fn list_hides_records_superseded_by_newer_pointer() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + let old = text_record("old", 0.0); + let mut replacement = text_record("new", 1.0); + replacement.supersedes_id = Some(old.id.clone()); + store + .add(&[old.clone(), replacement.clone()]) + .await + .unwrap(); + + let visible = store.list(None, None).await.unwrap(); + assert_eq!(visible.len(), 1); + assert_eq!(visible[0].id, replacement.id); + + let history = store + .list_with_options(None, None, LifecycleQueryOptions::new(false, true)) + .await + .unwrap(); + assert_eq!(history.len(), 2); + assert!(history.iter().any(|record| record.id == old.id)); + assert!(history.iter().any(|record| record.id == replacement.id)); + }); + } + #[test] fn search_filters_lifecycle_before_ranking() { let dir = TempDir::new().unwrap(); diff --git a/python/tests/test_persistence.py b/python/tests/test_persistence.py index 54ac38e..8d0334b 100644 --- a/python/tests/test_persistence.py +++ b/python/tests/test_persistence.py @@ -239,6 +239,30 @@ def test_search_applies_lifecycle_filter_before_limit(tmp_path: Path) -> None: assert [hit["text"] for hit in hits_with_expired] == ["expired but closer"] +def test_supersedes_pointer_hides_old_record_by_default(tmp_path: Path) -> None: + uri = tmp_path / "context.lance" + ctx = Context.create(str(uri)) + + ctx.add("user", "old value", embedding=_embedding(0.0)) + old = ctx.list()[0] + + ctx.add( + "user", + "new value", + embedding=_embedding(1.0), + supersedes_id=old["id"], + ) + + assert [record["text"] for record in ctx.list()] == ["new value"] + assert [record["text"] for record in ctx.search(_embedding(0.0), limit=10)] == [ + "new value" + ] + + history = ctx.list(include_retired=True) + assert [record["text"] for record in history] == ["old value", "new value"] + assert history[1]["supersedes_id"] == old["id"] + + def test_image_round_trip(tmp_path: Path) -> None: Image = pytest.importorskip("PIL.Image") uri = tmp_path / "context.lance" From 75130f6d96a3689e9bee63805948616adbfdb45b Mon Sep 17 00:00:00 2001 From: Allen Cheng Date: Tue, 9 Jun 2026 15:57:15 -0700 Subject: [PATCH 3/4] fix: compare lifecycle timestamps at stored precision --- crates/lance-context-core/src/store.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 9af111e..b027323 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -1564,7 +1564,12 @@ mod tests { .unwrap(); assert_eq!(all.len(), 3); let expired_roundtrip = all.iter().find(|record| record.id == expired.id).unwrap(); - assert_eq!(expired_roundtrip.expires_at, expired.expires_at); + assert_eq!( + expired_roundtrip + .expires_at + .map(|value| value.timestamp_micros()), + expired.expires_at.map(|value| value.timestamp_micros()) + ); let superseded_roundtrip = all .iter() .find(|record| record.id == superseded.id) From a90bddf61c66630dceec884431aac1b9cc98acfd Mon Sep 17 00:00:00 2001 From: Allen Cheng Date: Tue, 9 Jun 2026 16:49:28 -0700 Subject: [PATCH 4/4] fix: restore core recursion limit --- crates/lance-context-core/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/lance-context-core/src/lib.rs b/crates/lance-context-core/src/lib.rs index 1b661b8..cf3b98f 100644 --- a/crates/lance-context-core/src/lib.rs +++ b/crates/lance-context-core/src/lib.rs @@ -1,4 +1,5 @@ //! Core types for the lance-context storage layer. +#![recursion_limit = "256"] mod context; mod record;