Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "delega"
version = "0.1.3"
version = "0.2.0"
description = "Official Python SDK for the Delega API"
readme = "README.md"
license = "MIT"
Expand Down
13 changes: 12 additions & 1 deletion src/delega/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,30 @@
DelegaNotFoundError,
DelegaRateLimitError,
)
from .models import Agent, Comment, Project, Task
from .models import (
Agent,
Comment,
DedupResult,
DelegationChain,
DuplicateMatch,
Project,
Task,
)
from .webhooks import verify_webhook

__all__ = [
"Agent",
"AsyncDelega",
"Comment",
"DedupResult",
"Delega",
"DelegaAPIError",
"DelegaAuthError",
"DelegaError",
"DelegaNotFoundError",
"DelegaRateLimitError",
"DelegationChain",
"DuplicateMatch",
"Project",
"Task",
"verify_webhook",
Expand Down
5 changes: 5 additions & 0 deletions src/delega/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def __init__(self, base_url: str, api_key: str, timeout: int = _DEFAULT_TIMEOUT)
self._api_key = api_key
self._timeout = timeout

@property
def path_prefix(self) -> str:
"""Return the API namespace path ("/v1" for hosted, "/api" for self-hosted)."""
return urllib.parse.urlparse(self._base_url).path or ""

def _headers(self) -> dict[str, str]:
return {
"X-Agent-Key": self._api_key,
Expand Down
2 changes: 1 addition & 1 deletion src/delega/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Package version metadata."""

__version__ = "0.1.3"
__version__ = "0.2.0"
USER_AGENT = f"delega-python/{__version__}"
80 changes: 77 additions & 3 deletions src/delega/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
DelegaNotFoundError,
DelegaRateLimitError,
)
from .models import Agent, Comment, Project, Task
from .models import Agent, Comment, DedupResult, DelegationChain, Project, Task
from ._version import USER_AGENT

_DEFAULT_BASE_URL = "https://api.delega.dev"
Expand Down Expand Up @@ -49,6 +49,12 @@ def __init__(self, base_url: str, api_key: str, timeout: int = 30) -> None:
timeout=timeout,
)

@property
def path_prefix(self) -> str:
"""Return the API namespace path ("/v1" for hosted, "/api" for self-hosted)."""
import urllib.parse
return urllib.parse.urlparse(self._base_url).path or ""

async def request(
self,
method: str,
Expand Down Expand Up @@ -201,16 +207,74 @@ async def delegate(
*,
description: Optional[str] = None,
priority: Optional[int] = None,
project_id: Optional[str] = None,
labels: Optional[list[str]] = None,
due_date: Optional[str] = None,
assigned_to_agent_id: Optional[str] = None,
) -> Task:
"""Create a delegated sub-task under a parent task."""
"""Create a delegated child task under a parent.

The parent's ``status`` flips to ``"delegated"``. Use this — not
``assign()`` — for multi-agent handoffs so the parent/child
accountability chain is recorded.
"""
body: dict[str, Any] = {"content": content}
if description is not None:
body["description"] = description
if priority is not None:
body["priority"] = priority
if project_id is not None:
body["project_id"] = project_id
if labels is not None:
body["labels"] = labels
if due_date is not None:
body["due_date"] = due_date
if assigned_to_agent_id is not None:
body["assigned_to_agent_id"] = assigned_to_agent_id
data = await self._http.post(f"/tasks/{parent_task_id}/delegate", body=body)
return Task.from_dict(data)

async def assign(self, task_id: str, agent_id: Optional[str]) -> Task:
"""Assign a task to an agent (or ``None`` to unassign)."""
data = await self._http.put(
f"/tasks/{task_id}", body={"assigned_to_agent_id": agent_id}
)
return Task.from_dict(data)

async def chain(self, task_id: str) -> DelegationChain:
"""Get the full parent/child delegation chain for a task."""
data = await self._http.get(f"/tasks/{task_id}/chain")
return DelegationChain.from_dict(data)

async def update_context(
self, task_id: str, context: dict[str, Any]
) -> dict[str, Any]:
"""Deep-merge keys into a task's persistent context blob.

Existing keys are preserved; supplied keys are added or overwritten.
"""
data = await self._http.patch(f"/tasks/{task_id}/context", body=context)
if isinstance(data, dict) and "content" in data and "id" in data:
raw_ctx = data.get("context") or {}
if isinstance(raw_ctx, str):
import json as _json
try:
raw_ctx = _json.loads(raw_ctx) if raw_ctx.strip() else {}
except Exception:
raw_ctx = {}
return raw_ctx if isinstance(raw_ctx, dict) else {}
return data if isinstance(data, dict) else {}

async def find_duplicates(
self, content: str, *, threshold: Optional[float] = None
) -> DedupResult:
"""Check whether content is similar to existing open tasks."""
body: dict[str, Any] = {"content": content}
if threshold is not None:
body["threshold"] = threshold
data = await self._http.post("/tasks/dedup", body=body)
return DedupResult.from_dict(data)

async def add_comment(self, task_id: str, content: str) -> Comment:
"""Add a comment to a task."""
data = await self._http.post(f"/tasks/{task_id}/comments", body={"content": content})
Expand Down Expand Up @@ -368,7 +432,17 @@ async def me(self) -> dict[str, Any]:
return await self._http.get("/agent/me") # type: ignore[no-any-return]

async def usage(self) -> dict[str, Any]:
"""Get API usage information."""
"""Get quota and rate-limit information for the current plan.

Hosted API only (``api.delega.dev``). Self-hosted deployments
will raise :class:`DelegaError` before making a request.
"""
if self._http.path_prefix != "/v1":
raise DelegaError(
"usage() is only available on the hosted Delega API "
"(api.delega.dev). Self-hosted deployments do not expose "
"a usage endpoint."
)
return await self._http.get("/usage") # type: ignore[no-any-return]

async def aclose(self) -> None:
Expand Down
131 changes: 124 additions & 7 deletions src/delega/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@

from ._http import HTTPClient
from .exceptions import DelegaError
from .models import Agent, Comment, Project, Task
from .models import (
Agent,
Comment,
DedupResult,
DelegationChain,
Project,
Task,
)

_DEFAULT_BASE_URL = "https://api.delega.dev"

Expand Down Expand Up @@ -153,23 +160,121 @@ def delegate(
*,
description: Optional[str] = None,
priority: Optional[int] = None,
project_id: Optional[str] = None,
labels: Optional[list[str]] = None,
due_date: Optional[str] = None,
assigned_to_agent_id: Optional[str] = None,
) -> Task:
"""Create a delegated sub-task under a parent task.
"""Create a delegated child task under a parent.

The parent's ``status`` flips to ``"delegated"``. Use this — not
``assign()`` — for multi-agent handoffs so the parent/child
accountability chain is recorded (inspectable via ``chain()``).

Args:
parent_task_id: The parent task identifier.
content: The sub-task content/title.
content: The child task content/title.
description: Optional longer description.
priority: Optional priority level.
priority: Optional priority level (1-4).
project_id: Optional project to attach the child to.
labels: Optional labels for the child.
due_date: Optional due date (YYYY-MM-DD).
assigned_to_agent_id: Optional agent to assign the child to.
"""
body: dict[str, Any] = {"content": content}
if description is not None:
body["description"] = description
if priority is not None:
body["priority"] = priority
if project_id is not None:
body["project_id"] = project_id
if labels is not None:
body["labels"] = labels
if due_date is not None:
body["due_date"] = due_date
if assigned_to_agent_id is not None:
body["assigned_to_agent_id"] = assigned_to_agent_id
data = self._http.post(f"/tasks/{parent_task_id}/delegate", body=body)
return Task.from_dict(data)

def assign(self, task_id: str, agent_id: Optional[str]) -> Task:
"""Assign a task to an agent (or ``None`` to unassign).

For multi-agent handoffs where you want the parent/child chain
recorded, use ``delegate()`` instead — ``assign()`` only updates
the assignee on an existing task.

Args:
task_id: The task identifier.
agent_id: The agent identifier, or ``None`` to unassign.
"""
data = self._http.put(
f"/tasks/{task_id}", body={"assigned_to_agent_id": agent_id}
)
return Task.from_dict(data)

def chain(self, task_id: str) -> DelegationChain:
"""Get the full parent/child delegation chain for a task.

Normalizes hosted (``{root_id}``) vs self-hosted (``{root: Task}``)
response shapes so ``DelegationChain.root_id`` is always populated.

Args:
task_id: Any task identifier in the chain.
"""
data = self._http.get(f"/tasks/{task_id}/chain")
return DelegationChain.from_dict(data)

def update_context(
self, task_id: str, context: dict[str, Any]
) -> dict[str, Any]:
"""Deep-merge keys into a task's persistent context blob.

Existing keys are preserved; supplied keys are added or overwritten.
Use this to pass shared state between delegated agents instead of
re-describing context in task descriptions.

Args:
task_id: The task identifier.
context: Keys to merge into the existing context.

Returns:
The merged context dict.
"""
# Hosted returns the bare merged context dict; self-hosted returns
# the full Task with a ``context`` field. Normalize to always
# return the merged context.
data = self._http.patch(f"/tasks/{task_id}/context", body=context)
if isinstance(data, dict) and "content" in data and "id" in data:
# Looks like a full Task.
raw_ctx = data.get("context") or {}
if isinstance(raw_ctx, str):
import json as _json
try:
raw_ctx = _json.loads(raw_ctx) if raw_ctx.strip() else {}
except Exception:
raw_ctx = {}
return raw_ctx if isinstance(raw_ctx, dict) else {}
return data if isinstance(data, dict) else {}

def find_duplicates(
self, content: str, *, threshold: Optional[float] = None
) -> DedupResult:
"""Check whether content is similar to existing open tasks.

Call before ``create()`` to avoid redundant work. Uses Jaccard
similarity against open tasks.

Args:
content: The proposed task content to check.
threshold: Similarity threshold 0-1 (default 0.6 server-side).
"""
body: dict[str, Any] = {"content": content}
if threshold is not None:
body["threshold"] = threshold
data = self._http.post("/tasks/dedup", body=body)
return DedupResult.from_dict(data)

def add_comment(self, task_id: str, content: str) -> Comment:
"""Add a comment to a task.

Expand Down Expand Up @@ -386,9 +491,21 @@ def me(self) -> dict[str, Any]:
return self._http.get("/agent/me") # type: ignore[no-any-return]

def usage(self) -> dict[str, Any]:
"""Get API usage information.
"""Get quota and rate-limit information for the current plan.

Hosted API only (``api.delega.dev``). Self-hosted deployments
will raise :class:`DelegaError` before making a request.

Returns:
Dictionary with usage statistics.
Dict with ``plan``, ``task_count_month``, ``task_limit``,
``reset_date``, ``agent_count``, ``agent_limit``,
``webhook_count``, ``webhook_limit``, ``project_count``,
``project_limit``, ``rate_limit_rpm``, ``max_content_chars``.
"""
return self._http.get("/stats") # type: ignore[no-any-return]
if self._http.path_prefix != "/v1":
raise DelegaError(
"usage() is only available on the hosted Delega API "
"(api.delega.dev). Self-hosted deployments do not expose "
"a usage endpoint."
)
return self._http.get("/usage") # type: ignore[no-any-return]
Loading