Skip to content

Commit 55c2f0c

Browse files
committed
Add run_status method to run a job without waiting
Expose JobsEndpoint.poll helper to poll for job completion status if using the new JobsEndpoint.run_status method.
1 parent e9106a2 commit 55c2f0c

2 files changed

Lines changed: 77 additions & 13 deletions

File tree

metafold/jobs.py

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class Job:
2323
type: Job type.
2424
parameters: Job parameters.
2525
created: Job creation datetime.
26+
finished: Job finished datetime.
2627
state: Job state. May be one of: pending, started, success, or failure.
2728
assets: List of generated asset resources.
2829
meta: Additional metadata generated by the job.
@@ -106,11 +107,63 @@ def run(
106107
Returns:
107108
Completed job resource.
108109
"""
110+
url = self.run_status(type, params, name=name, project_id=project_id)
111+
try:
112+
r: Response = self.poll(url, timeout)
113+
except PollTimeout as e:
114+
raise RuntimeError(
115+
f"Job '{name or type}' failed to complete within {timeout} seconds"
116+
) from e
117+
return Job(**r.json())
118+
119+
def run_status(
120+
self, type: str, params: dict[str, Any],
121+
name: Optional[str] = None,
122+
project_id: Optional[str] = None,
123+
) -> str:
124+
"""Dispatch a new job and return immediately without waiting for result.
125+
126+
See Metafold API docs for the full list of jobs.
127+
128+
Args:
129+
type: Job type.
130+
params: Job parameters.
131+
name: Optional job name.
132+
project_id: Job project ID.
133+
134+
Returns:
135+
Job status url.
136+
"""
109137
project_id = self._client.project_id(project_id)
110138
payload = asdict(type=type, parameters=params, name=name)
111139
r: Response = self._client.post(f"/projects/{project_id}/jobs", json=payload)
112-
r = self._poll(r.json()["link"], timeout)
113-
return Job(**r.json())
140+
return r.json()["link"]
141+
142+
def poll(
143+
self, url: str,
144+
timeout: Union[int, float] = 120,
145+
every: Union[int, float] = 1,
146+
) -> Response:
147+
"""Poll the given URL in regular intervals.
148+
149+
Helpful for waiting on job results given a status URL.
150+
151+
Args:
152+
timeout: Time in seconds to wait for a result.
153+
every: Frequency in seconds.
154+
155+
Returns:
156+
HTTP response.
157+
"""
158+
t0 = time.monotonic()
159+
r = self._client.get(url)
160+
while r.status_code == 202:
161+
elapsed = time.monotonic() - t0
162+
if elapsed >= timeout:
163+
raise PollTimeout("Job timed out")
164+
time.sleep(1)
165+
r = self._client.get(url)
166+
return r
114167

115168
def update(
116169
self, job_id: str,
@@ -132,14 +185,3 @@ def update(
132185
payload = asdict(name=name)
133186
r: Response = self._client.patch(url, data=payload)
134187
return Job(**r.json())
135-
136-
def _poll(self, url: str, timeout: Union[int, float]) -> Response:
137-
t0 = time.monotonic()
138-
r = self._client.get(url)
139-
while r.status_code == 202:
140-
elapsed = time.monotonic() - t0
141-
if elapsed >= timeout:
142-
raise PollTimeout("Job timed out")
143-
time.sleep(1)
144-
r = self._client.get(url)
145-
return r

tests/test_jobs.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,28 @@ def test_run_job(client):
206206
)
207207

208208

209+
def test_poll_job(client):
210+
params = {
211+
"foo": 1,
212+
"bar": "a",
213+
"baz": [2, "b"],
214+
}
215+
url = client.jobs.run_status("test_job", params, name="My Job")
216+
assert url == "http://localhost:8000/projects/1/jobs/1/status"
217+
218+
r = client.jobs.poll(url)
219+
assert Job(**r.json()) == Job(
220+
id="1",
221+
name="My Job",
222+
type="test_job",
223+
parameters=params,
224+
created=default_dt,
225+
state="success",
226+
assets=[asset_obj],
227+
meta=None,
228+
)
229+
230+
209231
def test_update_job(client):
210232
j = client.jobs.update("1", name="baz")
211233
assert j.name == "baz"

0 commit comments

Comments
 (0)