-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathclient.py
More file actions
223 lines (178 loc) · 7.21 KB
/
client.py
File metadata and controls
223 lines (178 loc) · 7.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import asyncio
from typing import Any, Optional, TYPE_CHECKING
import aiohttp
from pydantic import ValidationError
from ..models import VideoModelDefinition, _MODELS
from ..errors import InvalidInputError
from .request import submit_job, get_job_status, get_job_content
from .types import (
JobSubmitResponse,
JobStatusResponse,
QueueJobResult,
QueueJobResultCompleted,
QueueJobResultFailed,
OnStatusChangeCallback,
)
if TYPE_CHECKING:
from ..client import DecartClient
# Pause, in seconds, between consecutive status checks while polling a job.
POLLING_INTERVAL: float = 1.5

# Pause, in seconds, between submitting a job and the first status check.
INITIAL_DELAY: float = 0.5
class QueueClient:
    """
    Queue client for async job-based video editing.

    Only video models support the queue API.
    Jobs are submitted and processed asynchronously, allowing you to
    poll for status and retrieve results when ready.

    Example:
        ```python
        client = DecartClient(api_key="your-key")

        # Option 1: Submit and poll automatically
        result = await client.queue.submit_and_poll({
            "model": models.video("lucy-pro-v2v"),
            "prompt": "Restyle this clip with anime shading and saturated colors",
            "data": open("input.mp4", "rb"),
            "on_status_change": lambda job: print(f"Status: {job.status}"),
        })

        # Option 2: Submit and poll manually
        job = await client.queue.submit({
            "model": models.video("lucy-pro-v2v"),
            "prompt": "Add cinematic teal-and-orange grading and subtle film grain",
            "data": open("input.mp4", "rb"),
        })
        status = await client.queue.status(job.job_id)
        result = await client.queue.result(job.job_id)
        ```
    """

    # Option keys that carry file payloads. They are kept out of schema
    # validation (a placeholder is validated instead) and forwarded as given.
    _FILE_FIELDS = frozenset({"data", "start", "end", "reference_image"})

    # Option keys that are never forwarded as model inputs.
    _NON_INPUT_KEYS = frozenset({"model", "cancel_token"})

    def __init__(self, parent: "DecartClient") -> None:
        """
        Bind this queue client to its parent client.

        Args:
            parent: Client that supplies the HTTP session, base URL, API key
                and integration value used for every request.
        """
        self._parent = parent

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session obtained from the parent client."""
        return await self._parent._get_session()

    async def submit(self, options: dict[str, Any]) -> JobSubmitResponse:
        """
        Submit a video editing job to the queue for async processing.

        Only video models are supported.
        Returns immediately with job_id and initial status.

        Args:
            options: Submit options including model and inputs
                - model: VideoModelDefinition from models.video()
                - prompt: Text instructions describing the requested edit
                - Additional model-specific inputs

        Returns:
            JobSubmitResponse with job_id and status

        Raises:
            InvalidInputError: If inputs are invalid or model is not a video model
            QueueSubmitError: If submission fails
        """
        if "model" not in options:
            raise InvalidInputError("model is required")

        model: VideoModelDefinition = options["model"]

        # A value without a ``name`` (e.g. a bare string or None) cannot be a
        # model definition; report it as invalid input rather than letting an
        # AttributeError escape.
        model_name = getattr(model, "name", None)
        if model_name is None:
            raise InvalidInputError(
                "model must be a video model definition from models.video()"
            )

        # Validate that this is a video model (check against registry)
        if model_name not in _MODELS["video"]:
            raise InvalidInputError(
                f"Model '{model_name}' is not supported by queue API. "
                "Only video models support async queue processing. "
                "For image models, use client.process() instead."
            )

        inputs = {k: v for k, v in options.items() if k not in self._NON_INPUT_KEYS}

        # Separate file inputs from regular inputs
        file_inputs = {k: v for k, v in inputs.items() if k in self._FILE_FIELDS}
        non_file_inputs = {k: v for k, v in inputs.items() if k not in self._FILE_FIELDS}

        # The schema is run on the regular inputs only; each supplied file
        # field is represented by an empty-bytes placeholder during validation.
        validation_inputs = {**non_file_inputs, **dict.fromkeys(file_inputs, b"")}

        try:
            validated_inputs = model.input_schema(**validation_inputs)
        except ValidationError as e:
            raise InvalidInputError(f"Invalid inputs for {model_name}: {e}") from e

        # Build final inputs: validated values first, then the original file
        # payloads, which overwrite their placeholders.
        processed_inputs = {
            **validated_inputs.model_dump(exclude_none=True),
            **file_inputs,
        }

        session = await self._get_session()
        return await submit_job(
            session=session,
            base_url=self._parent.base_url,
            api_key=self._parent.api_key,
            model=model,
            inputs=processed_inputs,
            integration=self._parent.integration,
        )

    async def status(self, job_id: str) -> JobStatusResponse:
        """
        Get the current status of a job.

        Args:
            job_id: The job ID returned from submit()

        Returns:
            JobStatusResponse with job_id and status

        Raises:
            QueueStatusError: If status check fails
        """
        session = await self._get_session()
        return await get_job_status(
            session=session,
            base_url=self._parent.base_url,
            api_key=self._parent.api_key,
            job_id=job_id,
            integration=self._parent.integration,
        )

    async def result(self, job_id: str) -> bytes:
        """
        Get the result of a completed job.

        Should only be called when job status is "completed".

        Args:
            job_id: The job ID returned from submit()

        Returns:
            Generated media as bytes

        Raises:
            QueueResultError: If result retrieval fails
        """
        session = await self._get_session()
        return await get_job_content(
            session=session,
            base_url=self._parent.base_url,
            api_key=self._parent.api_key,
            job_id=job_id,
            integration=self._parent.integration,
        )

    async def submit_and_poll(
        self,
        options: dict[str, Any],
    ) -> QueueJobResult:
        """
        Submit a job and automatically poll until completion.

        Returns a result object with status (does not throw on job failure).
        The passed ``options`` dict is not modified. Polling has no timeout:
        it continues until the job reports "completed" or "failed".

        Args:
            options: Submit options including model, inputs, and optional
                on_status_change callback. The callback is called
                synchronously with the initial status and with every polled
                status.

        Returns:
            QueueJobResult - either completed with data or failed with error

        Raises:
            InvalidInputError: If inputs are invalid
            QueueSubmitError: If submission fails
            QueueStatusError: If status check fails
            QueueResultError: If result retrieval fails
        """
        # Read the callback without popping it and forward a filtered copy,
        # so the caller's dict stays intact and can be reused for another call.
        on_status_change: Optional[OnStatusChangeCallback] = options.get("on_status_change")
        submit_options = {k: v for k, v in options.items() if k != "on_status_change"}

        # Submit the job
        job = await self.submit(submit_options)

        # Notify of initial status
        if on_status_change:
            on_status_change(JobStatusResponse(job_id=job.job_id, status=job.status))

        # Initial delay before polling
        await asyncio.sleep(INITIAL_DELAY)

        # Poll until complete
        while True:
            current = await self.status(job.job_id)

            if on_status_change:
                on_status_change(current)

            if current.status == "completed":
                data = await self.result(job.job_id)
                return QueueJobResultCompleted(status="completed", data=data)

            if current.status == "failed":
                return QueueJobResultFailed(status="failed", error="Job failed")

            # Any other status: wait and check again
            await asyncio.sleep(POLLING_INTERVAL)