Skip to content

Commit a155626

Browse files
committed
feat: add LocalRuntimeBackend backend
This is the async `RuntimeBackend` replacement of the `LocalLauncher`, which will eventually be removed in favor of this new backend. Some behavioural differences to note: - We now try to await() after a SIGKILL to be sure the process ended, bounded by a short timeout in case blocked at the kernel level. - We now use psutil to enumerate and kill descendant processes in addition to the created subprocess. This won't catch orphaned processes (needs e.g. cgroups), but should cover most sane usage. - The backend does _not_ link the output directories based on status (the `JobSpec.links`, e.g. "passing/", "failed/", "killed/"). The intention is that this detail is not core functionality for either the scheduler or the backends - instead, it will be implemented as an observer on the new async scheduler callbacks when introduced. By using async subprocesses and launching/killing jobs in batch, we are able to more efficiently launch jobs in parallel via async coroutines. We likewise avoid the need to poll jobs - instead we have an async task awaiting the subprocess' completion, which we then forward to notify the (to be added) scheduler of the job's completion. Note that interactive jobs are still basically handled synchronously as before - assumed that there is only 1 interactive job running at a time. Signed-off-by: Alex Jones <alex.jones@lowrisc.org>
1 parent d4c1224 commit a155626

1 file changed

Lines changed: 334 additions & 0 deletions

File tree

src/dvsim/runtime/local.py

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
# Copyright lowRISC contributors (OpenTitan project).
2+
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
"""Legacy launcher adapter interface for the new async scheduler design."""
6+
7+
import asyncio
8+
import contextlib
9+
import shlex
10+
import signal
11+
import subprocess
12+
import time
13+
from collections.abc import Hashable, Iterable
14+
from dataclasses import dataclass
15+
from typing import TextIO
16+
17+
import psutil
18+
19+
from dvsim.job.data import JobSpec, JobStatusInfo
20+
from dvsim.job.status import JobStatus
21+
from dvsim.job.time import JobTime
22+
from dvsim.logging import log
23+
from dvsim.runtime.backend import RuntimeBackend
24+
from dvsim.runtime.data import JobCompletionEvent, JobHandle
25+
26+
27+
@dataclass(kw_only=True)
class LocalJobHandle(JobHandle):
    """Job handle for a job belonging to a legacy launcher adapter runtime backend."""

    # Underlying asyncio subprocess; `None` for interactive jobs (which are run
    # synchronously) or when the launch failed before a process was created.
    process: asyncio.subprocess.Process | None
    # Open log file the job's stdout/stderr is written to, or `None` if it could
    # not be opened.
    log_file: TextIO | None
    # `time.monotonic()` timestamp taken at launch, used to compute the job runtime.
    start_time: float
    # Set by the backend once a kill has been requested for this job, so the
    # monitor reports it as KILLED.
    kill_requested: bool = False
