Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/dvsim/job/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
from pydantic import BaseModel, ConfigDict

from dvsim.job.status import JobStatus
from dvsim.launcher.base import ErrorMessage
from dvsim.report.data import IPMeta, ToolMeta

__all__ = (
"CompletedJobStatus",
"JobSpec",
"JobStatusInfo",
"WorkspaceConfig",
)

Expand Down Expand Up @@ -117,12 +117,31 @@ class JobSpec(BaseModel):
fail_patterns: Sequence[str]
"""regex patterns to match on to determine if the job has failed."""

@property
def id(self) -> str:
    """Unique identifier for this job.

    Jobs are already disambiguated by their full name, so this property
    is simply an alias for ``self.full_name``.
    """
    return self.full_name

@property
def timeout_secs(self) -> int | None:
    """Timeout applied to the launched job, in seconds.

    Converts ``timeout_mins`` to seconds; ``None`` means no timeout is configured.
    """
    if self.timeout_mins is None:
        return None
    return 60 * self.timeout_mins


class JobStatusInfo(BaseModel):
    """Additional context describing a failure or error within a job.

    Instances are immutable and reject unknown fields.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    message: str
    """A human-readable description of the error."""
    lines: Sequence[int | tuple[int, int]] | None = None
    """Relevant lines, or inclusive line ranges, in the job script or the job itself."""
    context: Sequence[str] | None = None
    """Free-form context strings giving extra detail."""


class CompletedJobStatus(BaseModel):
"""Job status."""

Expand Down Expand Up @@ -166,5 +185,5 @@ class CompletedJobStatus(BaseModel):

status: JobStatus
"""Status of the job."""
fail_msg: ErrorMessage
fail_msg: JobStatusInfo | None
"""Error message."""
5 changes: 5 additions & 0 deletions src/dvsim/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright lowRISC contributors (OpenTitan project).
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""Job runtime backends."""
114 changes: 114 additions & 0 deletions src/dvsim/runtime/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright lowRISC contributors (OpenTitan project).
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""Runtime backend abstract base class."""

from abc import ABC, abstractmethod
from collections.abc import Hashable, Iterable

from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus
from dvsim.logging import log
from dvsim.runtime.data import (
CompletionCallback,
JobCompletionEvent,
JobHandle,
)


class RuntimeBackend(ABC):
    """Abstraction for a backend that launches, maintains, polls and kills a job.

    Provides methods to prepare an environment for running a job, launching the job,
    polling for its completion, killing it, and doing some cleanup activities.
    """

    name: str
    """The name of the backend."""

    max_parallelism: int = 0
    """The maximum number of jobs that can be run at any time via this backend. The scheduler
    should respect the parallelism limit defined here.
    """

    max_output_dirs: int = 5
    """If a history of previous invocations is to be maintained, keep at most this many dirs."""

    supports_interactive: bool = False
    """Whether this backend supports jobs in interactive mode (transparent stdin/stdout)."""

    def __init__(self, *, max_parallelism: int | None = None) -> None:
        """Construct a runtime backend.

        Args:
            max_parallelism: The maximum number of jobs that can be dispatched to this backend
                at once. `0` means no limit, `None` means no override is applied to the default.

        """
        if max_parallelism is not None:
            self.max_parallelism = max_parallelism

        # Populated once the scheduler calls `attach_completion_callback`.
        self._completion_callback: CompletionCallback | None = None

    def attach_completion_callback(self, callback: CompletionCallback) -> None:
        """Attach a callback for completed events, to notify the scheduler.

        Args:
            callback: the callback to use for job completion events.

        """
        self._completion_callback = callback

    async def _emit_completion(self, events: Iterable[JobCompletionEvent]) -> None:
        """Mark a job as now being in some completed/terminal state by notifying the scheduler.

        Args:
            events: completion events to log and forward to the attached scheduler callback.

        Raises:
            RuntimeError: if no completion callback has been attached yet.

        """
        if self._completion_callback is None:
            raise RuntimeError("Backend not attached to the scheduler")

        # Materialize the iterable up front: it is traversed once by the logging loop
        # below and again by the callback. A one-shot iterator (e.g. a generator) would
        # otherwise reach the scheduler already exhausted, silently dropping the events.
        events = tuple(events)

        for event in events:
            log.debug(
                "Job %s completed execution: %s", event.spec.qual_name, event.status.shorthand
            )
            if event.status != JobStatus.PASSED and event.reason is not None:
                log.verbose(
                    "Job %s has status '%s' instead of 'Passed'. Reason: %s",
                    event.spec.qual_name,
                    event.status.name.capitalize(),
                    event.reason.message,
                )

        await self._completion_callback(events)

    @abstractmethod
    async def submit_many(self, jobs: Iterable[JobSpec]) -> dict[Hashable, JobHandle]:
        """Submit & launch multiple jobs.

        Returns:
            mapping from job.id -> JobHandle. Entries are only present for jobs that successfully
            launched; jobs that failed in a non-fatal way are missing, and should be retried.

        """

    async def submit(self, job: JobSpec) -> JobHandle | None:
        """Submit & launch a job, returning a handle for that job.

        If the job failed to launch in a non-fatal way (e.g. backend is busy), None is returned
        instead, and the job should be re-submitted at some later time.
        """
        result = await self.submit_many([job])
        return result.get(job.id)

    @abstractmethod
    async def kill_many(self, handles: Iterable[JobHandle]) -> None:
        """Cancel ongoing jobs via their handle. Killed jobs should still "complete"."""

    async def kill(self, handle: JobHandle) -> None:
        """Cancel an ongoing job via its handle. Killed jobs should still "complete"."""
        await self.kill_many([handle])

    async def close(self) -> None:  # noqa: B027
        """Release any resources that the backend holds; called when the scheduler completes.

        The default implementation just does nothing.

        """
59 changes: 59 additions & 0 deletions src/dvsim/runtime/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright lowRISC contributors (OpenTitan project).
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""Various dataclasses used by the different runtime backends when executing jobs."""

