Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ crate-type = ["cdylib"]

[dependencies]
chrono = { version = "0.4", default-features = false, features = ["clock"] }
lance-context-api = { path = "../crates/lance-context-api" }
lance-context-client = { path = "../crates/lance-context-client" }
lance-context-core = { path = "../crates/lance-context-core" }
pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39", "py-clone"] }
serde_json = "1"
Expand Down
2 changes: 2 additions & 0 deletions python/python/lance_context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Context,
ContextNamespace,
EmbeddingProvider,
RemoteContext,
__version__,
)

Expand All @@ -13,5 +14,6 @@
"Context",
"ContextNamespace",
"EmbeddingProvider",
"RemoteContext",
"__version__",
]
327 changes: 327 additions & 0 deletions python/python/lance_context/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from ._internal import ( # pyright: ignore[reportMissingImports]
ContextNamespace as _ContextNamespace,
)
from ._internal import ( # pyright: ignore[reportMissingImports]
RemoteContext as _RemoteContext,
)
from ._internal import version as _version # pyright: ignore[reportMissingImports]
from .embeddings import EmbeddingProvider, _build_provider

Expand All @@ -20,6 +23,7 @@
"Context",
"ContextNamespace",
"EmbeddingProvider",
"RemoteContext",
"__version__",
]

Expand Down Expand Up @@ -1364,3 +1368,326 @@ def __repr__(self) -> str:
f"branch={self._sync.branch()!r}, "
f"entries={self._sync.entries()})"
)


