diff --git a/README.md b/README.md index 1497e6d..800e41a 100644 --- a/README.md +++ b/README.md @@ -215,6 +215,13 @@ let record = ContextRecord { custom: None, }), metadata: None, + expires_at: None, + retention_policy: None, + lifecycle_status: "active".into(), + retired_at: None, + retired_reason: None, + supersedes_id: None, + superseded_by_id: None, content_type: "text/plain".into(), text_payload: Some("hello world".into()), binary_payload: None, diff --git a/crates/lance-context-core/src/lib.rs b/crates/lance-context-core/src/lib.rs index fd24f82..cf3b98f 100644 --- a/crates/lance-context-core/src/lib.rs +++ b/crates/lance-context-core/src/lib.rs @@ -7,7 +7,10 @@ pub mod serde; mod store; pub use context::{Context, ContextEntry, Snapshot}; -pub use record::{ContextRecord, MetadataFilter, RecordFilters, SearchResult, StateMetadata}; +pub use record::{ + ContextRecord, LifecycleQueryOptions, MetadataFilter, RecordFilters, SearchResult, + StateMetadata, LIFECYCLE_ACTIVE, LIFECYCLE_CONTRADICTED, +}; pub use store::{ CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, IdIndexType, }; diff --git a/crates/lance-context-core/src/record.rs b/crates/lance-context-core/src/record.rs index ad311f6..0a823ea 100644 --- a/crates/lance-context-core/src/record.rs +++ b/crates/lance-context-core/src/record.rs @@ -4,6 +4,9 @@ use std::collections::HashMap; use crate::serde::CONTENT_TYPE_TOMBSTONE; +pub const LIFECYCLE_ACTIVE: &str = "active"; +pub const LIFECYCLE_CONTRADICTED: &str = "contradicted"; + /// Structured metadata captured alongside each context entry. #[derive(Debug, Clone, Default)] pub struct StateMetadata { @@ -25,6 +28,13 @@ pub struct ContextRecord { pub role: String, pub state_metadata: Option, pub metadata: Option, + pub expires_at: Option>, + pub retention_policy: Option, + pub lifecycle_status: String, + pub retired_at: Option>, + pub retired_reason: Option, + pub supersedes_id: Option, + pub superseded_by_id: Option, pub content_type: String, pub text_payload: Option, pub binary_payload: Option>, @@ -36,6 +46,69 @@ impl ContextRecord { pub fn is_tombstone(&self) -> bool { self.content_type == CONTENT_TYPE_TOMBSTONE } + + #[must_use] + pub fn is_expired_at(&self, now: DateTime) -> bool { + self.expires_at.is_some_and(|expires_at| expires_at <= now) + } + + #[must_use] + pub fn is_hidden_by_lifecycle(&self) -> bool { + if self.lifecycle_status == LIFECYCLE_ACTIVE + || self.lifecycle_status == LIFECYCLE_CONTRADICTED + { + return self.retired_at.is_some() || self.superseded_by_id.is_some(); + } + + true + } + + #[must_use] + pub fn has_non_default_lifecycle(&self) -> bool { + self.expires_at.is_some() + || self.retention_policy.is_some() + || self.lifecycle_status != LIFECYCLE_ACTIVE + || self.retired_at.is_some() + || self.retired_reason.is_some() + || self.supersedes_id.is_some() + || self.superseded_by_id.is_some() + } +} + +/// Query-time controls for lifecycle-aware retrieval. +#[derive(Debug, Clone)] +pub struct LifecycleQueryOptions { + pub include_expired: bool, + pub include_retired: bool, + pub reference_time: DateTime, +} + +impl Default for LifecycleQueryOptions { + fn default() -> Self { + Self { + include_expired: false, + include_retired: false, + reference_time: Utc::now(), + } + } +} + +impl LifecycleQueryOptions { + #[must_use] + pub fn new(include_expired: bool, include_retired: bool) -> Self { + Self { + include_expired, + include_retired, + ..Self::default() + } + } + + #[must_use] + pub fn is_visible(&self, record: &ContextRecord) -> bool { + !record.is_tombstone() + && (self.include_expired || !record.is_expired_at(self.reference_time)) + && (self.include_retired || !record.is_hidden_by_lifecycle()) + } } /// Result returned from a vector similarity search. @@ -163,6 +236,13 @@ mod tests { "tags": ["runbook", "ownership"], "confidence": 0.92 })), + expires_at: None, + retention_policy: None, + lifecycle_status: LIFECYCLE_ACTIVE.to_string(), + retired_at: None, + retired_reason: None, + supersedes_id: None, + superseded_by_id: None, content_type: "text/plain".to_string(), text_payload: Some("hello".to_string()), binary_payload: None, diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 629f804..9d7b973 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -31,7 +31,10 @@ use tokio::task::JoinHandle; use tracing::{error, info, warn}; use uuid::Uuid; -use crate::record::{ContextRecord, RecordFilters, SearchResult, StateMetadata}; +use crate::record::{ + ContextRecord, LifecycleQueryOptions, RecordFilters, SearchResult, StateMetadata, + LIFECYCLE_ACTIVE, +}; use crate::serde::CONTENT_TYPE_TOMBSTONE; /// Embedding length used for the semantic index column. @@ -292,6 +295,13 @@ impl ContextStore { role: record.role, state_metadata: None, metadata: None, + expires_at: None, + retention_policy: None, + lifecycle_status: LIFECYCLE_ACTIVE.to_string(), + retired_at: None, + retired_reason: None, + supersedes_id: None, + superseded_by_id: None, content_type: CONTENT_TYPE_TOMBSTONE.to_string(), text_payload: None, binary_payload: None, @@ -329,7 +339,10 @@ impl ContextStore { } } - for record in self.list(None, None).await? { + for record in self + .list_with_options(None, None, LifecycleQueryOptions::new(true, true)) + .await? + { if ids.contains(record.id.as_str()) { return Err(ArrowError::InvalidArgumentError(format!( "id '{}' already exists", @@ -384,7 +397,8 @@ impl ContextStore { limit: Option, offset: Option, ) -> LanceResult> { - self.list_filtered(limit, offset, None).await + self.list_filtered_with_options(limit, offset, None, LifecycleQueryOptions::default()) + .await } /// List records matching filters. @@ -393,18 +407,52 @@ impl ContextStore { limit: Option, offset: Option, filters: Option<&RecordFilters>, + ) -> LanceResult> { + self.list_filtered_with_options(limit, offset, filters, LifecycleQueryOptions::default()) + .await + } + + /// List records, applying lifecycle visibility and supersession before offset/limit. + pub async fn list_with_options( + &self, + limit: Option, + offset: Option, + options: LifecycleQueryOptions, + ) -> LanceResult> { + self.list_filtered_with_options(limit, offset, None, options) + .await + } + + /// List records matching filters, applying lifecycle visibility before offset/limit. + pub async fn list_filtered_with_options( + &self, + limit: Option, + offset: Option, + filters: Option<&RecordFilters>, + options: LifecycleQueryOptions, ) -> LanceResult> { let scanner = self.lsm_scanner().await?; let mut stream = scanner.try_into_stream().await?; let mut results = Vec::new(); while let Some(batch) = stream.try_next().await? { - results.extend( - batch_to_records(&batch)? - .into_iter() - .filter(|record| !record.is_tombstone()), - ); + results.extend(batch_to_records(&batch)?); } + let superseded_ids: HashSet = results + .iter() + .filter_map(|record| { + let supersedes_id = record.supersedes_id.as_ref()?; + if supersedes_id == &record.id { + None + } else { + Some(supersedes_id.clone()) + } + }) + .collect(); + results.retain(|record| { + options.is_visible(record) + && (options.include_retired || !superseded_ids.contains(&record.id)) + }); if let Some(filters) = filters.filter(|filters| !filters.is_empty()) { results.retain(|record| filters.matches(record)); } @@ -445,7 +493,8 @@ impl ContextStore { query: &[f32], limit: Option, ) -> LanceResult> { - self.search_filtered(query, limit, None).await + self.search_filtered_with_options(query, limit, None, LifecycleQueryOptions::default()) + .await } /// Perform a nearest-neighbor search over stored embeddings matching filters. @@ -454,6 +503,29 @@ impl ContextStore { query: &[f32], limit: Option, filters: Option<&RecordFilters>, + ) -> LanceResult> { + self.search_filtered_with_options(query, limit, filters, LifecycleQueryOptions::default()) + .await + } + + /// Perform nearest-neighbor search after applying lifecycle visibility. + pub async fn search_with_options( + &self, + query: &[f32], + limit: Option, + options: LifecycleQueryOptions, + ) -> LanceResult> { + self.search_filtered_with_options(query, limit, None, options) + .await + } + + /// Perform nearest-neighbor search after applying filters and lifecycle visibility. + pub async fn search_filtered_with_options( + &self, + query: &[f32], + limit: Option, + filters: Option<&RecordFilters>, + options: LifecycleQueryOptions, ) -> LanceResult> { if query.len() != DEFAULT_EMBEDDING_DIM as usize { return Err(ArrowError::InvalidArgumentError(format!( @@ -470,7 +542,7 @@ impl ContextStore { } let mut results: Vec = self - .list_filtered(None, None, filters) + .list_filtered_with_options(None, None, filters, options) .await? .into_iter() .filter_map(|record| { @@ -733,13 +805,14 @@ impl ContextStore { /// Lance V1 blob encoding (out-of-line binary buffers). For `text_payload`, /// this also changes the Arrow type from `LargeUtf8` to `LargeBinary`. pub fn schema(blob_columns: &HashSet) -> Schema { - Self::schema_with_options(blob_columns, true, true) + Self::schema_with_options(blob_columns, true, true, true) } fn schema_with_options( blob_columns: &HashSet, include_external_id: bool, include_metadata: bool, + include_lifecycle: bool, ) -> Schema { let mut id_metadata = HashMap::new(); id_metadata.insert( @@ -798,6 +871,25 @@ impl ContextStore { if include_metadata { fields.push(Field::new("metadata", DataType::LargeUtf8, true)); } + if include_lifecycle { + fields.extend([ + Field::new( + "expires_at", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + Field::new("retention_policy", DataType::Utf8, true), + Field::new("lifecycle_status", DataType::Utf8, false), + Field::new( + "retired_at", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + Field::new("retired_reason", DataType::Utf8, true), + Field::new("supersedes_id", DataType::Utf8, true), + Field::new("superseded_by_id", DataType::Utf8, true), + ]); + } fields.extend([ Field::new("content_type", DataType::Utf8, false), text_field, @@ -866,6 +958,18 @@ impl ContextStore { .field_paths() .iter() .any(|path| path == "external_id"); + let include_lifecycle = self + .dataset + .schema() + .field_paths() + .iter() + .any(|path| path == "expires_at"); + let include_metadata = self + .dataset + .schema() + .field_paths() + .iter() + .any(|path| path == "metadata"); if !include_external_id && entries.iter().any(|entry| entry.external_id.is_some()) { return Err(ArrowError::InvalidArgumentError( "external_id requires a context dataset created with external_id support" @@ -873,18 +977,19 @@ impl ContextStore { ) .into()); } - let include_metadata = self - .dataset - .schema() - .field_paths() - .iter() - .any(|path| path == "metadata"); if !include_metadata && entries.iter().any(|entry| entry.metadata.is_some()) { return Err(ArrowError::InvalidArgumentError( "metadata requires a context dataset created with metadata support".to_string(), ) .into()); } + if !include_lifecycle && entries.iter().any(ContextRecord::has_non_default_lifecycle) { + return Err(ArrowError::InvalidArgumentError( + "lifecycle fields require a context dataset created with lifecycle support" + .to_string(), + ) + .into()); + } let mut id_builder = StringBuilder::new(); let mut external_id_builder = StringBuilder::new(); @@ -894,6 +999,13 @@ impl ContextStore { let mut created_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len()); let mut role_builder = StringDictionaryBuilder::::new(); let mut metadata_builder = LargeStringBuilder::new(); + let mut expires_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len()); + let mut retention_policy_builder = StringBuilder::new(); + let mut lifecycle_status_builder = StringBuilder::new(); + let mut retired_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len()); + let mut retired_reason_builder = StringBuilder::new(); + let mut supersedes_id_builder = StringBuilder::new(); + let mut superseded_by_id_builder = StringBuilder::new(); let mut content_type_builder = StringBuilder::new(); let mut binary_builder = LargeBinaryBuilder::new(); @@ -940,6 +1052,15 @@ impl ContextStore { Some(metadata) => metadata_builder.append_value(metadata.to_string()), None => metadata_builder.append_null(), } + expires_at_builder + .append_option(entry.expires_at.map(|value| value.timestamp_micros())); + retention_policy_builder.append_option(entry.retention_policy.as_deref()); + lifecycle_status_builder.append_value(&entry.lifecycle_status); + retired_at_builder + .append_option(entry.retired_at.map(|value| value.timestamp_micros())); + retired_reason_builder.append_option(entry.retired_reason.as_deref()); + supersedes_id_builder.append_option(entry.supersedes_id.as_deref()); + superseded_by_id_builder.append_option(entry.superseded_by_id.as_deref()); content_type_builder.append_value(&entry.content_type); if text_is_blob { @@ -1034,6 +1155,13 @@ impl ContextStore { let created_at_array: ArrayRef = Arc::new(created_at_builder.finish()); let role_array: ArrayRef = Arc::new(role_builder.finish()); let metadata_array: ArrayRef = Arc::new(metadata_builder.finish()); + let expires_at_array: ArrayRef = Arc::new(expires_at_builder.finish()); + let retention_policy_array: ArrayRef = Arc::new(retention_policy_builder.finish()); + let lifecycle_status_array: ArrayRef = Arc::new(lifecycle_status_builder.finish()); + let retired_at_array: ArrayRef = Arc::new(retired_at_builder.finish()); + let retired_reason_array: ArrayRef = Arc::new(retired_reason_builder.finish()); + let supersedes_id_array: ArrayRef = Arc::new(supersedes_id_builder.finish()); + let superseded_by_id_array: ArrayRef = Arc::new(superseded_by_id_builder.finish()); let content_type_array: ArrayRef = Arc::new(content_type_builder.finish()); let text_array: ArrayRef = if text_is_blob { Arc::new(text_binary_builder.unwrap().finish()) @@ -1048,6 +1176,7 @@ impl ContextStore { &self.blob_columns, include_external_id, include_metadata, + include_lifecycle, )); let mut arrays = vec![id_array]; if include_external_id { @@ -1064,6 +1193,17 @@ impl ContextStore { if include_metadata { arrays.push(metadata_array); } + if include_lifecycle { + arrays.extend([ + expires_at_array, + retention_policy_array, + lifecycle_status_array, + retired_at_array, + retired_reason_array, + supersedes_id_array, + superseded_by_id_array, + ]); + } arrays.extend([ content_type_array, text_array, @@ -1098,6 +1238,13 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { let role_array = column_as::>(batch, "role")?; let state_array = column_as::(batch, "state_metadata")?; let metadata_array = column_as_optional::(batch, "metadata"); + let expires_at_array = column_as_optional::(batch, "expires_at"); + let retention_policy_array = column_as_optional::(batch, "retention_policy"); + let lifecycle_status_array = column_as_optional::(batch, "lifecycle_status"); + let retired_at_array = column_as_optional::(batch, "retired_at"); + let retired_reason_array = column_as_optional::(batch, "retired_reason"); + let supersedes_id_array = column_as_optional::(batch, "supersedes_id"); + let superseded_by_id_array = column_as_optional::(batch, "superseded_by_id"); let content_type_array = column_as::(batch, "content_type")?; let binary_array = column_as::(batch, "binary_payload")?; let embedding_array = column_as::(batch, "embedding")?; @@ -1162,13 +1309,7 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { let mut results = Vec::with_capacity(batch.num_rows()); for row in 0..batch.num_rows() { - let created_at = - DateTime::from_timestamp_micros(created_at_array.value(row)).ok_or_else(|| { - LanceError::from(ArrowError::InvalidArgumentError(format!( - "invalid timestamp value {}", - created_at_array.value(row) - ))) - })?; + let created_at = timestamp_from_micros(created_at_array.value(row), "created_at")?; let state_metadata = if state_array.is_null(row) { None @@ -1271,6 +1412,14 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { } _ => None, }; + let expires_at = optional_timestamp_from_array(expires_at_array, row, "expires_at")?; + let retention_policy = optional_string_from_array(retention_policy_array, row); + let lifecycle_status = optional_string_from_array(lifecycle_status_array, row) + .unwrap_or_else(|| LIFECYCLE_ACTIVE.to_string()); + let retired_at = optional_timestamp_from_array(retired_at_array, row, "retired_at")?; + let retired_reason = optional_string_from_array(retired_reason_array, row); + let supersedes_id = optional_string_from_array(supersedes_id_array, row); + let superseded_by_id = optional_string_from_array(superseded_by_id_array, row); results.push(ContextRecord { id: id_array.value(row).to_string(), @@ -1288,6 +1437,13 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult> { role, state_metadata, metadata, + expires_at, + retention_policy, + lifecycle_status, + retired_at, + retired_reason, + supersedes_id, + superseded_by_id, content_type: content_type_array.value(row).to_string(), text_payload, binary_payload, @@ -1316,6 +1472,39 @@ fn embedding_from_list(list: &FixedSizeListArray, row: usize) -> LanceResult LanceResult> { + DateTime::from_timestamp_micros(value).ok_or_else(|| { + LanceError::from(ArrowError::InvalidArgumentError(format!( + "invalid timestamp value {value} in column '{column}'" + ))) + }) +} + +fn optional_timestamp_from_array( + array: Option<&TimestampMicrosecondArray>, + row: usize, + column: &str, +) -> LanceResult>> { + let Some(array) = array else { + return Ok(None); + }; + if array.is_null(row) { + Ok(None) + } else { + timestamp_from_micros(array.value(row), column).map(Some) + } +} + +fn optional_string_from_array(array: Option<&StringArray>, row: usize) -> Option { + array.and_then(|arr| { + if arr.is_null(row) { + None + } else { + Some(arr.value(row).to_string()) + } + }) +} + fn l2_distance(left: &[f32], right: &[f32]) -> f32 { left.iter() .zip(right) @@ -1356,7 +1545,7 @@ where mod tests { use super::*; use crate::serde::CONTENT_TYPE_TEXT; - use chrono::Utc; + use chrono::{Duration as ChronoDuration, Utc}; use tempfile::TempDir; fn make_embedding(pivot: f32) -> Vec { @@ -1383,6 +1572,13 @@ mod tests { custom: None, }), metadata: None, + expires_at: None, + retention_policy: None, + lifecycle_status: LIFECYCLE_ACTIVE.to_string(), + retired_at: None, + retired_reason: None, + supersedes_id: None, + superseded_by_id: None, content_type: CONTENT_TYPE_TEXT.to_string(), text_payload: Some(format!("payload-{id}")), binary_payload: None, @@ -1430,6 +1626,112 @@ mod tests { }); } + #[test] + fn list_hides_expired_and_retired_records_by_default() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + let active = text_record("active", 0.0); + let mut expired = text_record("expired", 0.0); + expired.expires_at = Some(Utc::now() - ChronoDuration::minutes(1)); + let mut superseded = text_record("superseded", 0.0); + superseded.lifecycle_status = "superseded".to_string(); + superseded.retired_reason = Some("replaced by newer fact".to_string()); + superseded.superseded_by_id = Some("active".to_string()); + + store + .add(&[active.clone(), expired.clone(), superseded.clone()]) + .await + .unwrap(); + + let visible = store.list(None, None).await.unwrap(); + assert_eq!(visible.len(), 1); + assert_eq!(visible[0].id, active.id); + + let all = store + .list_with_options(None, None, LifecycleQueryOptions::new(true, true)) + .await + .unwrap(); + assert_eq!(all.len(), 3); + let expired_roundtrip = all.iter().find(|record| record.id == expired.id).unwrap(); + assert_eq!( + expired_roundtrip + .expires_at + .map(|value| value.timestamp_micros()), + expired.expires_at.map(|value| value.timestamp_micros()) + ); + let superseded_roundtrip = all + .iter() + .find(|record| record.id == superseded.id) + .unwrap(); + assert_eq!(superseded_roundtrip.lifecycle_status, "superseded"); + assert_eq!( + superseded_roundtrip.superseded_by_id.as_deref(), + Some("active") + ); + }); + } + + #[test] + fn list_hides_records_superseded_by_newer_pointer() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + let old = text_record("old", 0.0); + let mut replacement = text_record("new", 1.0); + replacement.supersedes_id = Some(old.id.clone()); + store + .add(&[old.clone(), replacement.clone()]) + .await + .unwrap(); + + let visible = store.list(None, None).await.unwrap(); + assert_eq!(visible.len(), 1); + assert_eq!(visible[0].id, replacement.id); + + let history = store + .list_with_options(None, None, LifecycleQueryOptions::new(false, true)) + .await + .unwrap(); + assert_eq!(history.len(), 2); + assert!(history.iter().any(|record| record.id == old.id)); + assert!(history.iter().any(|record| record.id == replacement.id)); + }); + } + + #[test] + fn search_filters_lifecycle_before_ranking() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + let active = text_record("active", 1.0); + let mut expired_better_match = text_record("expired", 0.0); + expired_better_match.expires_at = Some(Utc::now() - ChronoDuration::minutes(1)); + store + .add(&[active.clone(), expired_better_match.clone()]) + .await + .unwrap(); + + let query = make_embedding(0.0); + let visible = store.search(&query, Some(1)).await.unwrap(); + assert_eq!(visible.len(), 1); + assert_eq!(visible[0].record.id, active.id); + + let all = store + .search_with_options(&query, Some(1), LifecycleQueryOptions::new(true, false)) + .await + .unwrap(); + assert_eq!(all.len(), 1); + assert_eq!(all[0].record.id, expired_better_match.id); + }); + } + #[test] fn external_id_roundtrips_and_supports_lookup() { let dir = TempDir::new().unwrap(); diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index 0a3c00c..069675c 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -111,11 +111,26 @@ def _coerce_vector(query: Any) -> list[float]: raise TypeError("search query must be a sequence of floats") +def _coerce_timestamp(value: datetime | str | None, *, field_name: str) -> str | None: + if value is None: + return None + if isinstance(value, datetime): + if value.tzinfo is None: + raise ValueError(f"{field_name} must include timezone information") + return value.isoformat().replace("+00:00", "Z") + if isinstance(value, str): + return value + raise TypeError(f"{field_name} must be a datetime, RFC3339 string, or None") + + +def _normalize_timestamp(value: Any) -> Any: + if isinstance(value, str): + return datetime.fromisoformat(value.replace("Z", "+00:00")) + return value + + def _normalize_record(raw: dict[str, Any]) -> dict[str, Any]: """Normalize a raw record dict from the Rust layer.""" - created_at = raw.get("created_at") - if isinstance(created_at, str): - created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) return { "id": raw.get("id"), "external_id": raw.get("external_id"), @@ -127,9 +142,16 @@ def _normalize_record(raw: dict[str, Any]) -> dict[str, Any]: "text": raw.get("text_payload"), "binary": raw.get("binary_payload"), "embedding": raw.get("embedding"), - "created_at": created_at, + "created_at": _normalize_timestamp(raw.get("created_at")), "state_metadata": raw.get("state_metadata"), "metadata": raw.get("metadata"), + "expires_at": _normalize_timestamp(raw.get("expires_at")), + "retention_policy": raw.get("retention_policy"), + "lifecycle_status": raw.get("lifecycle_status"), + "retired_at": _normalize_timestamp(raw.get("retired_at")), + "retired_reason": raw.get("retired_reason"), + "supersedes_id": raw.get("supersedes_id"), + "superseded_by_id": raw.get("superseded_by_id"), } @@ -366,6 +388,13 @@ def add( session_id: str | None = None, external_id: str | None = None, metadata: dict[str, Any] | None = None, + expires_at: datetime | str | None = None, + retention_policy: str | None = None, + lifecycle_status: str | None = None, + retired_at: datetime | str | None = None, + retired_reason: str | None = None, + supersedes_id: str | None = None, + superseded_by_id: str | None = None, ) -> None: if content_type is not None and data_type is not None: raise ValueError("Specify only one of content_type or data_type") @@ -381,6 +410,13 @@ def add( session_id, external_id, _json_dumps(metadata, "metadata"), + _coerce_timestamp(expires_at, field_name="expires_at"), + retention_policy, + lifecycle_status, + _coerce_timestamp(retired_at, field_name="retired_at"), + retired_reason, + supersedes_id, + superseded_by_id, ) def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: @@ -388,7 +424,8 @@ def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: Each record accepts the same fields as :meth:`add`: ``role``, ``content``, optional ``content_type``/``data_type``, ``embedding``, - ``bot_id``, ``session_id``, ``external_id``, and ``metadata``. + ``bot_id``, ``session_id``, ``external_id``, ``metadata``, and + lifecycle fields such as ``expires_at`` and ``lifecycle_status``. """ normalized: list[dict[str, Any]] = [] for index, record in enumerate(records): @@ -419,6 +456,19 @@ def add_many(self, records: Iterable[Mapping[str, Any]]) -> None: "session_id": record.get("session_id"), "external_id": record.get("external_id"), "metadata_json": _json_dumps(record.get("metadata"), "metadata"), + "expires_at": _coerce_timestamp( + record.get("expires_at"), + field_name=f"records[{index}].expires_at", + ), + "retention_policy": record.get("retention_policy"), + "lifecycle_status": record.get("lifecycle_status"), + "retired_at": _coerce_timestamp( + record.get("retired_at"), + field_name=f"records[{index}].retired_at", + ), + "retired_reason": record.get("retired_reason"), + "supersedes_id": record.get("supersedes_id"), + "superseded_by_id": record.get("superseded_by_id"), } ) @@ -439,9 +489,18 @@ def search( query: Any, limit: int | None = None, filters: dict[str, Any] | None = None, + *, + include_expired: bool = False, + include_retired: bool = False, ) -> list[dict[str, Any]]: vector = _coerce_vector(query) - results = self._inner.search(vector, limit, _json_dumps(filters, "filters")) + results = self._inner.search( + vector, + limit, + _json_dumps(filters, "filters"), + include_expired, + include_retired, + ) return [_normalize_search_hit(item) for item in results] def list( @@ -449,6 +508,9 @@ def list( limit: int | None = None, offset: int | None = None, filters: dict[str, Any] | None = None, + *, + include_expired: bool = False, + include_retired: bool = False, ) -> list[dict[str, Any]]: """Return stored entries. @@ -458,12 +520,21 @@ def list( filters: Optional equality filters for built-in fields (bot_id, session_id, role, content_type), created_at range filters, or metadata fields. + include_expired: Include records whose ``expires_at`` is in the past. + include_retired: Include retired/superseded/revoked records. Returns: List of entry dicts with keys: id, run_id, role, content_type, - text, binary, embedding, created_at, metadata, state_metadata. + text, binary, embedding, created_at, metadata, state_metadata, and + lifecycle metadata. """ - results = self._inner.list(limit, offset, _json_dumps(filters, "filters")) + results = self._inner.list( + limit, + offset, + _json_dumps(filters, "filters"), + include_expired, + include_retired, + ) return [_normalize_record(item) for item in results] def get( @@ -610,6 +681,13 @@ async def add( session_id: str | None = None, external_id: str | None = None, metadata: dict[str, Any] | None = None, + expires_at: datetime | str | None = None, + retention_policy: str | None = None, + lifecycle_status: str | None = None, + retired_at: datetime | str | None = None, + retired_reason: str | None = None, + supersedes_id: str | None = None, + superseded_by_id: str | None = None, ) -> None: loop = asyncio.get_running_loop() await loop.run_in_executor( @@ -624,6 +702,13 @@ async def add( session_id=session_id, external_id=external_id, metadata=metadata, + expires_at=expires_at, + retention_policy=retention_policy, + lifecycle_status=lifecycle_status, + retired_at=retired_at, + retired_reason=retired_reason, + supersedes_id=supersedes_id, + superseded_by_id=superseded_by_id, ), ) @@ -643,10 +728,20 @@ async def search( query: Any, limit: int | None = None, filters: dict[str, Any] | None = None, + *, + include_expired: bool = False, + include_retired: bool = False, ) -> list[dict[str, Any]]: loop = asyncio.get_running_loop() return await loop.run_in_executor( - None, lambda: self._sync.search(query, limit, filters) + None, + lambda: self._sync.search( + query, + limit, + filters, + include_expired=include_expired, + include_retired=include_retired, + ), ) async def list( @@ -654,10 +749,20 @@ async def list( limit: int | None = None, offset: int | None = None, filters: dict[str, Any] | None = None, + *, + include_expired: bool = False, + include_retired: bool = False, ) -> list[dict[str, Any]]: loop = asyncio.get_running_loop() return await loop.run_in_executor( - None, lambda: self._sync.list(limit, offset, filters) + None, + lambda: self._sync.list( + limit, + offset, + filters, + include_expired=include_expired, + include_retired=include_retired, + ), ) async def compact( diff --git a/python/src/lib.rs b/python/src/lib.rs index c4e10a8..ebdf5cb 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -14,7 +14,8 @@ use tokio::runtime::Runtime; use lance_context::serde::CONTENT_TYPE_TEXT; use lance_context::{ CompactionConfig, CompactionMetrics, CompactionStats, Context as RustContext, ContextRecord, - ContextStore, ContextStoreOptions, IdIndexType, MetadataFilter, RecordFilters, SearchResult, + ContextStore, ContextStoreOptions, IdIndexType, LifecycleQueryOptions, MetadataFilter, + RecordFilters, SearchResult, LIFECYCLE_ACTIVE, }; const DEFAULT_BINARY_CONTENT_TYPE: &str = "application/octet-stream"; @@ -35,6 +36,18 @@ struct RecordInput { session_id: Option, external_id: Option, metadata_json: Option, + lifecycle: LifecycleFields, +} + +#[derive(Default)] +struct LifecycleFields { + expires_at: Option>, + retention_policy: Option, + lifecycle_status: Option, + retired_at: Option>, + retired_reason: Option, + supersedes_id: Option, + superseded_by_id: Option, } #[pyfunction] @@ -285,7 +298,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, metadata_json = None))] + #[pyo3(signature = (role, content, data_type = None, embedding = None, bot_id = None, session_id = None, external_id = None, metadata_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, supersedes_id = None, superseded_by_id = None))] fn add( &mut self, py: Python<'_>, @@ -297,7 +310,23 @@ impl Context { session_id: Option, external_id: Option, metadata_json: Option, + expires_at: Option, + retention_policy: Option, + lifecycle_status: Option, + retired_at: Option, + retired_reason: Option, + supersedes_id: Option, + superseded_by_id: Option, ) -> PyResult<()> { + let lifecycle = LifecycleFields { + expires_at: parse_optional_datetime(expires_at, "expires_at")?, + retention_policy, + lifecycle_status, + retired_at: parse_optional_datetime(retired_at, "retired_at")?, + retired_reason, + supersedes_id, + superseded_by_id, + }; let prepared = self.prepare_record( content, RecordInput { @@ -308,6 +337,7 @@ impl Context { session_id, external_id, metadata_json, + lifecycle, }, 1, )?; @@ -373,18 +403,26 @@ impl Context { Ok(()) } - #[pyo3(signature = (query, limit = None, filters_json = None))] + #[pyo3(signature = (query, limit = None, filters_json = None, include_expired = false, include_retired = false))] fn search( &self, py: Python<'_>, query: Vec, limit: Option, filters_json: Option, + include_expired: bool, + include_retired: bool, ) -> PyResult> { let filters = filters_from_json(filters_json)?; + let options = LifecycleQueryOptions::new(include_expired, include_retired); let hits_res = py.allow_threads(|| { self.runtime - .block_on(self.store.search_filtered(&query, limit, filters.as_ref())) + .block_on(self.store.search_filtered_with_options( + &query, + limit, + filters.as_ref(), + options, + )) }); let hits = hits_res.map_err(to_py_err)?; hits.into_iter() @@ -392,19 +430,27 @@ impl Context { .collect() } - #[pyo3(signature = (limit = None, offset = None, filters_json = None))] + #[pyo3(signature = (limit = None, offset = None, filters_json = None, include_expired = false, include_retired = false))] fn list( &self, py: Python<'_>, limit: Option, offset: Option, filters_json: Option, + include_expired: bool, + include_retired: bool, ) -> PyResult> { let filters = filters_from_json(filters_json)?; + let options = LifecycleQueryOptions::new(include_expired, include_retired); // Release GIL during data retrieval let records = py.allow_threads(|| { self.runtime - .block_on(self.store.list_filtered(limit, offset, filters.as_ref())) + .block_on(self.store.list_filtered_with_options( + limit, + offset, + filters.as_ref(), + options, + )) .map_err(to_py_err) })?; @@ -533,6 +579,28 @@ impl Context { optional_item(dict, "external_id")?.map(|value| value.extract::()); let metadata_json = optional_item(dict, "metadata_json")?.map(|value| value.extract::()); + let expires_at = optional_item(dict, "expires_at")?.map(|value| value.extract::()); + let retention_policy = + optional_item(dict, "retention_policy")?.map(|value| value.extract::()); + let lifecycle_status = + optional_item(dict, "lifecycle_status")?.map(|value| value.extract::()); + let retired_at = optional_item(dict, "retired_at")?.map(|value| value.extract::()); + let retired_reason = + optional_item(dict, "retired_reason")?.map(|value| value.extract::()); + let supersedes_id = + optional_item(dict, "supersedes_id")?.map(|value| value.extract::()); + let superseded_by_id = + optional_item(dict, "superseded_by_id")?.map(|value| value.extract::()); + + let lifecycle = LifecycleFields { + expires_at: parse_optional_datetime(expires_at.transpose()?, "expires_at")?, + retention_policy: retention_policy.transpose()?, + lifecycle_status: lifecycle_status.transpose()?, + retired_at: parse_optional_datetime(retired_at.transpose()?, "retired_at")?, + retired_reason: retired_reason.transpose()?, + supersedes_id: supersedes_id.transpose()?, + superseded_by_id: superseded_by_id.transpose()?, + }; self.prepare_record( &content, @@ -544,6 +612,7 @@ impl Context { session_id: session_id.transpose()?, external_id: external_id.transpose()?, metadata_json: metadata_json.transpose()?, + lifecycle, }, index as u64 + 1, ) @@ -564,6 +633,7 @@ impl Context { session_id, external_id, metadata_json, + lifecycle, } = input; let (content_type, text_payload, binary_payload, inner_content) = @@ -602,6 +672,15 @@ impl Context { role: role.clone(), state_metadata: None, metadata, + expires_at: lifecycle.expires_at, + retention_policy: lifecycle.retention_policy, + lifecycle_status: lifecycle + .lifecycle_status + .unwrap_or_else(|| LIFECYCLE_ACTIVE.to_string()), + retired_at: lifecycle.retired_at, + retired_reason: lifecycle.retired_reason, + supersedes_id: lifecycle.supersedes_id, + superseded_by_id: lifecycle.superseded_by_id, content_type, text_payload, binary_payload, @@ -628,6 +707,23 @@ fn optional_item<'py>(dict: &Bound<'py, PyDict>, key: &str) -> PyResult, + field_name: &str, +) -> PyResult>> { + value + .map(|value| { + DateTime::parse_from_rfc3339(&value) + .map(|dt| dt.with_timezone(&Utc)) + .map_err(|err| { + PyTypeError::new_err(format!( + "{field_name} must be an RFC3339 timestamp: {err}" + )) + }) + }) + .transpose() +} + fn compaction_metrics_to_py(py: Python<'_>, metrics: CompactionMetrics) -> PyResult { let dict = PyDict::new(py); dict.set_item("fragments_removed", metrics.fragments_removed)?; @@ -679,6 +775,13 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { role, state_metadata, metadata, + expires_at, + retention_policy, + lifecycle_status, + retired_at, + retired_reason, + supersedes_id, + superseded_by_id, content_type, text_payload, binary_payload, @@ -714,6 +817,19 @@ fn record_to_py(py: Python<'_>, record: ContextRecord) -> PyResult { None => py.None().into_pyobject(py)?.unbind(), }; dict.set_item("metadata", metadata_obj)?; + dict.set_item( + "expires_at", + expires_at.map(|dt| dt.to_rfc3339_opts(SecondsFormat::Micros, true)), + )?; + dict.set_item("retention_policy", retention_policy)?; + dict.set_item("lifecycle_status", lifecycle_status)?; + dict.set_item( + "retired_at", + retired_at.map(|dt| dt.to_rfc3339_opts(SecondsFormat::Micros, true)), + )?; + dict.set_item("retired_reason", retired_reason)?; + dict.set_item("supersedes_id", supersedes_id)?; + dict.set_item("superseded_by_id", superseded_by_id)?; dict.set_item("content_type", content_type)?; dict.set_item("text_payload", text_payload)?; match binary_payload { diff --git a/python/tests/test_persistence.py b/python/tests/test_persistence.py index c9b4db5..538a2ba 100644 --- a/python/tests/test_persistence.py +++ b/python/tests/test_persistence.py @@ -5,7 +5,7 @@ import sys import time import uuid -from datetime import datetime +from datetime import datetime, timedelta, timezone from io import BytesIO from pathlib import Path from typing import Any @@ -25,6 +25,12 @@ _S3_REGION = "us-east-1" +def _embedding(pivot: float) -> list[float]: + values = [0.0] * 1536 + values[0] = pivot + return values + + def _free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("127.0.0.1", 0)) @@ -250,6 +256,103 @@ def test_search_applies_filters_before_limit(tmp_path: Path) -> None: assert hits[0]["metadata"] == {"scope": "team", "tags": ["runbook"]} +def test_lifecycle_fields_round_trip_and_default_filtering(tmp_path: Path) -> None: + uri = tmp_path / "context.lance" + ctx = Context.create(str(uri)) + now = datetime.now(timezone.utc) + + ctx.add( + "user", + "active memory", + expires_at=now + timedelta(days=1), + retention_policy="long-term", + ) + ctx.add("assistant", "expired trace", expires_at=now - timedelta(days=1)) + ctx.add( + "system", + "superseded fact", + lifecycle_status="superseded", + retired_at=now, + retired_reason="replaced by newer fact", + superseded_by_id="active-id", + ) + ctx.add( + "system", + "failed approach", + lifecycle_status="contradicted", + retired_reason="negative knowledge", + ) + + visible = ctx.list() + assert [record["text"] for record in visible] == [ + "active memory", + "failed approach", + ] + assert visible[0]["retention_policy"] == "long-term" + assert visible[0]["lifecycle_status"] == "active" + + all_records = ctx.list(include_expired=True, include_retired=True) + assert [record["text"] for record in all_records] == [ + "active memory", + "expired trace", + "superseded fact", + "failed approach", + ] + + expired = all_records[1] + assert expired["expires_at"] is not None + assert expired["expires_at"] < datetime.now(timezone.utc) + + superseded = all_records[2] + assert superseded["lifecycle_status"] == "superseded" + assert superseded["retired_reason"] == "replaced by newer fact" + assert superseded["superseded_by_id"] == "active-id" + + +def test_search_applies_lifecycle_filter_before_limit(tmp_path: Path) -> None: + uri = tmp_path / "context.lance" + ctx = Context.create(str(uri)) + now = datetime.now(timezone.utc) + + ctx.add("user", "active memory", embedding=_embedding(1.0)) + ctx.add( + "assistant", + "expired but closer", + embedding=_embedding(0.0), + expires_at=now - timedelta(minutes=1), + ) + + hits = ctx.search(_embedding(0.0), limit=1) + assert [hit["text"] for hit in hits] == ["active memory"] + + hits_with_expired = ctx.search(_embedding(0.0), limit=1, include_expired=True) + assert [hit["text"] for hit in hits_with_expired] == ["expired but closer"] + + +def test_supersedes_pointer_hides_old_record_by_default(tmp_path: Path) -> None: + uri = tmp_path / "context.lance" + ctx = Context.create(str(uri)) + + ctx.add("user", "old value", embedding=_embedding(0.0)) + old = ctx.list()[0] + + ctx.add( + "user", + "new value", + embedding=_embedding(1.0), + supersedes_id=old["id"], + ) + + assert [record["text"] for record in ctx.list()] == ["new value"] + assert [record["text"] for record in ctx.search(_embedding(0.0), limit=10)] == [ + "new value" + ] + + history = ctx.list(include_retired=True) + assert [record["text"] for record in history] == ["old value", "new value"] + assert history[1]["supersedes_id"] == old["id"] + + def test_image_round_trip(tmp_path: Path) -> None: Image = pytest.importorskip("PIL.Image") uri = tmp_path / "context.lance" diff --git a/python/tests/test_search.py b/python/tests/test_search.py index b495a34..ab971c0 100644 --- a/python/tests/test_search.py +++ b/python/tests/test_search.py @@ -1,5 +1,5 @@ import json -from datetime import datetime +from datetime import datetime, timezone from typing import Any import pytest @@ -14,9 +14,12 @@ class DummyInner: def __init__(self) -> None: self.search_calls: list[tuple[list[float], int | None, str | None]] = [] + self.search_lifecycle_calls: list[tuple[bool, bool]] = [] self.list_calls: list[tuple[int | None, int | None, str | None]] = [] + self.list_lifecycle_calls: list[tuple[bool, bool]] = [] self.get_calls: list[tuple[str | None, str | None]] = [] self.delete_calls: list[tuple[str | None, str | None]] = [] + self.lifecycle_add_calls: list[dict[str, Any]] = [] self.add_calls: list[ tuple[ str, @@ -41,6 +44,13 @@ def add( session_id: str | None, external_id: str | None, metadata_json: str | None, + expires_at: str | None = None, + retention_policy: str | None = None, + lifecycle_status: str | None = None, + retired_at: str | None = None, + retired_reason: str | None = None, + supersedes_id: str | None = None, + superseded_by_id: str | None = None, ): self.add_calls.append( ( @@ -54,6 +64,17 @@ def add( metadata_json, ) ) + self.lifecycle_add_calls.append( + { + "expires_at": expires_at, + "retention_policy": retention_policy, + "lifecycle_status": lifecycle_status, + "retired_at": retired_at, + "retired_reason": retired_reason, + "supersedes_id": supersedes_id, + "superseded_by_id": superseded_by_id, + } + ) def get(self, id: str | None, external_id: str | None): self.get_calls.append((id, external_id)) @@ -65,8 +86,16 @@ def delete(self, id: str | None, external_id: str | None): self.delete_calls.append((id, external_id)) return id == "rec-1" or external_id == "source-1" - def search(self, vector: list[float], limit: int | None, filters_json: str | None): + def search( + self, + vector: list[float], + limit: int | None, + filters_json: str | None, + include_expired: bool = False, + include_retired: bool = False, + ): self.search_calls.append((vector, limit, filters_json)) + self.search_lifecycle_calls.append((include_expired, include_retired)) return [ { "id": "rec-1", @@ -83,14 +112,29 @@ def search(self, vector: list[float], limit: int | None, filters_json: str | Non "created_at": "2024-01-01T12:00:00Z", "state_metadata": {"step": 1}, "metadata": {"scope": "team", "tags": ["runbook"]}, + "expires_at": None, + "retention_policy": None, + "lifecycle_status": "active", + "retired_at": None, + "retired_reason": None, + "supersedes_id": None, + "superseded_by_id": None, } ] def add_many(self, records: list[dict[str, Any]]): self.add_many_calls.append(records) - def list(self, limit: int | None, offset: int | None, filters_json: str | None): + def list( + self, + limit: int | None, + offset: int | None, + filters_json: str | None, + include_expired: bool = False, + include_retired: bool = False, + ): self.list_calls.append((limit, offset, filters_json)) + self.list_lifecycle_calls.append((include_expired, include_retired)) return [ { "id": "rec-1", @@ -106,6 +150,13 @@ def list(self, limit: int | None, offset: int | None, filters_json: str | None): "created_at": "2024-01-01T12:00:00Z", "state_metadata": {"step": 1}, "metadata": {"scope": "team", "tags": ["runbook"]}, + "expires_at": None, + "retention_policy": None, + "lifecycle_status": "active", + "retired_at": None, + "retired_reason": None, + "supersedes_id": None, + "superseded_by_id": None, }, { "id": "rec-2", @@ -121,6 +172,13 @@ def list(self, limit: int | None, offset: int | None, filters_json: str | None): "created_at": "2024-01-02T12:00:00Z", "state_metadata": None, "metadata": None, + "expires_at": None, + "retention_policy": None, + "lifecycle_status": "active", + "retired_at": None, + "retired_reason": None, + "supersedes_id": None, + "superseded_by_id": None, }, ] @@ -185,6 +243,16 @@ def test_context_search_forwards_filters(): assert json.loads(filters_json) == {"bot_id": "support_bot", "scope": "team"} +def test_context_search_passes_lifecycle_flags(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.search([0.5, 0.4], include_expired=True, include_retired=True) + + assert dummy.search_lifecycle_calls == [(True, True)] + + def test_normalize_record_without_distance(): result = _normalize_record( { @@ -339,6 +407,16 @@ def test_context_list_forwards_filters(): } +def test_context_list_passes_lifecycle_flags(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.list(include_expired=True, include_retired=True) + + assert dummy.list_lifecycle_calls == [(True, True)] + + def test_context_add_with_embedding(): ctx = Context.__new__(Context) dummy = DummyInner() @@ -548,6 +626,47 @@ def test_context_add_rejects_non_json_metadata(): ctx.add("user", "hello", metadata={"bad": object()}) +def test_context_add_with_lifecycle_fields(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + expires_at = datetime(2026, 7, 1, tzinfo=timezone.utc) + retired_at = datetime(2026, 8, 1, tzinfo=timezone.utc) + ctx.add( + "user", + "hello", + expires_at=expires_at, + retention_policy="ttl:30d", + lifecycle_status="active", + retired_at=retired_at, + retired_reason="manual cleanup", + supersedes_id="old-id", + superseded_by_id="new-id", + ) + + assert dummy.lifecycle_add_calls == [ + { + "expires_at": "2026-07-01T00:00:00Z", + "retention_policy": "ttl:30d", + "lifecycle_status": "active", + "retired_at": "2026-08-01T00:00:00Z", + "retired_reason": "manual cleanup", + "supersedes_id": "old-id", + "superseded_by_id": "new-id", + } + ] + + +def test_context_add_rejects_naive_lifecycle_datetime(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + with pytest.raises(ValueError, match="timezone"): + ctx.add("user", "hello", expires_at=datetime(2026, 7, 1)) + + def test_context_add_many_normalizes_records(): ctx = Context.__new__(Context) dummy = DummyInner() @@ -579,6 +698,13 @@ def test_context_add_many_normalizes_records(): "session_id": None, "external_id": None, "metadata_json": None, + "expires_at": None, + "retention_policy": None, + "lifecycle_status": None, + "retired_at": None, + "retired_reason": None, + "supersedes_id": None, + "superseded_by_id": None, }, { "role": "assistant", @@ -589,6 +715,13 @@ def test_context_add_many_normalizes_records(): "session_id": "sess", "external_id": "doc-1#chunk-2", "metadata_json": None, + "expires_at": None, + "retention_policy": None, + "lifecycle_status": None, + "retired_at": None, + "retired_reason": None, + "supersedes_id": None, + "superseded_by_id": None, }, ] ] @@ -624,6 +757,31 @@ def test_context_add_many_forwards_metadata(): assert json.loads(metadata_json) == {"scope": "team", "tags": ["runbook"]} +def test_context_add_many_passes_lifecycle_fields(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.add_many( + [ + { + "role": "user", + "content": "hello", + "expires_at": datetime(2026, 7, 1, tzinfo=timezone.utc), + "retention_policy": "ttl:30d", + "lifecycle_status": "superseded", + "superseded_by_id": "new-id", + } + ] + ) + + record = dummy.add_many_calls[0][0] + assert record["expires_at"] == "2026-07-01T00:00:00Z" + assert record["retention_policy"] == "ttl:30d" + assert record["lifecycle_status"] == "superseded" + assert record["superseded_by_id"] == "new-id" + + def test_context_add_many_rejects_invalid_records(): ctx = Context.__new__(Context) dummy = DummyInner() @@ -668,3 +826,35 @@ def test_normalize_record_with_agent_and_session_id(): assert result["bot_id"] == "support_bot" assert result["session_id"] == "user_88" assert result["external_id"] == "source-1" + + +def test_normalize_record_with_lifecycle_fields(): + result = _normalize_record( + { + "id": "rec-1", + "external_id": None, + "created_at": "2024-01-01T00:00:00Z", + "content_type": "text/plain", + "text_payload": "hello", + "binary_payload": None, + "embedding": None, + "run_id": "run-1", + "role": "user", + "state_metadata": None, + "metadata": None, + "expires_at": "2026-07-01T00:00:00Z", + "retention_policy": "ttl:30d", + "lifecycle_status": "superseded", + "retired_at": "2026-08-01T00:00:00Z", + "retired_reason": "manual cleanup", + "supersedes_id": "old-id", + "superseded_by_id": "new-id", + } + ) + + assert isinstance(result["expires_at"], datetime) + assert isinstance(result["retired_at"], datetime) + assert result["lifecycle_status"] == "superseded" + assert result["retention_policy"] == "ttl:30d" + assert result["supersedes_id"] == "old-id" + assert result["superseded_by_id"] == "new-id"