36+
37+
class LocalRuntimeBackend(RuntimeBackend):
    """Launch jobs as subprocesses on the user's local machine."""

    name = "local"
    supports_interactive = True

    DEFAULT_SIGTERM_TIMEOUT = 2.0  # in seconds
    DEFAULT_SIGKILL_TIMEOUT = 2.0  # in seconds

    def __init__(
        self,
        *,
        max_parallelism: int | None = None,
        sigterm_timeout: float | None = None,
        sigkill_timeout: float | None = None,
    ) -> None:
        """Construct a local runtime backend.

        Args:
            max_parallelism: The maximum number of jobs that can be dispatched to this backend
                at once. `0` means no limit, `None` means no override is applied to the default.
            sigterm_timeout: The time to wait for a process to die after a SIGTERM when killing
                it, before sending SIGKILL.
            sigkill_timeout: The time to wait for a process to die after a SIGKILL when killing
                it, before giving up (so the scheduler can progress).

        """
        super().__init__(max_parallelism=max_parallelism)
        self.sigterm_timeout = (
            sigterm_timeout if sigterm_timeout is not None else self.DEFAULT_SIGTERM_TIMEOUT
        )
        self.sigkill_timeout = (
            sigkill_timeout if sigkill_timeout is not None else self.DEFAULT_SIGKILL_TIMEOUT
        )

        # Retain references to created asyncio tasks so they don't get GC'd.
        self._tasks: set[asyncio.Task] = set()

    async def _log_from_pipe(
        self, handle: LocalJobHandle, stream: asyncio.StreamReader | None
    ) -> None:
        """Write piped asyncio subprocess stream contents to a job's log file."""
        if stream is None or not handle.log_file:
            return
        try:
            async for line in stream:
                # surrogateescape round-trips arbitrary (possibly non-UTF-8) tool output.
                decoded = line.decode("utf-8", errors="surrogateescape")
                handle.log_file.write(decoded)
                handle.log_file.flush()
        except asyncio.CancelledError:
            # Readers are cancelled by `_monitor_job`; losing a trailing partial
            # line at that point is acceptable.
            pass

    async def _monitor_job(self, handle: LocalJobHandle) -> None:
        """Wait for subprocess completion and emit a completion event.

        Also enforces the job's timeout (killing the process on expiry), drains the
        stdout/stderr pipes into the log file, and closes the log file at the end.
        """
        if handle.process is None:
            return

        if handle.log_file:
            handle.log_file.write(f"[Executing]:\n{handle.spec.cmd}\n\n")
            handle.log_file.flush()

        reader_tasks = [
            asyncio.create_task(self._log_from_pipe(handle, handle.process.stdout)),
            asyncio.create_task(self._log_from_pipe(handle, handle.process.stderr)),
        ]
        status = JobStatus.KILLED
        reason = None

        try:
            exit_code = await asyncio.wait_for(
                handle.process.wait(), timeout=handle.spec.timeout_secs
            )
            runtime = time.monotonic() - handle.start_time
            status, reason = self._finish_job(handle, exit_code, runtime)
        except asyncio.TimeoutError:
            await self._kill_job(handle)
            status = JobStatus.KILLED
            timeout_message = f"Job timed out after {handle.spec.timeout_mins} minutes"
            reason = JobStatusInfo(message=timeout_message)
        finally:
            # Explicitly cancel reader tasks and wait for them to finish before closing the log
            # file. We first give them a second to finish naturally to reduce log loss.
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait(reader_tasks, timeout=1)
            for task in reader_tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*reader_tasks, return_exceptions=True)

            if handle.log_file:
                handle.log_file.close()
        # A requested kill wins over whatever status the process exited with.
        if handle.kill_requested:
            status = JobStatus.KILLED
            reason = JobStatusInfo(message="Job killed!")
        await self._emit_completion([JobCompletionEvent(handle.spec, status, reason)])

    def _launch_interactive_job(
        self,
        job: JobSpec,
        log_file: TextIO | None,
        env: dict[str, str],
    ) -> tuple[LocalJobHandle, JobCompletionEvent | None]:
        """Launch a job in interactive mode with transparent stdin and stdout.

        Interactive jobs run synchronously: this call blocks until the subprocess
        finishes, tee-ing its output to the console and to the log file.

        Returns:
            The job's handle (with `process=None` since the subprocess has already
            finished) and its completion event, or `None` if no completion could be
            produced (no log file was provided).

        """
        start_time = time.monotonic()
        exit_code = None
        completion = None

        if log_file is not None:
            try:
                proc = subprocess.Popen(
                    shlex.split(job.cmd),
                    # Transparent stdin/stdout, stdout & stderr muxed and tee'd via the pipe.
                    stdin=None,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                    env=env,
                )
                if proc.stdout is not None:
                    for line in proc.stdout:
                        print(line, end="")  # noqa: T201
                        log_file.write(line)
                        log_file.flush()

                exit_code = proc.wait()
            except subprocess.SubprocessError as e:
                log_file.close()
                log.exception("Error launching job subprocess: %s", job.full_name)
                reason = JobStatusInfo(message=f"Failed to launch job: {e}")
                completion = JobCompletionEvent(job, JobStatus.KILLED, reason)

        runtime = time.monotonic() - start_time
        handle = LocalJobHandle(
            spec=job,
            backend=self.name,
            job_runtime=JobTime(),
            simulated_time=JobTime(),
            process=None,
            log_file=log_file,
            start_time=start_time,
        )

        if exit_code is not None:
            status, reason = self._finish_job(handle, exit_code, runtime)
            completion = JobCompletionEvent(job, status, reason)
            # The job is done, so close its log file here (mirrors `_monitor_job`,
            # which does this for non-interactive jobs). There is no monitor task
            # for interactive jobs to do it later.
            if log_file is not None:
                log_file.close()

        return handle, completion

    async def _launch_job(
        self,
        job: JobSpec,
        log_file: TextIO | None,
        env: dict[str, str],
    ) -> tuple[LocalJobHandle | None, JobCompletionEvent | None]:
        """Launch a job (in non-interactive mode) as an async subprocess.

        Returns:
            `(None, None)` when launching hit a transient `BlockingIOError` (the
            scheduler should retry the job later); otherwise the job's handle and,
            if the launch failed fatally, a KILLED completion event.

        """
        proc = None
        completion = None
        if log_file is not None:
            try:
                proc = await asyncio.create_subprocess_exec(
                    *shlex.split(job.cmd),
                    # TODO: currently we mux the stdout and stderr streams by default. It would be
                    # useful to make this behaviour optional on some global `IoPolicy`.
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                )
            except BlockingIOError:
                # Skip this job for now; the scheduler should re-try to launch it later.
                log_file.close()
                return None, None
            except subprocess.SubprocessError as e:
                log_file.close()
                log.exception("Error launching job subprocess: %s", job.full_name)
                reason = JobStatusInfo(message=f"Failed to launch job: {e}")
                completion = JobCompletionEvent(job, JobStatus.KILLED, reason)

        handle = LocalJobHandle(
            spec=job,
            backend=self.name,
            job_runtime=JobTime(),
            simulated_time=JobTime(),
            process=proc,
            log_file=log_file,
            start_time=time.monotonic(),
        )

        # Create a task to asynchronously monitor the launched subprocess.
        # We must store a reference in self._tasks to ensure the task is not GC'd.
        if proc is not None:
            task = asyncio.create_task(self._monitor_job(handle))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

        return handle, completion

    async def submit_many(self, jobs: Iterable[JobSpec]) -> dict[Hashable, JobHandle]:
        """Submit & launch multiple jobs.

        Returns:
            mapping from job.id -> JobHandle. Entries are only present for jobs that successfully
            launched; jobs that failed in a non-fatal way are missing, and should be retried.

        """
        completions: list[JobCompletionEvent] = []
        handles: dict[Hashable, JobHandle] = {}

        for job in jobs:
            env = self._build_job_env(job)
            self._prepare_launch(job)

            log_file = None
            try:
                log_file = job.log_path.open("w", encoding="utf-8", errors="surrogateescape")
            except BlockingIOError:
                continue  # Skip this job for now; the scheduler should re-try to launch it later.
            except OSError as e:
                log.exception("Error writing to job log file: %s", job.full_name)
                reason = JobStatusInfo(message=f"Failed to launch job: {e}")
                completions.append(JobCompletionEvent(job, JobStatus.KILLED, reason))
                # The job has failed fatally (its completion is recorded above), so
                # don't fall through and try to launch it without a log file.
                continue

            if job.interactive:
                handle, completion = self._launch_interactive_job(job, log_file, env)
            else:
                handle, completion = await self._launch_job(job, log_file, env)
            if completion is not None:
                completions.append(completion)
            if handle is not None:
                handles[job.id] = handle

        if completions:
            await self._emit_completion(completions)

        return handles

    def _send_kill_signal(self, proc: asyncio.subprocess.Process, signal_num: int) -> None:
        """Send a (kill) signal to a process and all its descendant processes."""
        # TODO: maybe this should use cgroups in the future to be thorough?
        try:
            children = psutil.Process(proc.pid).children(recursive=True)
        except psutil.NoSuchProcess:
            # The process is already gone/reaped; nothing to enumerate. Still signal
            # `proc` below so the caller sees the usual ProcessLookupError.
            children = []
        for child in children:
            # A child may exit between enumeration and signalling; don't let that
            # abort signalling the remaining processes (psutil.NoSuchProcess is not
            # a ProcessLookupError, so callers would not catch it).
            with contextlib.suppress(psutil.NoSuchProcess):
                child.send_signal(signal_num)
        proc.send_signal(signal_num)

    async def _kill_job(self, handle: LocalJobHandle) -> None:
        """Kill the running local process, sending SIGTERM and then SIGKILL if that didn't work."""
        proc = handle.process
        if proc is None:
            return

        if proc.returncode is None:
            handle.kill_requested = True
            try:
                self._send_kill_signal(proc, signal.SIGTERM)
            except ProcessLookupError:
                return

        try:
            await asyncio.wait_for(proc.wait(), timeout=self.sigterm_timeout)
        except asyncio.TimeoutError:
            pass
        else:
            return

        if proc.returncode is None:
            log.warning(
                "Job '%s' was not killed with SIGTERM after %g seconds, sending SIGKILL.",
                handle.spec.full_name,
                self.sigterm_timeout,
            )
            try:
                self._send_kill_signal(proc, signal.SIGKILL)
            except ProcessLookupError:
                return

        try:
            await asyncio.wait_for(proc.wait(), timeout=self.sigkill_timeout)
        except asyncio.TimeoutError:
            # proc.wait() completes only when the kernel reaps the process. If we sent SIGKILL
            # and did not see this happen for a bit, the process is probably blocked in the
            # kernel somewhere (e.g. NFS hang, slow or dead disk I/O).
            log.error(
                "Job '%s' was not killed with SIGKILL after %g seconds, so give up on it.",
                handle.spec.full_name,
                self.sigkill_timeout,
            )

    async def kill_many(self, handles: Iterable[JobHandle]) -> None:
        """Cancel ongoing jobs via their handle. Killed jobs should still "complete"."""
        tasks = []
        for handle in handles:
            if not isinstance(handle, LocalJobHandle):
                msg = f"Local backend expected handle of type LocalJobHandle, not `{type(handle)}`."
                raise TypeError(msg)
            if handle.process and not handle.kill_requested and handle.process.returncode is None:
                tasks.append(asyncio.create_task(self._kill_job(handle)))

        if tasks:
            # Wait for all job subprocesses to be killed; `_monitor_job` handles the completions.
            await asyncio.gather(*tasks)

0 commit comments

Comments
 (0)