Skip to content

Commit 87629c1

Browse files
committed
add find_required_input_source and SkipOperation
1 parent c92bc7f commit 87629c1

2 files changed

Lines changed: 36 additions & 6 deletions

File tree

examples/flywheel_analyzer_engage.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ def define_analysis(gear_name, acquisition_label, create_inputs):
6060

6161

6262
def reactivity_inputs(acquisition_label, acquisitions, session, **kwargs):
63-
functional = fa.find(acquisitions, label=acquisition_label)
63+
functional = fa.find_required_input_source(acquisitions, label=acquisition_label)
64+
# using plain find() here b/c this T1w might be missing
6465
structural = fa.find(acquisitions, label='T1w 1mm')
6566
if not structural:
6667
assert session['_id'] in second_to_first_visit_id,\
@@ -70,6 +71,7 @@ def reactivity_inputs(acquisition_label, acquisitions, session, **kwargs):
7071
first_visit_acquisitions = client.request(
7172
'sessions/{}/acquisitions'.format(first_visit_session_id)).json()
7273
structural = fa.find(first_visit_acquisitions, label='T1w 1mm')
74+
assert structural, 'Session {} is missing a structural.'.format(session['_id'])
7375

7476
return dict(
7577
functional=functional.find_file('*.nii.gz'),
@@ -78,7 +80,7 @@ def reactivity_inputs(acquisition_label, acquisitions, session, **kwargs):
7880

7981

8082
def connectivity_inputs(acquisition_label, analyses, **kwargs):
81-
reactivity = fa.find(
83+
reactivity = fa.find_required_input_source(
8284
analyses, label=analysis_label('reactivity-preprocessing', acquisition_label))
8385

8486
return dict(
@@ -89,11 +91,11 @@ def connectivity_inputs(acquisition_label, analyses, **kwargs):
8991

9092

9193
def first_level_model_inputs(acquisition_label, analyses, acquisitions, **kwargs):
92-
reactivity = fa.find(
94+
reactivity = fa.find_required_input_source(
9395
analyses, label=analysis_label('reactivity-preprocessing', acquisition_label))
94-
connectivity = fa.find(
96+
connectivity = fa.find_required_input_source(
9597
analyses, label=analysis_label('connectivity-preprocessing', acquisition_label))
96-
behavioral = fa.find(
98+
behavioral = fa.find_required_input_source(
9799
acquisitions, label='Behavioral and Physiological')
98100

99101
return dict(

scitran_client/flywheel_analyzer.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,16 @@ def find(items, _constructor_=FlywheelFileContainer, **kwargs):
117117
return result and _constructor_(result)
118118

119119

120+
def find_required_input_source(items, **kwargs):
121+
'''Finds a match to `kwargs` in `items` by using `find()`. If this match is not
122+
found, the current operation will be skipped.
123+
'''
124+
result = find(items, **kwargs)
125+
if not result:
126+
raise SkipOperation('could not find match to {}'.format(kwargs))
127+
return result
128+
129+
120130
def find_project(**kwargs):
121131
'''Finds a project that matches the key, value pairs in `kwargs`.
122132
@@ -130,6 +140,19 @@ class ShuttingDownException(Exception):
130140
shutting_down = False
131141

132142

143+
class SkipOperation(Exception):
144+
'''
145+
SkipOperation can be thrown from a `create_inputs` function to skip the execution of that
146+
operation. This is a way to more dynamically create operation graphs by discarding nodes
147+
at runtime.
148+
149+
For example, if every session has a variable number of functional acquisitions that need to be
150+
processed, you can define operations for the max number of per-session functional acquisitions,
151+
and throw SkipOperation for all operations corresponding to acquisitions missing for a session.
152+
'''
153+
pass
154+
155+
133156
def request(*args, **kwargs):
134157
# HACK client is a module variable for now. In the future, we should pass client around.
135158
assert 'client' in state, 'client must be installed in state before using request. See `installed_client`.'
@@ -208,7 +231,12 @@ def _analyze_session(operations, gears_by_name, session):
208231
# have completed analysis
209232
if not acquisitions:
210233
acquisitions = request('sessions/{}/acquisitions'.format(session_id))
211-
job_inputs = create_inputs(analyses=analyses, acquisitions=acquisitions, session=session)
234+
try:
235+
job_inputs = create_inputs(analyses=analyses, acquisitions=acquisitions, session=session)
236+
except SkipOperation:
237+
# we skip to the next operation
238+
continue
239+
212240
job_config = _defaults_for_gear(gears_by_name[gear_name])
213241

214242
# When create_inputs returns a tuple, we unpack it into job_inputs and job_config.

0 commit comments

Comments
 (0)