from collections.abc import Awaitable, Callable, Hashable, Iterable
from dataclasses import dataclass
from typing import TypeAlias

from dvsim.job.data import JobSpec, JobStatusInfo
from dvsim.job.status import JobStatus
from dvsim.job.time import JobTime

__all__ = (
"CompletionCallback",
"JobCompletionEvent",
"JobHandle",
)


@dataclass(kw_only=True)
class JobHandle:
    """Handle to a job that is actively executing on some backend."""

    spec: JobSpec
    backend: str

    # TODO: these are necessary for now because they are exposed in the CompletedJobStatus.
    # It would be nice to figure out a better mechanism for these.
    job_runtime: JobTime
    simulated_time: JobTime

    @property
    def job_id(self) -> Hashable:
        """Unique identifier for the job; an alias of ``self.spec.id``."""
        return self.spec.id


@dataclass(frozen=True)
class JobCompletionEvent:
    """Event emitted when a job reaches a terminal state."""

    # TODO: ideally we would rather store a `job_id: Hashable` here instead of the full spec, but
    # this is needed to access the `post_finish` callback in `RuntimeBackend._emit_completion`.
    # When these `pre_launch`/`post_finish` callbacks are refactored/removed, this can be changed.
    spec: JobSpec
    """Specification of the job that has completed."""
    status: JobStatus
    """Final (terminal) status of the completed job."""
    reason: JobStatusInfo | None
    """Explanation of why the job ended in the given terminal state.

    Typically only reported for non-successful executions, as it is used for e.g. failure
    message buckets - success is implicit.
    """


# Async callback invoked with a batch of job completion events; a backend awaits it to
# notify the scheduler that jobs have reached a terminal state.
CompletionCallback: TypeAlias = Callable[[Iterable[JobCompletionEvent]], Awaitable[None]]
Loading
Loading