|
| 1 | +# Copyright lowRISC contributors (OpenTitan project). |
| 2 | +# Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| 3 | +# SPDX-License-Identifier: Apache-2.0 |
| 4 | + |
| 5 | +"""Legacy launcher adapter interface for the new async scheduler design.""" |
| 6 | + |
| 7 | +import asyncio |
| 8 | +import time |
| 9 | +from collections.abc import Hashable, Sequence |
| 10 | +from dataclasses import dataclass |
| 11 | + |
| 12 | +from dvsim.job.data import JobSpec, JobStatusInfo |
| 13 | +from dvsim.job.status import JobStatus |
| 14 | +from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError |
| 15 | +from dvsim.logging import log |
| 16 | +from dvsim.runtime.backend import RuntimeBackend |
| 17 | +from dvsim.runtime.data import JobCompletionEvent, JobHandle |
| 18 | + |
| 19 | + |
@dataclass(kw_only=True)
class LauncherJobHandle(JobHandle):
    """Handle for a job dispatched through the legacy launcher adapter backend."""

    # The per-job legacy `Launcher` instance that owns and drives this job.
    launcher: Launcher
| 25 | + |
| 26 | + |
class LegacyLauncherAdapter(RuntimeBackend):
    """Adapter that allows legacy polling-based launchers to run appearing as a new async backend.

    Each job receives its own legacy launcher instance to account for the fact that launcher
    constructs consume an individual job spec. A single async poll task is used which respects
    the launcher's configured `max_parallel`, `max_polls` and `poll_freq` configuration, to match
    the original scheduler/launcher behaviour more closely.
    """

    name = "legacy"

    def __init__(
        self,
        launcher_cls: type[Launcher],
        *,
        poll_interval: float | None = None,
        max_polls_per_interval: int | None = None,
        max_parallelism: int | None = None,
    ) -> None:
        """Construct a legacy launcher adapter.

        Args:
            launcher_cls: The legacy launcher class to adapt as a backend.
            poll_interval: Override for the interval (seconds) between poll attempts.
            max_polls_per_interval: The maximum number of jobs that can be polled in a single
                interval. `0` means no limit, `None` means no override is applied to the default.
            max_parallelism: The maximum number of jobs that can be dispatched to this backend
                at once. `0` means no limit, `None` means no override is applied to the default.

        """
        if max_parallelism is None:
            max_parallelism = launcher_cls.max_parallel
        super().__init__(max_parallelism=max_parallelism)

        self.launcher_cls = launcher_cls

        # Get the name from the defined variant, or from the class name, or just "legacy".
        if launcher_cls.variant is not None:
            self.name = launcher_cls.variant
        else:
            name = launcher_cls.__name__.lower()
            if name.endswith("launcher"):
                self.name = name.removesuffix("launcher")

        self.poll_interval = poll_interval if poll_interval is not None else launcher_cls.poll_freq
        self.max_polls_per_interval = (
            max_polls_per_interval if max_polls_per_interval is not None else launcher_cls.max_poll
        )
        # This is just hardcoded for now; from inspection, it seems these two launchers are
        # the only ones that actually support interactivity.
        self.supports_interactive = self.name in ("local", "nc")

        # Track launchers that have been created but not yet launched due to LauncherBusyErrors,
        # so a later submit_many retry reuses the same launcher instance for the same job id.
        self._pending_launchers: dict[Hashable, Launcher] = {}

        # FIFO queue defining the order in which jobs should be polled.
        self._poll_queue: list[LauncherJobHandle] = []

        # Lazily-created background poll task; None when no poller is running.
        self._poller_task: asyncio.Task | None = None
        self._closed: bool = False

    def _ensure_poller(self) -> None:
        """Lazily make sure that a poller task exists."""
        if self._poller_task is None:
            log.warning("Using legacy runtime adapter for the '%s' launcher.", self.name)
            log.warning("Consider rewriting the launcher as a runtime backend if needed.")
            self._poller_task = asyncio.create_task(self._poller())

    def _extract_launcher_job_failure(
        self, status: JobStatus, handle: LauncherJobHandle
    ) -> JobStatusInfo | None:
        """Extract relevant information about a job failure within a launcher.

        Returns:
            A `JobStatusInfo` describing the failure, or None when the status is not a
            failure status or the launcher recorded no usable failure message.

        """
        if status not in (JobStatus.FAILED, JobStatus.KILLED):
            return None
        fail_msg = handle.launcher.fail_msg
        if fail_msg is None or not fail_msg.message:
            return None

        return JobStatusInfo(
            message=fail_msg.message,
            lines=[fail_msg.line_number] if fail_msg.line_number is not None else None,
            context=fail_msg.context,
        )

    async def _poller(self) -> None:
        """Poll all dispatched legacy jobs in a serial polling loop.

        Runs until the backend is closed (or the task is cancelled). Each tick polls at
        most `max_polls_per_interval` queued handles (all of them when the limit is 0),
        emits completion events for terminal jobs and re-queues the rest.
        """
        next_tick = time.monotonic() + self.poll_interval
        try:
            while not self._closed:
                completions: list[JobCompletionEvent] = []

                if self.max_polls_per_interval:
                    handles_to_poll = self._poll_queue[: self.max_polls_per_interval]
                    self._poll_queue = self._poll_queue[len(handles_to_poll) :]
                else:
                    handles_to_poll = self._poll_queue.copy()
                    self._poll_queue.clear()

                for handle in handles_to_poll:
                    try:
                        status = handle.launcher.poll()
                    except LauncherError as e:
                        # A poll failure is treated as the job having been killed.
                        log.error("Error when polling job: %s", str(e))
                        status = JobStatus.KILLED
                    if status.is_terminal:
                        reason = self._extract_launcher_job_failure(status, handle)
                        completion_event = JobCompletionEvent(handle.spec, status, reason)
                        completions.append(completion_event)
                    else:
                        # Not finished yet: put the handle back for the next tick.
                        self._poll_queue.append(handle)

                if completions:
                    await self._emit_completion(completions)

                # Fixed-rate scheduling: sleep until the next tick rather than a fixed
                # delay, so slow polls do not accumulate extra latency per interval.
                now = time.monotonic()
                sleep_time = max(0, next_tick - now)
                next_tick += self.poll_interval
                await asyncio.sleep(sleep_time)
        finally:
            self._poller_task = None

    async def submit_many(self, jobs: Sequence[JobSpec]) -> dict[Hashable, JobHandle]:
        """Submit & launch multiple jobs via Launchers.

        Jobs whose launch raises `LauncherError` are reported as KILLED via a completion
        event and are NOT given a handle. Jobs rejected with `LauncherBusyError` keep
        their launcher cached in `_pending_launchers` so a later call can retry them.

        Returns:
            mapping from job.id -> JobHandle.

        """
        self._ensure_poller()

        handles: dict[Hashable, JobHandle] = {}
        completions: list[JobCompletionEvent] = []  # For jobs with errors during launching

        for job in jobs:
            # For the sake of wrapping and maintaining existing functionality, we do not
            # prepare the build env vars or output directories here. That is done in the
            # launcher itself to support these legacy implementations.
            if job.id in self._pending_launchers:
                launcher = self._pending_launchers[job.id]
            else:
                launcher = self.launcher_cls(job)
                self._pending_launchers[job.id] = launcher

            try:
                launcher.launch()
            except LauncherError as e:
                # Launching failed permanently: record the failure, drop the cached
                # launcher, and skip handle creation. Without the `continue`, the job
                # would be reported as completed here AND polled again via the queue,
                # producing a duplicate completion.
                log.exception("Error launching %s", job.full_name)
                reason = JobStatusInfo(message=f"Error launching job: {e}")
                completions.append(JobCompletionEvent(job, JobStatus.KILLED, reason))
                self._pending_launchers.pop(job.id, None)
                continue
            except LauncherBusyError:
                # NOTE(review): this branch is only reachable if LauncherBusyError is
                # not a subclass of LauncherError — confirm against launcher.base.
                log.exception("Legacy '%s' launcher is busy", self.name)
                continue

            self._pending_launchers.pop(job.id, None)
            handle = LauncherJobHandle(
                spec=job,
                backend=self.name,
                job_runtime=launcher.job_runtime,
                simulated_time=launcher.simulated_time,
                launcher=launcher,
            )
            self._poll_queue.append(handle)
            handles[job.id] = handle

        if completions:
            await self._emit_completion(completions)

        return handles

    async def kill_many(self, handles: Sequence[JobHandle]) -> None:
        """Cancel ongoing jobs via their handle. Killed jobs should still "complete".

        Raises:
            TypeError: If a handle was not produced by this backend.

        """
        completions: list[JobCompletionEvent] = []

        for handle in handles:
            if not isinstance(handle, LauncherJobHandle):
                type_ = "LauncherJobHandle"
                msg = f"Legacy backend expected handle of type `{type_}`, not `{type(handle)}`."
                raise TypeError(msg)

            handle.launcher.kill()
            completion = JobCompletionEvent(handle.spec, JobStatus.KILLED, None)
            completions.append(completion)
            # Remove the handle from the poll queue so the poller does not emit a
            # second (duplicate) completion for it.
            if handle in self._poll_queue:
                self._poll_queue.remove(handle)

        if completions:
            await self._emit_completion(completions)

    async def close(self) -> None:
        """Release any resources that the backend holds; called when the scheduler completes.

        Ensures that the dedicated asyncio poller task is cancelled.

        """
        self._closed = True
        if self._poller_task:
            self._poller_task.cancel()
            try:
                await self._poller_task
            except asyncio.CancelledError as e:
                # Expected when the poller is cancelled mid-sleep; nothing to clean up.
                log.debug("Ignoring asyncio cancellation error: %s", str(e))
            self._poller_task = None