Skip to content

Commit 65df3d6

Browse files
committed
feat: introduce LegacyLauncherAdapter runtime backend
This acts as an adapter interface for the legacy non-async Launcher classes so that they can be used with the upcoming new async Scheduler and RuntimeBackend interfaces without needing to completely rewrite them. This is important because there are many existing launchers which are at present difficult to test or validate - we can minimise breakage to downstream dependencies by following the Strangler Fig pattern, incrementally replacing the legacy launcher functionality, launcher by launcher, replacing them with new runtime backend implementations. The legacy launcher tries to respect all features originally implemented by the current synchronous scheduler - poll frequency, max poll limits, parallelism limits, error messages, etc. This is all managed by an asynchronous polling task (`_poller`) which polls jobs in a FIFO poll queue at set intervals. Submitting and killing jobs then just requires launching and killing the underlying launchers and modifying the poll queue accordingly. Signed-off-by: Alex Jones <alex.jones@lowrisc.org>
1 parent cbb46d2 commit 65df3d6

1 file changed

Lines changed: 229 additions & 0 deletions

File tree

src/dvsim/runtime/legacy.py

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
# Copyright lowRISC contributors (OpenTitan project).
2+
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
"""Legacy launcher adapter interface for the new async scheduler design."""
6+
7+
import asyncio
8+
import time
9+
from collections.abc import Hashable, Iterable
10+
from dataclasses import dataclass
11+
12+
from dvsim.job.data import JobSpec, JobStatusInfo
13+
from dvsim.job.status import JobStatus
14+
from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError
15+
from dvsim.logging import log
16+
from dvsim.runtime.backend import RuntimeBackend
17+
from dvsim.runtime.data import JobCompletionEvent, JobHandle
18+
19+
20+
@dataclass(kw_only=True)
21+
class LauncherJobHandle(JobHandle):
22+
"""Job handle for a job belonging to a legacy launcher adapter runtime backend."""
23+
24+
launcher: Launcher
25+
26+
27+
class LegacyLauncherAdapter(RuntimeBackend):
28+
"""Adapter that allows legacy polling-based launchers to run appearing as a new async backend.
29+
30+
Each job receives its own legacy launcher instance to account for the fact that launcher
31+
constructs consume an individual job spec. A single async poll task is used which respects
32+
the launcher's configured `max_parallel`, `max_polls` and `poll_freq` configuration, to match
33+
the original scheduler/launcher behaviour more closely.
34+
"""
35+
36+
name = "legacy"
37+
38+
def __init__(
39+
self,
40+
launcher_cls: type[Launcher],
41+
*,
42+
poll_interval: float | None = None,
43+
max_polls_per_interval: int | None = None,
44+
max_parallelism: int | None = None,
45+
) -> None:
46+
"""Construct a legacy launcher adapter.
47+
48+
Args:
49+
launcher_cls: The legacy launcher class to adapt as a backend.
50+
poll_interval: Override for the interval (seconds) between poll attempts.
51+
max_polls_per_interval: The maximum number of jobs that can be polled in a single
52+
interval. `0` means no limit, `None` means no override is applied to the default.
53+
max_parallelism: The maximum number of jobs that can be dispatched to this backend
54+
at once. `0` means no limit, `None` means no override is applied to the default.
55+
56+
"""
57+
if max_parallelism is None:
58+
max_parallelism = launcher_cls.max_parallel
59+
super().__init__(max_parallelism=max_parallelism)
60+
61+
self.launcher_cls = launcher_cls
62+
63+
# Get the name from the defined variant, or from the class name, or just "legacy".
64+
if launcher_cls.variant is not None:
65+
self.name = launcher_cls.variant
66+
else:
67+
name = launcher_cls.__name__.lower()
68+
if name.endswith("launcher"):
69+
self.name = name.removesuffix("launcher")
70+
71+
self.poll_interval = poll_interval if poll_interval is not None else launcher_cls.poll_freq
72+
self.max_polls_per_interval = (
73+
max_polls_per_interval if max_polls_per_interval is not None else launcher_cls.max_poll
74+
)
75+
# This is just hardcoded for now; from inspection, it seems these two launchers are
76+
# the only ones that actually support interactivity.
77+
self.supports_interactive = self.name in ("local", "nc")
78+
79+
# Track launchers that have been created but not yet launched due to LauncherBusyErrors.
80+
self._pending_launchers: dict[Hashable, Launcher] = {}
81+
82+
# FIFO Queue defining the order in which jobs should be polled.
83+
self._poll_queue: list[LauncherJobHandle] = []
84+
85+
self._poller_task: asyncio.Task | None = None
86+
self._closed: bool = False
87+
88+
def _ensure_poller(self) -> None:
89+
"""Lazily make sure that a poller task exists."""
90+
if self._poller_task is None:
91+
log.warning("Using legacy runtime adapter for the '%s' launcher.", self.name)
92+
log.warning("Consider rewriting the launcher as a runtime backend if needed.")
93+
self._poller_task = asyncio.create_task(self._poller())
94+
95+
def _extract_launcher_job_failure(
96+
self, status: JobStatus, handle: LauncherJobHandle
97+
) -> JobStatusInfo | None:
98+
"""Extract relevant information about a job failure within a launcher."""
99+
if status not in (JobStatus.FAILED, JobStatus.KILLED):
100+
return None
101+
fail_msg = handle.launcher.fail_msg
102+
if fail_msg is None or not fail_msg.message:
103+
return None
104+
105+
return JobStatusInfo(
106+
message=fail_msg.message,
107+
lines=[fail_msg.line_number] if fail_msg.line_number is not None else None,
108+
context=fail_msg.context,
109+
)
110+
111+
async def _poller(self) -> None:
112+
"""Poll all dispatched legacy jobs in a serial polling loop."""
113+
next_tick = time.monotonic() + self.poll_interval
114+
try:
115+
while not self._closed:
116+
completions: list[JobCompletionEvent] = []
117+
118+
if self.max_polls_per_interval:
119+
handles_to_poll = self._poll_queue[: self.max_polls_per_interval]
120+
self._poll_queue = self._poll_queue[len(handles_to_poll) :]
121+
else:
122+
handles_to_poll = self._poll_queue.copy()
123+
self._poll_queue.clear()
124+
125+
for handle in handles_to_poll:
126+
try:
127+
status = handle.launcher.poll()
128+
except LauncherError as e:
129+
log.error("Error when polling job: %s", str(e))
130+
status = JobStatus.KILLED
131+
if status.is_terminal:
132+
reason = self._extract_launcher_job_failure(status, handle)
133+
completion_event = JobCompletionEvent(handle.spec, status, reason)
134+
completions.append(completion_event)
135+
else:
136+
self._poll_queue.append(handle)
137+
138+
if completions:
139+
await self._emit_completion(completions)
140+
141+
now = time.monotonic()
142+
sleep_time = max(0, next_tick - now)
143+
next_tick += self.poll_interval
144+
await asyncio.sleep(sleep_time)
145+
finally:
146+
self._poller_task = None
147+
148+
async def submit_many(self, jobs: Iterable[JobSpec]) -> dict[Hashable, JobHandle]:
149+
"""Submit & launch multiple jobs via Launchers.
150+
151+
Returns:
152+
mapping from job.id -> JobHandle. Entries are only present for jobs that successfully
153+
launched; jobs that failed in a non-fatal way are missing, and should be retried.
154+
155+
"""
156+
self._ensure_poller()
157+
158+
handles: dict[Hashable, JobHandle] = {}
159+
completions: list[JobCompletionEvent] = [] # For jobs with errors during launching
160+
161+
for job in jobs:
162+
# For the sake of wrapping and maintaining existing functionality, we do not
163+
# prepare the build env vars or output directories here. That is done in the
164+
# launcher itself to support these legacy implementations.
165+
if job.id in self._pending_launchers:
166+
launcher = self._pending_launchers[job.id]
167+
else:
168+
launcher = self.launcher_cls(job)
169+
self._pending_launchers[job.id] = launcher
170+
171+
try:
172+
launcher.launch()
173+
except LauncherError as e:
174+
log.exception("Error launching %s", job.full_name)
175+
reason = JobStatusInfo(message=f"Error launching job: {e}")
176+
completions.append(JobCompletionEvent(job, JobStatus.KILLED, reason))
177+
except LauncherBusyError:
178+
log.exception("Legacy '%s' launcher is busy", self.name)
179+
continue
180+
181+
self._pending_launchers.pop(job.id, None)
182+
handle = LauncherJobHandle(
183+
spec=job,
184+
backend=self.name,
185+
job_runtime=launcher.job_runtime,
186+
simulated_time=launcher.simulated_time,
187+
launcher=launcher,
188+
)
189+
self._poll_queue.append(handle)
190+
handles[job.id] = handle
191+
192+
if completions:
193+
await self._emit_completion(completions)
194+
195+
return handles
196+
197+
async def kill_many(self, handles: Iterable[JobHandle]) -> None:
198+
"""Cancel ongoing jobs via their handle. Killed jobs should still "complete"."""
199+
completions: list[JobCompletionEvent] = []
200+
201+
for handle in handles:
202+
if not isinstance(handle, LauncherJobHandle):
203+
type_ = "LauncherJobHandle"
204+
msg = f"Legacy backend expected handle of type `{type_}`, not `{type(handle)}`."
205+
raise TypeError(msg)
206+
207+
handle.launcher.kill()
208+
completion = JobCompletionEvent(handle.spec, JobStatus.KILLED, None)
209+
completions.append(completion)
210+
if handle in self._poll_queue:
211+
self._poll_queue.remove(handle)
212+
213+
if completions:
214+
await self._emit_completion(completions)
215+
216+
async def close(self) -> None:
217+
"""Release any resources that the backend holds; called when the scheduler completes.
218+
219+
Ensures that the dedicated asyncio poller task is cancelled.
220+
221+
"""
222+
self._closed = True
223+
if self._poller_task:
224+
self._poller_task.cancel()
225+
try:
226+
await self._poller_task
227+
except asyncio.CancelledError as e:
228+
log.debug("Ignoring asyncio cancellation error: %s", str(e))
229+
self._poller_task = None

0 commit comments

Comments
 (0)