class RemoteContext:
    """Async wrapper for a remote lance-context server over HTTP.

    Each coroutine prepares its arguments on the calling thread and then runs
    the matching blocking ``_RemoteContext`` call on the event loop's default
    executor, so the event loop is not blocked while the call is in flight.
    """

    def __init__(self, sync_ctx: _RemoteContext) -> None:
        """Wrap an already-connected synchronous remote context.

        Args:
            sync_ctx: The blocking ``_RemoteContext`` handle that every method
                of this wrapper delegates to.
        """
        self._sync = sync_ctx

    @staticmethod
    async def _run_blocking(func: Any, /, *args: Any, **kwargs: Any) -> Any:
        """Run ``func(*args, **kwargs)`` on the default executor and await it.

        Single place for the executor hand-off shared by every coroutine of
        this class.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    @classmethod
    async def connect(cls, base_url: str, name: str) -> "RemoteContext":
        """Connect to a remote context server.

        Args:
            base_url: Passed through to ``_RemoteContext.connect``.
            name: Passed through to ``_RemoteContext.connect``.

        Returns:
            A new wrapper around the connected synchronous context.
        """
        sync_ctx = await cls._run_blocking(_RemoteContext.connect, base_url, name)
        return cls(sync_ctx)

    def version(self) -> int:
        """Return the current version (cached, sync)."""
        return self._sync.version()

    async def add(
        self,
        *,
        role: str = "user",
        content: Any = None,
        content_type: str | None = None,
        embedding: Iterable[float] | None = None,
        bot_id: str | None = None,
        session_id: str | None = None,
        external_id: str | None = None,
        state_metadata: Mapping[str, Any] | None = None,
        metadata: Any = None,
        expires_at: datetime | str | None = None,
        retention_policy: str | None = None,
        supersedes_id: str | None = None,
        relationships: Iterable[Mapping[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """Add a record to the remote context.

        ``content``/``content_type`` go through ``_normalize_content``,
        ``embedding`` through ``_coerce_vector`` (only when not ``None``),
        ``state_metadata`` through ``_normalize_state_metadata``,
        ``metadata`` and ``relationships`` through ``_json_dumps`` and
        ``expires_at`` through ``_coerce_timestamp``. The remaining arguments
        are forwarded unchanged.

        Returns:
            The result of the underlying ``add`` call, unmodified.
        """
        payload, data_type = _normalize_content(content, content_type)
        emb = _coerce_vector(embedding) if embedding is not None else None
        sm = _normalize_state_metadata(state_metadata)
        meta_json = _json_dumps(metadata, "metadata")
        # A falsy ``relationships`` (None or empty) is forwarded as None.
        rel_list = list(relationships) if relationships else None
        rel_json = _json_dumps(rel_list, "relationships")
        exp = _coerce_timestamp(expires_at, field_name="expires_at")

        return await self._run_blocking(
            self._sync.add,
            role=role,
            content=payload,
            data_type=data_type,
            embedding=emb,
            bot_id=bot_id,
            session_id=session_id,
            external_id=external_id,
            state_metadata=sm,
            metadata_json=meta_json,
            expires_at=exp,
            retention_policy=retention_policy,
            supersedes_id=supersedes_id,
            relationships_json=rel_json,
        )

    async def upsert(
        self,
        *,
        role: str = "user",
        content: Any = None,
        content_type: str | None = None,
        embedding: Iterable[float] | None = None,
        bot_id: str | None = None,
        session_id: str | None = None,
        external_id: str | None = None,
        metadata: Any = None,
        expires_at: datetime | str | None = None,
        retention_policy: str | None = None,
        supersedes_id: str | None = None,
        relationships: Iterable[Mapping[str, Any]] | None = None,
        key: str = "external_id",
    ) -> dict[str, Any]:
        """Upsert a record by external_id.

        Arguments are prepared the same way as in :meth:`add` (there is no
        ``state_metadata`` here); ``key`` is forwarded unchanged.

        Returns:
            The result of the underlying ``upsert`` call, with its
            ``"record"`` entry passed through ``_normalize_record`` when one
            is present.
        """
        payload, data_type = _normalize_content(content, content_type)
        emb = _coerce_vector(embedding) if embedding is not None else None
        meta_json = _json_dumps(metadata, "metadata")
        # A falsy ``relationships`` (None or empty) is forwarded as None.
        rel_list = list(relationships) if relationships else None
        rel_json = _json_dumps(rel_list, "relationships")
        exp = _coerce_timestamp(expires_at, field_name="expires_at")

        result = await self._run_blocking(
            self._sync.upsert,
            role=role,
            content=payload,
            data_type=data_type,
            embedding=emb,
            bot_id=bot_id,
            session_id=session_id,
            external_id=external_id,
            metadata_json=meta_json,
            expires_at=exp,
            retention_policy=retention_policy,
            supersedes_id=supersedes_id,
            relationships_json=rel_json,
            key=key,
        )
        # Same guard as ``update``: only normalize a record that is present,
        # instead of raising KeyError on a response without one.
        record = result.get("record")
        if record is not None:
            result["record"] = _normalize_record(record)
        return result

    async def update(
        self,
        *,
        id: str | None = None,
        external_id: str | None = None,
        bot_id: str | None = None,
        session_id: str | None = None,
        metadata: Any = None,
        relationships: Iterable[Mapping[str, Any]] | None = None,
        expires_at: datetime | str | None = None,
        retention_policy: str | None = None,
        lifecycle_status: str | None = None,
        retired_at: datetime | str | None = None,
        retired_reason: str | None = None,
        embedding: Iterable[float] | None = None,
    ) -> dict[str, Any]:
        """Update a record by id or external_id.

        ``metadata`` and ``relationships`` go through ``_json_dumps``,
        ``expires_at`` and ``retired_at`` through ``_coerce_timestamp`` and
        ``embedding`` through ``_coerce_vector`` (only when not ``None``).
        The remaining arguments are forwarded unchanged.

        Returns:
            The result of the underlying ``update`` call, with its
            ``"record"`` entry passed through ``_normalize_record`` when one
            is present.
        """
        meta_json = _json_dumps(metadata, "metadata")
        # A falsy ``relationships`` (None or empty) is forwarded as None.
        rel_list = list(relationships) if relationships else None
        rel_json = _json_dumps(rel_list, "relationships")
        exp = _coerce_timestamp(expires_at, field_name="expires_at")
        ret = _coerce_timestamp(retired_at, field_name="retired_at")
        emb = _coerce_vector(embedding) if embedding is not None else None

        result = await self._run_blocking(
            self._sync.update,
            id=id,
            external_id=external_id,
            bot_id=bot_id,
            session_id=session_id,
            metadata_json=meta_json,
            relationships_json=rel_json,
            expires_at=exp,
            retention_policy=retention_policy,
            lifecycle_status=lifecycle_status,
            retired_at=ret,
            retired_reason=retired_reason,
            embedding=emb,
        )
        if result.get("record") is not None:
            result["record"] = _normalize_record(result["record"])
        return result

    async def get(
        self,
        *,
        id: str | None = None,
        external_id: str | None = None,
    ) -> dict[str, Any] | None:
        """Get a record by id or external_id.

        Returns:
            The record passed through ``_normalize_record``, or ``None`` when
            the underlying ``get`` call returns ``None``.
        """
        result = await self._run_blocking(
            self._sync.get, id=id, external_id=external_id
        )
        return _normalize_record(result) if result is not None else None

    async def delete(
        self,
        *,
        id: str | None = None,
        external_id: str | None = None,
    ) -> bool:
        """Delete a record by id or external_id.

        Returns:
            The result of the underlying ``delete`` call, unmodified.
        """
        return await self._run_blocking(
            self._sync.delete, id=id, external_id=external_id
        )

    async def related(
        self,
        target_id: str,
        *,
        relation: str | None = None,
        limit: int | None = None,
        include_expired: bool = False,
        include_retired: bool = False,
    ) -> list[dict[str, Any]]:
        """Get records related to a target.

        All arguments are forwarded unchanged to the underlying ``related``
        call.

        Returns:
            Each returned record passed through ``_normalize_record``.
        """
        results = await self._run_blocking(
            self._sync.related,
            target_id=target_id,
            relation=relation,
            limit=limit,
            include_expired=include_expired,
            include_retired=include_retired,
        )
        return [_normalize_record(r) for r in results]

    async def search(
        self,
        query: Iterable[float],
        *,
        limit: int | None = None,
        filters: Any = None,
        include_expired: bool = False,
        include_retired: bool = False,
        include_relationships: bool = False,
    ) -> list[dict[str, Any]]:
        """Vector search over the context.

        ``query`` goes through ``_coerce_vector`` and ``filters`` through
        ``_json_dumps``; the remaining arguments are forwarded unchanged.

        Returns:
            Each returned hit passed through ``_normalize_search_hit``.
        """
        q = _coerce_vector(query)
        filters_json = _json_dumps(filters, "filters")
        results = await self._run_blocking(
            self._sync.search,
            query=q,
            limit=limit,
            filters_json=filters_json,
            include_expired=include_expired,
            include_retired=include_retired,
            include_relationships=include_relationships,
        )
        return [_normalize_search_hit(h) for h in results]

    async def retrieve(
        self,
        *,
        text: str | None = None,
        vector: Iterable[float] | None = None,
        limit: int | None = None,
        filters: Any = None,
        include_expired: bool = False,
        include_retired: bool = False,
        include_relationships: bool = False,
        fusion: str | None = None,
    ) -> list[dict[str, Any]]:
        """Hybrid retrieval (text + vector).

        ``vector`` goes through ``_coerce_vector`` (only when not ``None``)
        and ``filters`` through ``_json_dumps``; the remaining arguments are
        forwarded unchanged.

        Returns:
            Each returned hit passed through ``_normalize_retrieve_hit``.
        """
        v = _coerce_vector(vector) if vector is not None else None
        filters_json = _json_dumps(filters, "filters")
        results = await self._run_blocking(
            self._sync.retrieve,
            text=text,
            vector=v,
            limit=limit,
            filters_json=filters_json,
            include_expired=include_expired,
            include_retired=include_retired,
            include_relationships=include_relationships,
            fusion=fusion,
        )
        return [_normalize_retrieve_hit(h) for h in results]

    async def checkout(self, version: int) -> None:
        """Checkout a specific version."""
        await self._run_blocking(self._sync.checkout, version)

    async def compact(
        self,
        *,
        target_rows_per_fragment: int | None = None,
        materialize_deletions: bool | None = None,
    ) -> dict[str, Any]:
        """Run compaction on the remote context.

        Returns:
            The result of the underlying ``compact`` call, unmodified.
        """
        return await self._run_blocking(
            self._sync.compact,
            target_rows_per_fragment=target_rows_per_fragment,
            materialize_deletions=materialize_deletions,
        )

    async def compaction_stats(self) -> dict[str, Any]:
        """Get compaction statistics."""
        return await self._run_blocking(self._sync.compaction_stats)

    # NOTE: ``list`` is deliberately the last method that needs the builtin in
    # an annotation. Once this name is bound in the class namespace, any later
    # ``list[...]`` annotation evaluated in class scope would resolve to this
    # method instead of the builtin type.
    async def list(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        filters: Any = None,
        include_expired: bool = False,
        include_retired: bool = False,
    ) -> list[dict[str, Any]]:
        """List records with optional filters.

        ``filters`` goes through ``_json_dumps``; the remaining arguments are
        forwarded unchanged.

        Returns:
            Each returned record passed through ``_normalize_record``.
        """
        filters_json = _json_dumps(filters, "filters")
        results = await self._run_blocking(
            self._sync.list,
            limit=limit,
            offset=offset,
            filters_json=filters_json,
            include_expired=include_expired,
            include_retired=include_retired,
        )
        return [_normalize_record(r) for r in results]

    def __repr__(self) -> str:
        return f"RemoteContext(version={self._sync.version()})"
Loading
Loading