Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,31 @@ ctx.add_many([
},
])

# Deferred embeddings: raw-first capture, enrich later.
#
# Bulk ingestion often needs to persist source chunks immediately and compute
# embeddings asynchronously (large documents, rate-limited or remote embedding
# providers). Append the raw text first with a stable external_id, then have a
# worker patch in the embedding once it is ready. A record without an embedding
# is durably stored but excluded from vector search until it is enriched.
ctx.add_many([
{
"role": "source",
"content": "Deferred chunk",
"external_id": "doc-77#chunk-1",
"metadata": {"embedding_status": "pending"},
},
])

# ...later, from your own worker/queue/batch job. The all-zero vector below is
# only a placeholder; substitute the output of your embedding model.
vector = [0.0] * 1536
ctx.update(
external_id="doc-77#chunk-1",
embedding=vector, # attach the freshly computed vector
metadata={"embedding_status": "ready"},
)
# The enriched record now shows up in vector search and hybrid retrieve.

# Time-travel to prior state
first_version = ctx.version()
ctx.add("assistant", "Let me fetch suggestions…")
Expand Down
3 changes: 3 additions & 0 deletions crates/lance-context-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ pub struct RecordPatchDto {
pub retired_at: Option<DateTime<Utc>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub retired_reason: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub embedding: Option<Vec<f32>>,
}

