diff --git a/README.md b/README.md index 663675f..a87e3d3 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,31 @@ ctx.add_many([ }, ]) +# Deferred embeddings: raw-first capture, enrich later. +# +# Bulk ingestion often needs to persist source chunks immediately and compute +# embeddings asynchronously (large documents, rate-limited or remote embedding +# providers). Append the raw text first with a stable external_id, then have a +# worker patch in the embedding once it is ready. A record without an embedding +# is durably stored but excluded from vector search until it is enriched. +ctx.add_many([ + { + "role": "source", + "content": "Deferred chunk", + "external_id": "doc-77#chunk-1", + "metadata": {"embedding_status": "pending"}, + }, +]) + +# ...later, from your own worker/queue/batch job: +vector = [0.0] * 1536 +ctx.update( + external_id="doc-77#chunk-1", + embedding=vector, # attach the freshly computed vector + metadata={"embedding_status": "ready"}, +) +# The enriched record now shows up in vector search and hybrid retrieve. + # Time-travel to prior state first_version = ctx.version() ctx.add("assistant", "Let me fetch suggestions…") diff --git a/crates/lance-context-api/src/lib.rs b/crates/lance-context-api/src/lib.rs index bac85e7..dd96b45 100644 --- a/crates/lance-context-api/src/lib.rs +++ b/crates/lance-context-api/src/lib.rs @@ -242,6 +242,8 @@ pub struct RecordPatchDto { pub retired_at: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub retired_reason: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub embedding: Option>, } impl RecordPatchDto { @@ -257,6 +259,7 @@ impl RecordPatchDto { && self.lifecycle_status.is_none() && self.retired_at.is_none() && self.retired_reason.is_none() + && self.embedding.is_none() } } diff --git a/crates/lance-context-core/src/api_impl.rs b/crates/lance-context-core/src/api_impl.rs index a05ed14..4052480 100644 --- a/crates/lance-context-core/src/api_impl.rs +++ b/crates/lance-context-core/src/api_impl.rs @@ -330,6 +330,7 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { lifecycle_status: patch.lifecycle_status.clone(), retired_at: patch.retired_at, retired_reason: patch.retired_reason.clone(), + embedding: patch.embedding.clone(), } } diff --git a/crates/lance-context-core/src/record.rs b/crates/lance-context-core/src/record.rs index abfa3c2..71306d6 100644 --- a/crates/lance-context-core/src/record.rs +++ b/crates/lance-context-core/src/record.rs @@ -165,6 +165,9 @@ pub struct RecordPatch { pub lifecycle_status: Option, pub retired_at: Option>, pub retired_reason: Option, + /// Vector embedding to attach to the record. Enables deferred embedding + /// workflows: append raw text first, then enrich with an embedding later. + pub embedding: Option>, } impl RecordPatch { @@ -180,6 +183,7 @@ impl RecordPatch { && self.lifecycle_status.is_none() && self.retired_at.is_none() && self.retired_reason.is_none() + && self.embedding.is_none() } } diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 8ddfd37..693bf98 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -619,6 +619,9 @@ impl ContextStore { if let Some(retired_reason) = patch.retired_reason { record.retired_reason = Some(retired_reason); } + if let Some(embedding) = patch.embedding { + record.embedding = Some(embedding); + } self.validate_new_record_id(&record).await?; let version = self.write_entries(std::slice::from_ref(&record)).await?; @@ -3059,6 +3062,67 @@ mod tests { }); } + #[test] + fn deferred_embedding_patch_makes_raw_record_searchable() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + + // Raw-first capture: append source chunks without embeddings. + let mut by_ext = text_record("raw-ext", 0.0); + by_ext.embedding = None; + by_ext.external_id = Some("doc-1#chunk-1".to_string()); + let mut by_id = text_record("raw-id", 0.0); + by_id.embedding = None; + by_id.external_id = None; + store.add(&[by_ext.clone(), by_id.clone()]).await.unwrap(); + + // Records without an embedding are invisible to vector search. + let query = make_embedding(1.0); + assert!(store.search(&query, Some(10)).await.unwrap().is_empty()); + + // Enrich-later: patch the embedding by external_id... + let enriched_ext = store + .update_by_external_id( + "doc-1#chunk-1", + RecordPatch { + embedding: Some(make_embedding(1.0)), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(enriched_ext.record.embedding, Some(make_embedding(1.0))); + // Raw payload is carried forward onto the superseding record. + assert_eq!(enriched_ext.record.text_payload, by_ext.text_payload); + + // ...and by internal id. + let enriched_id = store + .update_by_id( + &by_id.id, + RecordPatch { + embedding: Some(make_embedding(0.0)), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(enriched_id.record.embedding, Some(make_embedding(0.0))); + + // Both records now participate in vector search. + let results = store.search(&query, Some(10)).await.unwrap(); + let ids: Vec<&str> = results.iter().map(|r| r.record.id.as_str()).collect(); + assert!(ids.contains(&enriched_ext.record.id.as_str())); + assert!(ids.contains(&enriched_id.record.id.as_str())); + // The query matches the external_id record exactly (distance 0). + assert_eq!(results[0].record.id, enriched_ext.record.id); + }); + } + #[test] fn relationships_roundtrip_and_support_related_lookup() { let dir = TempDir::new().unwrap(); diff --git a/crates/lance-context-server/src/routes/records.rs b/crates/lance-context-server/src/routes/records.rs index fcd8344..dfaea13 100644 --- a/crates/lance-context-server/src/routes/records.rs +++ b/crates/lance-context-server/src/routes/records.rs @@ -397,6 +397,7 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch { lifecycle_status: patch.lifecycle_status.clone(), retired_at: patch.retired_at, retired_reason: patch.retired_reason.clone(), + embedding: patch.embedding.clone(), } } diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index 7f0e1c0..3c1dae1 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -533,8 +533,14 @@ def update( lifecycle_status: str | None = None, retired_at: datetime | str | None = None, retired_reason: str | None = None, + embedding: list[float] | None = None, ) -> dict[str, Any]: - """Patch mutable fields on a visible record by id or external_id.""" + """Patch mutable fields on a visible record by id or external_id. + + Pass ``embedding`` to attach or replace a record's vector after it was + appended without one (deferred / enrich-later ingestion). The updated + record participates in vector search once the embedding is set. + """ if (id is None) == (external_id is None): raise ValueError("Specify exactly one of id or external_id") if ( @@ -547,6 +553,7 @@ def update( and lifecycle_status is None and retired_at is None and retired_reason is None + and embedding is None ): raise ValueError("update requires at least one patch field") @@ -562,6 +569,7 @@ def update( lifecycle_status, _coerce_timestamp(retired_at, field_name="retired_at"), retired_reason, + embedding, ) record = result.get("record") return { diff --git a/python/src/lib.rs b/python/src/lib.rs index d0b4c26..fd68108 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -375,7 +375,7 @@ impl Context { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None))] + #[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None))] fn update( &mut self, py: Python<'_>, @@ -390,6 +390,7 @@ impl Context { lifecycle_status: Option, retired_at: Option, retired_reason: Option, + embedding: Option>, ) -> PyResult { let patch = RecordPatch { bot_id, @@ -402,6 +403,7 @@ impl Context { lifecycle_status, retired_at: parse_optional_datetime(retired_at, "retired_at")?, retired_reason, + embedding, }; if patch.is_empty() { return Err(PyRuntimeError::new_err( diff --git a/python/tests/test_deferred_embedding.py b/python/tests/test_deferred_embedding.py new file mode 100644 index 0000000..e9709a1 --- /dev/null +++ b/python/tests/test_deferred_embedding.py @@ -0,0 +1,106 @@ +"""End-to-end tests for deferred embedding workflows (issue #88). + +Raw-first ingestion: append source chunks without embeddings, then enrich each +record with an embedding later via ``update()``. The enriched record must then +participate in vector search. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path + +from lance_context.api import Context + + +def _embedding(value: float) -> list[float]: + vector = [0.0] * 1536 + vector[0] = value + return vector + + +def test_update_attaches_embedding_by_external_id(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + # Raw-first capture: persist the source chunk immediately, no embedding yet. + ctx.add( + "user", + "raw source chunk", + external_id="doc-1#chunk-1", + metadata={"embedding_status": "pending"}, + ) + + raw = ctx.get(external_id="doc-1#chunk-1") + assert raw is not None + assert raw["embedding"] is None + + # Without an embedding the record is invisible to vector search. + assert ctx.search(_embedding(1.0), limit=10) == [] + + # Enrich-later: a worker computes the embedding and patches it in. + result = ctx.update( + external_id="doc-1#chunk-1", + embedding=_embedding(1.0), + metadata={"embedding_status": "ready"}, + ) + assert result["updated"] is True + assert result["record"]["embedding"] == _embedding(1.0) + # Raw payload is preserved across the enrich update. + assert result["record"]["text"] == "raw source chunk" + + # The enriched record now participates in vector search. + hits = ctx.search(_embedding(1.0), limit=10) + assert [hit["external_id"] for hit in hits] == ["doc-1#chunk-1"] + + +def test_update_attaches_embedding_by_id(tmp_path: Path) -> None: + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + ctx.add("user", "raw source chunk") + raw = ctx.list()[0] + assert raw["embedding"] is None + + result = ctx.update(id=raw["id"], embedding=_embedding(0.0)) + assert result["updated"] is True + assert result["record"]["embedding"] == _embedding(0.0) + + hits = ctx.search(_embedding(0.0), limit=10) + assert len(hits) == 1 + assert hits[0]["id"] == result["record"]["id"] + + +def test_embedding_only_is_a_valid_patch(tmp_path: Path) -> None: + """An embedding-only patch must be accepted (no other field required).""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + ctx.add("user", "raw source chunk", external_id="doc-2#chunk-1") + result = ctx.update(external_id="doc-2#chunk-1", embedding=_embedding(1.0)) + assert result["updated"] is True + assert result["record"]["embedding"] == _embedding(1.0) + + +def test_bulk_raw_first_then_enrich(tmp_path: Path) -> None: + """add_many() raw chunks, then enrich each by external_id.""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + ctx.add_many( + [ + {"role": "user", "content": "chunk a", "external_id": "doc-3#chunk-1"}, + {"role": "user", "content": "chunk b", "external_id": "doc-3#chunk-2"}, + ] + ) + assert ctx.search(_embedding(1.0), limit=10) == [] + + for ext_id, pivot in (("doc-3#chunk-1", 0.0), ("doc-3#chunk-2", 1.0)): + ctx.update(external_id=ext_id, embedding=_embedding(pivot)) + + hits = ctx.search(_embedding(1.0), limit=10) + assert {hit["external_id"] for hit in hits} == {"doc-3#chunk-1", "doc-3#chunk-2"} + # The exact match ranks first. + assert hits[0]["external_id"] == "doc-3#chunk-2" diff --git a/python/tests/test_search.py b/python/tests/test_search.py index 9ce7c90..a02ab87 100644 --- a/python/tests/test_search.py +++ b/python/tests/test_search.py @@ -177,6 +177,7 @@ def update( lifecycle_status: str | None, retired_at: str | None, retired_reason: str | None, + embedding: list[float] | None = None, ): self.update_calls.append( { @@ -191,6 +192,7 @@ def update( "lifecycle_status": lifecycle_status, "retired_at": retired_at, "retired_reason": retired_reason, + "embedding": embedding, } ) if id == "missing" or external_id == "missing": @@ -1159,6 +1161,7 @@ def test_context_update_returns_operation_metadata_and_record(): "lifecycle_status": "active", "retired_at": None, "retired_reason": None, + "embedding": None, } ] assert result["updated"] is True @@ -1173,6 +1176,28 @@ def test_context_update_returns_operation_metadata_and_record(): assert result["record"]["supersedes_id"] == "old-id" +def test_context_update_forwards_embedding(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + ctx.update(external_id="source-1", embedding=[0.1, 0.2, 0.3]) + + assert dummy.update_calls[0]["embedding"] == [0.1, 0.2, 0.3] + + +def test_context_update_accepts_embedding_only_patch(): + ctx = Context.__new__(Context) + dummy = DummyInner() + ctx._inner = dummy # type: ignore[attr-defined] + + # An embedding is sufficient on its own; no "at least one patch field" error. + result = ctx.update(id="rec-1", embedding=[0.1, 0.2]) + + assert dummy.update_calls[0]["embedding"] == [0.1, 0.2] + assert result["updated"] is True + + def test_context_update_missing_record_returns_not_updated(): ctx = Context.__new__(Context) dummy = DummyInner()