Skip to content

Commit af16b87

Browse files
committed
Start propagating "no work to do" in adjustQuantum.
This will allow code in ctrl_mpexec to delegate to adjustQuantum before calling runQuantum, to test whether running is necessary in the exact same way it is tested at QG generation time.
1 parent e2a2d1a commit af16b87

1 file changed

Lines changed: 22 additions & 0 deletions

File tree

python/lsst/pipe/base/connections.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from . import config as configMod
3737
from .connectionTypes import (InitInput, InitOutput, Input, PrerequisiteInput,
3838
Output, BaseConnection, BaseInput)
39+
from .executed_quantum import NoWorkQuantum
3940
from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, NamedKeyMapping, Quantum
4041

4142
if typing.TYPE_CHECKING:
@@ -498,6 +499,10 @@ def adjustQuantum(
498499
Overrides of this function have the option of raising an Exception
499500
if a field in the input does not satisfy a need for a corresponding
500501
pipelineTask, i.e. no reference catalogs are found.
502+
NoWorkQuantum
503+
Raised to indicate that this quantum should not be run; one or more
504+
of its expected inputs do not exist, and if possible, should be
505+
pruned from the QuantumGraph.
501506
502507
Notes
503508
-----
@@ -512,6 +517,23 @@ def adjustQuantum(
512517
f"for scalar connection {label}.{name} ({connection.name}) "
513518
f"for quantum data ID {dataId}."
514519
)
520+
if not connection.optional and not refs:
521+
if isinstance(connection, PrerequisiteInput):
522+
# This branch should only be possible during QG generation,
523+
# or if someone deleted the dataset between making the QG
524+
# and trying to run it. Either one should be a hard error.
525+
raise FileNotFoundError(
526+
f"No datasets found for non-optional connection {label}.{name} ({connection.name}) "
527+
f"for quantum data ID {dataId}."
528+
)
529+
else:
530+
# This branch should be impossible during QG generation,
531+
# because that algorithm can only make quanta whose inputs
532+
# are either already present or should be created during
533+
# execution. It can trigger during execution if the input
534+
# wasn't actually created by an upstream task in the same
535+
# graph.
536+
raise NoWorkQuantum(label, name, connection)
515537
return ()
516538

517539
def translateAdjustQuantumInputs(

0 commit comments

Comments
 (0)