Skip to content

Commit 4f1c9e1

Browse files
committed
First draft of status objects for PipelineTask execution.
1 parent 1d1eba0 commit 4f1c9e1

2 files changed

Lines changed: 271 additions & 0 deletions

File tree

python/lsst/pipe/base/connections.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,24 @@ def translateAdjustQuantumInputs(
547547
)
548548
return results
549549

550+
def hasPostWriteLogic(self) -> bool:
    """Test whether this `PipelineTask` can fail even after all outputs
    have been written.

    Returns
    -------
    has_post_write_logic : `bool`
        `True` if this task may still fail after all of its predicted
        outputs have been written; `False` (the base class default)
        otherwise.

    Notes
    -----
    When this returns `False` (the default base class behavior), execution
    harnesses and QuantumGraph generation algorithms may assume that:

    - any quantum execution that yielded all predicted outputs was a
      success, without checking actual exit status.

    - any quantum execution that yields no predicted outputs can be
      treated as if `NoWorkQuantum` was raised.

    These assumptions enable important optimizations in code that attempts
    to quickly determine the status of an executed quantum.
    """
    # Conservative subclasses that do post-write verification or cleanup
    # should override this to return True.
    return False
550568

551569
def iterConnections(connections: PipelineTaskConnections,
552570
connectionType: Union[str, Iterable[str]]
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
# This file is part of pipe_base.
2+
#
3+
# Developed for the LSST Data Management System.
4+
# This product includes software developed by the LSST Project
5+
# (http://www.lsst.org).
6+
# See the COPYRIGHT file at the top-level directory of this distribution
7+
# for details of code ownership.
8+
#
9+
# This program is free software: you can redistribute it and/or modify
10+
# it under the terms of the GNU General Public License as published by
11+
# the Free Software Foundation, either version 3 of the License, or
12+
# (at your option) any later version.
13+
#
14+
# This program is distributed in the hope that it will be useful,
15+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
# GNU General Public License for more details.
18+
#
19+
# You should have received a copy of the GNU General Public License
20+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
21+
22+
# Explicit public API: an empty __all__ would make ``from ... import *``
# export nothing, hiding the exceptions and status types defined below.
__all__ = (
    "NoWorkQuantum",
    "RepeatableQuantumError",
    "InvalidQuantumError",
    "QuantumStatusCategory",
    "ExtendedQuantumStatus",
)
24+
25+
import enum
26+
from typing import FrozenSet
27+
import pydantic
28+
29+
from lsst.daf.butler import DatasetId
30+
31+
32+
class NoWorkQuantum(BaseException):
    """Exception raised when a Quantum should not exist because there is no
    work for it to do.

    The usual causes are a non-optional input dataset that turns out not to
    be present, or a spatiotemporal overlap that was conservatively
    predicted but does not actually exist.

    Notes
    -----
    This derives from `BaseException` rather than `Exception` because it
    signals a condition we do not consider a real error, even though
    try/except logic is frequently the natural way to trap it.
    """
45+
46+
class RepeatableQuantumError(RuntimeError):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a repeatable problem that will not be
    addressed by retries.

    This usually indicates that the algorithm and the data it has been given
    are somehow incompatible, and the task should run fine on most other data.

    This exception may be used as a base class for more specific exceptions,
    or used directly while chaining another exception, e.g.::

        try:
            run_code()
        except SomeOtherError as err:
            raise RepeatableQuantumError() from err

    This may be used for missing input data when the desired behavior is to
    cause all downstream tasks being run be blocked, forcing the user to
    address the problem.  When the desired behavior is to skip this quantum
    and attempt downstream tasks (or skip them) without its outputs, raise
    `NoWorkQuantum` instead.  When the desired behavior is to write only some
    outputs, the task should exit as usual and will be considered a success.
    """
70+
71+
class InvalidQuantumError(Exception):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a logic bug or configuration problem.

    This usually indicates that the configured algorithm itself is invalid and
    will not run on a significant fraction of quanta (often all of them).

    This exception may be used as a base class for more specific exceptions,
    or used directly while chaining another exception, e.g.::

        try:
            run_code()
        except SomeOtherError as err:
            raise InvalidQuantumError() from err

    Raising this exception in `PipelineTask.runQuantum` or something it calls
    is a last resort - whenever possible, such problems should cause exceptions
    in ``__init__`` or in QuantumGraph generation.  It should never be used
    for missing data.
    """
91+
92+
93+
class QuantumStatusCategory(enum.Enum):
    """Category describing the qualitative execution status of a quantum."""

    SUCCEEDED = enum.auto()
    """Quantum ran to completion.

    This usually means at least some predicted outputs were actually produced,
    but does not guarantee it unless
    `PipelineTaskConnections.hasPostWriteLogic` returns `False`.
    """

    NO_WORK_FOUND = enum.auto()
    """Quantum was run but found it had no work to do, and produced no outputs
    (other than metadata).

    Rerunning a task that had this status will change the result only if its
    `~ExtendedQuantumStatus.available_inputs` change.  This status may be
    invoked by a `PipelineTask` by raising `NoWorkQuantum`.
    """

    NO_WORK_SKIPPED = enum.auto()
    """Quantum was not run by the execution harness, because at least one
    required input was predicted but not actually produced by an upstream task.

    Tasks with this state should have metadata written directly by the
    execution harness, and should never be rerun unless its
    `~ExtendedQuantumStatus.available_inputs` change such that all required
    inputs are now available.
    """

    INTERRUPTED = enum.auto()
    """Quantum caught an external signal indicating it should stop execution,
    and then shut down cleanly.

    This state should never be set if all predicted outputs were produced and
    `PipelineTaskConnections.hasPostWriteLogic` returns `False`; execution
    harnesses should record this as a success even if a last-moment
    interruption attempt was detected.
    """

    FAILED_EXCEPTION = enum.auto()
    """Quantum raised a Python exception that was caught by the execution
    harness.

    This state does not attempt to distinguish between repeatable problems
    and transient ones; rerunning a quantum with this status may or may not
    yield a different result.
    """

    FAILED_UNKNOWN = enum.auto()
    """Quantum failed for an unknown reason.

    This state does not attempt to distinguish between repeatable problems
    and transient ones; rerunning a quantum with this status may or may not
    yield a different result.

    This state cannot usually be set by Python execution harnesses that run
    in the same process as the task code, but it may be set by higher-level
    systems in the case of e.g. segfaults, and should be assumed in cases where
    the file or dataset that would normally contain status information isn't
    present at all.
    """

    FAILED_REPEATABLE = enum.auto()
    """Quantum failed due to a problem that the task was able to recognize as
    non-transient and highly likely to affect any attempt to rerun this
    quantum.

    This status can be invoked by a `PipelineTask` by raising
    `RepeatableQuantumError`.
    """

    FAILED_INVALID = enum.auto()
    """Quantum failed because of a configuration problem or task logic bug that
    must be fixed by the user.

    Execution harnesses may shut down entire runs if this status is detected in
    any quantum.

    This should be set if a task failure (not an interruption) occurs after all
    predicted outputs have been produced and
    `PipelineTaskConnections.hasPostWriteLogic` returns `False`, as this
    indicates that this method has been implemented incorrectly.

    This status can be invoked by a `PipelineTask` by raising
    `InvalidQuantumError`.
    """

    @property
    def can_retry(self) -> bool:
        """Whether rerunning a quantum with this status could plausibly
        change the outcome (`bool`)."""
        return self is self.FAILED_EXCEPTION or self is self.INTERRUPTED

    @property
    def is_no_work(self) -> bool:
        """Whether this status indicates the quantum had no work to do
        (`bool`)."""
        return self is self.NO_WORK_FOUND or self is self.NO_WORK_SKIPPED

    @property
    def is_success(self) -> bool:
        """Whether this status counts as a success, including no-work
        categories (`bool`)."""
        # Fixed: the original wrote ``self is self.is_no_work``, an identity
        # comparison between an enum member and a bool that is always False,
        # so NO_WORK_* categories were incorrectly reported as non-successes.
        return self is self.SUCCEEDED or self.is_no_work

    @property
    def is_failure(self) -> bool:
        """Whether this status counts as a failure of any kind (`bool`)."""
        return (
            self is self.FAILED_EXCEPTION
            or self is self.FAILED_UNKNOWN
            or self is self.FAILED_REPEATABLE
            or self is self.FAILED_INVALID
        )
199+
200+
201+
class ExtendedQuantumStatus(pydantic.BaseModel):
    """Struct used to record the state of a quantum that has been run.
    """

    category: QuantumStatusCategory
    """Category describing the qualitative execution status for this quantum.
    """

    available_inputs: FrozenSet[DatasetId] = frozenset()
    """IDs of all input datasets that were actually available to the task
    at execution time.

    This may differ from the predicted inputs by removal of datasets that
    were not actually produced by upstream tasks.

    This field will be set for all quanta for which provenance is successfully
    written, regardless of status category.
    """

    actual_inputs: FrozenSet[DatasetId] = frozenset()
    """IDs of all input datasets actually used by the task.

    Any dataset that can affect the output of the algorithm should be included.
    For example, if a dataset is ultimately identified as some kind of outlier,
    but was itself used in the determination of whether other datasets were or
    were not outliers, it is still considered an actual input.

    If a `PipelineTask` never reads a dataset at all, it will automatically be
    removed from `actual_inputs`.  It may also explicitly call
    `ButlerQuantumContext.makeInputUnused`.
    """

    actual_outputs: FrozenSet[DatasetId] = frozenset()
    """IDs of all output datasets actually produced by this task.

    This is set automatically by calls to `ButlerQuantumContext.put`;
    `PipelineTask` authors should not have to do anything manually.
    """

    #
    # Notably missing:
    #
    # - Quantum identifiers.  I'd like to wait for DM-30266, and then we need
    #   to think about how much we want to normalize/denormalize predicted and
    #   executed quantum information.
    #
    # - Exception objects.  These look like a pain to serialize well, but
    #   doing it well seems really valuable.  Maybe
    #   https://github.com/ionelmc/python-tblib?
    #
    # - Host information and resource usage.  Just haven't gotten around to it,
    #   and I bet other people have schemas I should just copy from.
    #

0 commit comments

Comments
 (0)