impl RecordPatchDto {
Expand All @@ -257,6 +259,7 @@ impl RecordPatchDto {
&& self.lifecycle_status.is_none()
&& self.retired_at.is_none()
&& self.retired_reason.is_none()
&& self.embedding.is_none()
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/lance-context-core/src/api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch {
lifecycle_status: patch.lifecycle_status.clone(),
retired_at: patch.retired_at,
retired_reason: patch.retired_reason.clone(),
embedding: patch.embedding.clone(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/lance-context-core/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ pub struct RecordPatch {
pub lifecycle_status: Option<String>,
pub retired_at: Option<DateTime<Utc>>,
pub retired_reason: Option<String>,
/// Vector embedding to attach to the record. Enables deferred embedding
/// workflows: append raw text first, then enrich with an embedding later.
pub embedding: Option<Vec<f32>>,
}

impl RecordPatch {
Expand All @@ -180,6 +183,7 @@ impl RecordPatch {
&& self.lifecycle_status.is_none()
&& self.retired_at.is_none()
&& self.retired_reason.is_none()
&& self.embedding.is_none()
}
}

Expand Down
64 changes: 64 additions & 0 deletions crates/lance-context-core/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,9 @@ impl ContextStore {
if let Some(retired_reason) = patch.retired_reason {
record.retired_reason = Some(retired_reason);
}
if let Some(embedding) = patch.embedding {
record.embedding = Some(embedding);
}

self.validate_new_record_id(&record).await?;
let version = self.write_entries(std::slice::from_ref(&record)).await?;
Expand Down Expand Up @@ -3059,6 +3062,67 @@ mod tests {
});
}

#[test]
fn deferred_embedding_patch_makes_raw_record_searchable() {
    // Builds a patch that sets the embedding and leaves every other field at
    // its default.
    let embedding_only = |vector: Vec<f32>| RecordPatch {
        embedding: Some(vector),
        ..Default::default()
    };

    let tmp = TempDir::new().unwrap();
    let dataset_uri = tmp.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open(&dataset_uri).await.unwrap();

        // Phase 1 - raw-first capture: two records are appended with their
        // embeddings cleared, one addressable by external id, one only by id.
        let external_key = "doc-1#chunk-1";
        let mut raw_ext = text_record("raw-ext", 0.0);
        raw_ext.embedding = None;
        raw_ext.external_id = Some(external_key.to_string());
        let mut raw_id = text_record("raw-id", 0.0);
        raw_id.embedding = None;
        raw_id.external_id = None;
        store
            .add(&[raw_ext.clone(), raw_id.clone()])
            .await
            .unwrap();

        // With no embeddings stored, vector search must come back empty.
        let probe = make_embedding(1.0);
        let before = store.search(&probe, Some(10)).await.unwrap();
        assert!(before.is_empty());

        // Phase 2a - enrich the first record, addressed by external_id.
        let ext_outcome = store
            .update_by_external_id(external_key, embedding_only(make_embedding(1.0)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(ext_outcome.record.embedding, Some(make_embedding(1.0)));
        // The text payload of the raw record must survive the update.
        assert_eq!(ext_outcome.record.text_payload, raw_ext.text_payload);

        // Phase 2b - enrich the second record, addressed by internal id.
        let id_outcome = store
            .update_by_id(&raw_id.id, embedding_only(make_embedding(0.0)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id_outcome.record.embedding, Some(make_embedding(0.0)));

        // Phase 3 - both enriched records are now returned by vector search.
        let after = store.search(&probe, Some(10)).await.unwrap();
        assert!(after
            .iter()
            .any(|hit| hit.record.id == ext_outcome.record.id));
        assert!(after
            .iter()
            .any(|hit| hit.record.id == id_outcome.record.id));
        // The probe equals the vector patched in by external_id, so that
        // record is expected to rank first.
        assert_eq!(after[0].record.id, ext_outcome.record.id);
    });
}

#[test]
fn relationships_roundtrip_and_support_related_lookup() {
let dir = TempDir::new().unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/lance-context-server/src/routes/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ fn patch_from_dto(patch: &RecordPatchDto) -> RecordPatch {
lifecycle_status: patch.lifecycle_status.clone(),
retired_at: patch.retired_at,
retired_reason: patch.retired_reason.clone(),
embedding: patch.embedding.clone(),
}
}

Expand Down
10 changes: 9 additions & 1 deletion python/python/lance_context/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,14 @@ def update(
lifecycle_status: str | None = None,
retired_at: datetime | str | None = None,
retired_reason: str | None = None,
embedding: list[float] | None = None,
) -> dict[str, Any]:
"""Patch mutable fields on a visible record by id or external_id."""
"""Patch mutable fields on a visible record by id or external_id.

Pass ``embedding`` to attach or replace a record's vector after it was
appended without one (deferred / enrich-later ingestion). The updated
record participates in vector search once the embedding is set.
"""
if (id is None) == (external_id is None):
raise ValueError("Specify exactly one of id or external_id")
if (
Expand All @@ -547,6 +553,7 @@ def update(
and lifecycle_status is None
and retired_at is None
and retired_reason is None
and embedding is None
):
raise ValueError("update requires at least one patch field")

Expand All @@ -562,6 +569,7 @@ def update(
lifecycle_status,
_coerce_timestamp(retired_at, field_name="retired_at"),
retired_reason,
embedding,
)
record = result.get("record")
return {
Expand Down
4 changes: 3 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl Context {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None))]
#[pyo3(signature = (id = None, external_id = None, bot_id = None, session_id = None, metadata_json = None, relationships_json = None, expires_at = None, retention_policy = None, lifecycle_status = None, retired_at = None, retired_reason = None, embedding = None))]
fn update(
&mut self,
py: Python<'_>,
Expand All @@ -390,6 +390,7 @@ impl Context {
lifecycle_status: Option<String>,
retired_at: Option<String>,
retired_reason: Option<String>,
embedding: Option<Vec<f32>>,
) -> PyResult<PyObject> {
let patch = RecordPatch {
bot_id,
Expand All @@ -402,6 +403,7 @@ impl Context {
lifecycle_status,
retired_at: parse_optional_datetime(retired_at, "retired_at")?,
retired_reason,
embedding,
};
if patch.is_empty() {
return Err(PyRuntimeError::new_err(
Expand Down
106 changes: 106 additions & 0 deletions python/tests/test_deferred_embedding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""End-to-end tests for deferred embedding workflows (issue #88).

Raw-first ingestion: append source chunks without embeddings, then enrich each
record with an embedding later via ``update()``. The enriched record must then
participate in vector search.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pathlib import Path

from lance_context.api import Context


def _embedding(value: float) -> list[float]:
vector = [0.0] * 1536
vector[0] = value
return vector


def test_update_attaches_embedding_by_external_id(tmp_path: Path) -> None:
    """A raw record patched via ``update(external_id=...)`` becomes searchable."""
    store = Context.create(str(tmp_path / "context.lance"))

    # Step 1: store the source chunk right away, with no embedding attached.
    store.add(
        "user",
        "raw source chunk",
        external_id="doc-1#chunk-1",
        metadata={"embedding_status": "pending"},
    )

    stored = store.get(external_id="doc-1#chunk-1")
    assert stored is not None
    assert stored["embedding"] is None

    # Step 2: vector search returns nothing while the record has no embedding.
    assert store.search(_embedding(1.0), limit=10) == []

    # Step 3: patch the embedding in afterwards, as a background worker would.
    outcome = store.update(
        external_id="doc-1#chunk-1",
        embedding=_embedding(1.0),
        metadata={"embedding_status": "ready"},
    )
    assert outcome["updated"] is True
    patched = outcome["record"]
    assert patched["embedding"] == _embedding(1.0)
    # The original text must still be present on the updated record.
    assert patched["text"] == "raw source chunk"

    # Step 4: the same query now returns exactly the enriched record.
    matches = store.search(_embedding(1.0), limit=10)
    found_ids = [match["external_id"] for match in matches]
    assert found_ids == ["doc-1#chunk-1"]


def test_update_attaches_embedding_by_id(tmp_path: Path) -> None:
    """A raw record patched via ``update(id=...)`` becomes searchable."""
    store = Context.create(str(tmp_path / "context.lance"))

    # Append without an embedding, then read the record back to learn its id.
    store.add("user", "raw source chunk")
    first = store.list()[0]
    assert first["embedding"] is None

    # Attach the vector, addressing the record by its internal id.
    outcome = store.update(id=first["id"], embedding=_embedding(0.0))
    assert outcome["updated"] is True
    patched = outcome["record"]
    assert patched["embedding"] == _embedding(0.0)

    # The only search hit must be the record returned by the update.
    matches = store.search(_embedding(0.0), limit=10)
    assert len(matches) == 1
    assert matches[0]["id"] == patched["id"]


def test_embedding_only_is_a_valid_patch(tmp_path: Path) -> None:
    """``update()`` succeeds when ``embedding`` is the only patch field given."""
    store = Context.create(str(tmp_path / "context.lance"))
    store.add("user", "raw source chunk", external_id="doc-2#chunk-1")

    # No metadata or lifecycle fields are passed, only the vector.
    outcome = store.update(
        external_id="doc-2#chunk-1",
        embedding=_embedding(1.0),
    )

    assert outcome["updated"] is True
    assert outcome["record"]["embedding"] == _embedding(1.0)


def test_bulk_raw_first_then_enrich(tmp_path: Path) -> None:
    """Chunks appended with ``add_many()`` are enriched one by one by external_id."""
    store = Context.create(str(tmp_path / "context.lance"))

    raw_chunks = [
        {"role": "user", "content": "chunk a", "external_id": "doc-3#chunk-1"},
        {"role": "user", "content": "chunk b", "external_id": "doc-3#chunk-2"},
    ]
    store.add_many(raw_chunks)
    # Neither chunk has an embedding yet, so vector search finds nothing.
    assert store.search(_embedding(1.0), limit=10) == []

    # Give each chunk its own vector; chunk-2 gets the one the query will use.
    pivots = {"doc-3#chunk-1": 0.0, "doc-3#chunk-2": 1.0}
    for chunk_key, first_component in pivots.items():
        store.update(external_id=chunk_key, embedding=_embedding(first_component))

    matches = store.search(_embedding(1.0), limit=10)
    found = {match["external_id"] for match in matches}
    assert found == {"doc-3#chunk-1", "doc-3#chunk-2"}
    # The chunk whose vector equals the query must rank first.
    assert matches[0]["external_id"] == "doc-3#chunk-2"
25 changes: 25 additions & 0 deletions python/tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def update(
lifecycle_status: str | None,
retired_at: str | None,
retired_reason: str | None,
embedding: list[float] | None = None,
):
self.update_calls.append(
{
Expand All @@ -191,6 +192,7 @@ def update(
"lifecycle_status": lifecycle_status,
"retired_at": retired_at,
"retired_reason": retired_reason,
"embedding": embedding,
}
)
if id == "missing" or external_id == "missing":
Expand Down Expand Up @@ -1159,6 +1161,7 @@ def test_context_update_returns_operation_metadata_and_record():
"lifecycle_status": "active",
"retired_at": None,
"retired_reason": None,
"embedding": None,
}
]
assert result["updated"] is True
Expand All @@ -1173,6 +1176,28 @@ def test_context_update_returns_operation_metadata_and_record():
assert result["record"]["supersedes_id"] == "old-id"


def test_context_update_forwards_embedding():
    """``Context.update`` passes the ``embedding`` argument through to ``_inner``."""
    recorder = DummyInner()
    # __new__ creates the object without running Context.__init__.
    context = Context.__new__(Context)
    context._inner = recorder  # type: ignore[attr-defined]

    context.update(external_id="source-1", embedding=[0.1, 0.2, 0.3])

    first_call = recorder.update_calls[0]
    assert first_call["embedding"] == [0.1, 0.2, 0.3]


def test_context_update_accepts_embedding_only_patch():
    """An embedding alone satisfies the "at least one patch field" requirement."""
    recorder = DummyInner()
    # __new__ creates the object without running Context.__init__.
    context = Context.__new__(Context)
    context._inner = recorder  # type: ignore[attr-defined]

    # No other patch field is supplied; the call must not raise ValueError.
    outcome = context.update(id="rec-1", embedding=[0.1, 0.2])

    first_call = recorder.update_calls[0]
    assert first_call["embedding"] == [0.1, 0.2]
    assert outcome["updated"] is True


def test_context_update_missing_record_returns_not_updated():
ctx = Context.__new__(Context)
dummy = DummyInner()
Expand Down
Loading