Skip to content

Commit f4ce24d

Browse files
committed
made functions more readable
1 parent 2a47dfa commit f4ce24d

1 file changed

Lines changed: 82 additions & 41 deletions

File tree

synapseclient/models/services/manifest.py

Lines changed: 82 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -865,7 +865,7 @@ def _create_upload_tasks(
865865
1. Collects the already-created asyncio.Task objects for its
866866
prerequisites -- these are guaranteed to exist because of the
867867
topological ordering.
868-
2. Creates a new asyncio.Task wrapping _upload_item_async, passing
868+
2. Creates a new asyncio.Task wrapping _upload_file_async, passing
869869
the prerequisite tasks in. That function calls asyncio.wait() on
870870
them before uploading, so the file will not start uploading until
871871
its dependencies finish.
@@ -900,8 +900,8 @@ def _create_upload_tasks(
900900

901901
upload_item = upload_plan.path_to_upload_item[file_path]
902902
file_task = asyncio.create_task(
903-
_upload_item_async(
904-
item=upload_item.entity,
903+
_upload_file_async(
904+
file_entity=upload_item.entity,
905905
used=upload_item.used,
906906
executed=upload_item.executed,
907907
activity_name=upload_item.activity_name,
@@ -935,7 +935,7 @@ def _build_activity_linkage(
935935
version, returned as UsedEntity).
936936
- If none match, raises ValueError.
937937
938-
The resolved_file_ids dict is built by _upload_item_async after
938+
The resolved_file_ids dict is built by _upload_file_async after
939939
prerequisite uploads finish, mapping each uploaded file's local path to
940940
the Synapse ID it received. This is how provenance references between
941941
files in the same manifest batch get wired up: file A uploads first and
@@ -947,7 +947,7 @@ def _build_activity_linkage(
947947
a File object (already resolved from provenance), or a string that is
948948
a local file path, URL, or Synapse ID.
949949
resolved_file_ids: A dictionary that maps the local path of a file to the
950-
Synapse ID it received after upload. Populated by _upload_item_async
950+
Synapse ID it received after upload. Populated by _upload_file_async
951951
once prerequisite uploads complete.
952952
953953
Returns:
@@ -1005,8 +1005,8 @@ def _resolve_linkage_item(
10051005
)
10061006

10071007

1008-
async def _upload_item_async(
1009-
item: File,
1008+
async def _upload_file_async(
1009+
file_entity: File,
10101010
used: Iterable[str | File],
10111011
executed: Iterable[str | File],
10121012
activity_name: str,
@@ -1016,8 +1016,31 @@ async def _upload_item_async(
10161016
) -> File:
10171017
"""Upload a single file, waiting for any provenance dependencies to finish first.
10181018
1019+
This function is invoked as an asyncio.Task by _create_upload_tasks. Many
1020+
instances run concurrently, but each one self-serializes by awaiting only
1021+
its specific prerequisites. Files with no dependencies start uploading
1022+
immediately in parallel.
1023+
1024+
The flow:
1025+
1026+
1. Wait for prerequisites -- if this file declares provenance on other
1027+
files in the same manifest batch (e.g. "file B was derived from
1028+
file A"), those files must be uploaded first so they have Synapse IDs.
1029+
asyncio.wait blocks until all prerequisite upload tasks finish, then
1030+
a path-to-Synapse-ID mapping is collected from their results.
1031+
2. Build provenance linkages -- converts the raw used and executed
1032+
references (local paths, URLs, Synapse IDs, or File objects) into
1033+
typed UsedEntity/UsedURL objects. Local paths are resolved to Synapse
1034+
IDs using the mapping from step 1.
1035+
3. Attach Activity -- if any provenance references exist, creates an
1036+
Activity with the name, description, and linkages, and attaches it
1037+
to the file.
1038+
4. Store -- calls file_entity.store_async() to perform the actual upload.
1039+
5. Return -- the returned File (now with a Synapse ID) becomes available
1040+
to downstream tasks that depend on it via the resolved mapping.
1041+
10191042
Arguments:
1020-
item: The File entity to upload.
1043+
file_entity: The File entity to upload.
10211044
used: Provenance used references (paths, URLs, Synapse IDs, or File
10221045
objects).
10231046
executed: Provenance executed references.
@@ -1036,35 +1059,45 @@ async def _upload_item_async(
10361059
"""
10371060
from synapseclient.models import Activity
10381061

1062+
# Step 1: Wait for prerequisite uploads to finish and collect their
1063+
# Synapse IDs so provenance references can point to them.
10391064
resolved_file_ids: dict[str, str] = {}
10401065
if prerequisite_tasks:
10411066
finished_dependencies, pending = await asyncio.wait(
10421067
prerequisite_tasks, return_when=asyncio.ALL_COMPLETED
10431068
)
1069+
# Defensive check: ALL_COMPLETED guarantees pending is empty, but
1070+
# guard against unexpected asyncio behavior or future refactors.
10441071
if pending:
10451072
raise RuntimeError(
1046-
f"There were {len(pending)} dependencies left when storing {item}"
1073+
f"There were {len(pending)} dependencies left when storing {file_entity}"
10471074
)
10481075
for finished_dependency in finished_dependencies:
1049-
result = finished_dependency.result()
1076+
result: File = finished_dependency.result()
10501077
resolved_file_ids[result.path] = result.id
10511078

1079+
# Step 2: Convert raw provenance references (local paths, URLs, Synapse
1080+
# IDs, File objects) into typed UsedEntity/UsedURL objects. Local paths
1081+
# are resolved to Synapse IDs using the mapping built in step 1.
10521082
used_activity = _build_activity_linkage(
10531083
used_or_executed=used, resolved_file_ids=resolved_file_ids
10541084
)
10551085
executed_activity = _build_activity_linkage(
10561086
used_or_executed=executed, resolved_file_ids=resolved_file_ids
10571087
)
10581088

1089+
# Step 3: Attach an Activity to the file if provenance was declared.
10591090
if used_activity or executed_activity:
1060-
item.activity = Activity(
1091+
file_entity.activity = Activity(
10611092
name=activity_name,
10621093
description=activity_description,
10631094
used=used_activity,
10641095
executed=executed_activity,
10651096
)
1066-
await item.store_async(synapse_client=syn)
1067-
return item
1097+
1098+
# Step 4: Upload and return the file (now with a Synapse ID).
1099+
await file_entity.store_async(synapse_client=syn)
1100+
return file_entity
10681101

10691102

10701103
def _split_csv_cell(input_string: str) -> list[str]:
@@ -1142,24 +1175,20 @@ async def _resolve_provenance_column(
11421175
if isinstance(cell, list):
11431176
items = cell
11441177
else:
1145-
# cell is guaranteed to be a str here; .strip() is safe.
11461178
items = list(cell.split(";")) if cell.strip() != "" else []
11471179

1148-
return [
1149-
r
1150-
for r in await asyncio.gather(
1151-
*[
1152-
_resolve_provenance_item(
1153-
item.strip() if isinstance(item, str) else item,
1154-
owner_path=path,
1155-
syn=syn,
1156-
df=df,
1157-
)
1158-
for item in items
1159-
]
1160-
)
1161-
if r is not None
1162-
]
1180+
resolved = await asyncio.gather(
1181+
*[
1182+
_resolve_provenance_item(
1183+
item.strip() if isinstance(item, str) else item,
1184+
owner_path=path,
1185+
syn=syn,
1186+
df=df,
1187+
)
1188+
for item in items
1189+
]
1190+
)
1191+
return [item for item in resolved if item is not None]
11631192

11641193

11651194
async def _resolve_provenance_item(
@@ -1198,19 +1227,31 @@ async def _resolve_provenance_item(
11981227

11991228

12001229
async def _resolve_local_file_provenance(
1201-
item: str, owner_path: str, syn: Synapse, df: DataFrame
1230+
raw_path: str, owner_path: str, syn: Synapse, manifest_by_path: DataFrame
12021231
) -> str | File:
12031232
"""Resolve a local file path to either an in-batch path or a Synapse File.
12041233
1205-
If the file is part of the current upload batch (present in df.index),
1206-
returns its absolute path. Otherwise, looks it up in Synapse by MD5 hash.
1234+
Given a manifest row that declares provenance on a local file, this
1235+
function determines where that file is:
1236+
1237+
1. If the file does not exist on disk, the provenance reference is
1238+
broken and a SynapseProvenanceError is raised.
1239+
2. If the file is in the current upload batch (present in
1240+
manifest_by_path.index), its absolute path is returned as a string.
1241+
The Synapse ID will be resolved later, after that file is uploaded.
1242+
3. If the file exists on disk but is not being uploaded, it is looked
1243+
up in Synapse by MD5 hash. If found, the File object is returned
1244+
so its Synapse ID can be used for provenance. If not found, a
1245+
SynapseProvenanceError is raised because the reference cannot be
1246+
linked.
12071247
12081248
Arguments:
1209-
item: A local file path string from a provenance cell.
1249+
raw_path: A local file path string from a provenance cell, not yet
1250+
expanded or normalized.
12101251
owner_path: The manifest file path of the file that declares this
12111252
provenance reference. Used only for error messages.
12121253
syn: Authenticated Synapse client.
1213-
df: The manifest DataFrame (path-indexed).
1254+
manifest_by_path: The manifest DataFrame indexed by absolute file path.
12141255
12151256
Returns:
12161257
str — the absolute path if the file is in the upload batch.
@@ -1222,21 +1263,21 @@ async def _resolve_local_file_provenance(
12221263
"""
12231264
from synapseclient.models.file import File
12241265

1225-
item_path = _expand_path(item)
1266+
absolute_path = _expand_path(raw_path)
12261267

1227-
if not os.path.isfile(item_path):
1268+
if not os.path.isfile(absolute_path):
12281269
raise SynapseProvenanceError(
12291270
f"The provenance record for file: {owner_path} is incorrect.\n"
1230-
f"Specifically {item} is not an existing file path, a valid URL, or a Synapse ID."
1271+
f"Specifically {raw_path} is not an existing file path, a valid URL, or a Synapse ID."
12311272
)
12321273

1233-
if item_path in df.index:
1234-
return item_path
1274+
if absolute_path in manifest_by_path.index:
1275+
return absolute_path
12351276

12361277
try:
1237-
return await File.from_path_async(path=item_path, synapse_client=syn)
1278+
return await File.from_path_async(path=absolute_path, synapse_client=syn)
12381279
except SynapseFileNotFoundError as e:
12391280
raise SynapseProvenanceError(
12401281
f"The provenance record for file: {owner_path} is incorrect.\n"
1241-
f"Specifically {item_path} is not being uploaded and is not in Synapse."
1282+
f"Specifically {absolute_path} is not being uploaded and is not in Synapse."
12421283
) from e

0 commit comments

Comments
 (0)