Skip to content

Commit b58ebe8

Browse files
committed
created copy of SyncUploader in manifest.py
1 parent e2e3004 commit b58ebe8

3 files changed

Lines changed: 263 additions & 8 deletions

File tree

synapseclient/models/mixins/storable_container.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@
4747
from synapseclient.models.protocols.storable_container_protocol import (
4848
StorableContainerSynchronousProtocol,
4949
)
50-
from synapseclient.models.services.manifest import read_manifest_for_upload
50+
from synapseclient.models.services.manifest import (
51+
SyncUploader,
52+
read_manifest_for_upload,
53+
)
5154
from synapseclient.models.services.storable_entity_components import (
5255
MANIFEST_UPLOAD_MAX_RETRIES,
5356
FailureStrategy,
@@ -577,7 +580,6 @@ async def main():
577580
from tqdm import tqdm
578581

579582
from synapseutils.monitor import notify_me_async
580-
from synapseutils.sync import _SyncUploader
581583

582584
syn = Synapse.get_client(synapse_client=synapse_client)
583585

@@ -609,7 +611,7 @@ async def main():
609611
)
610612
with upload_shared_progress_bar(progress_bar):
611613
try:
612-
uploader = _SyncUploader(syn)
614+
uploader = SyncUploader(syn)
613615
if send_messages:
614616
notify_decorator = notify_me_async(
615617
syn, f"Upload from {manifest_path}", retries=retries

synapseclient/models/services/manifest.py

Lines changed: 240 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datetime
88
import os
99
import re
10+
from dataclasses import dataclass
1011
from typing import TYPE_CHECKING, Iterable, NamedTuple
1112

1213
from synapseclient import Synapse
@@ -18,6 +19,7 @@
1819
from synapseclient.core.utils import (
1920
bool_or_none,
2021
datetime_or_none,
22+
get_synid_and_version,
2123
is_synapse_id_str,
2224
is_url,
2325
test_import_pandas,
@@ -29,6 +31,7 @@
2931
if TYPE_CHECKING:
3032
from pandas import DataFrame, Series
3133

34+
from synapseclient.models import UsedEntity, UsedURL
3235
from synapseclient.models.file import File
3336

3437
#: Scalar types that Synapse supports as annotation values.
@@ -74,11 +77,6 @@
7477
_FILE_NAME_PATTERN = re.compile(r"^[`\w \-\+\.\(\)]{1,256}$")
7578

7679

77-
def _expand_path(path: str) -> str:
78-
"""Expand ``~`` and environment variables, then return the absolute path."""
79-
return os.path.abspath(os.path.expandvars(os.path.expanduser(path)))
80-
81-
8280
class SyncUploadItem(NamedTuple):
8381
"""Represents a single file being uploaded.
8482
@@ -98,6 +96,238 @@ class SyncUploadItem(NamedTuple):
9896
activity_description: str | None
9997

10098

99+
@dataclass
100+
class SyncUploader:
101+
"""Manages the uploads associated with a sync_to_synapse call.
102+
103+
Files will be uploaded concurrently and in an order that honours any
104+
interdependent provenance.
105+
"""
106+
107+
syn: Synapse
108+
109+
@dataclass
110+
class DependencyGraph:
111+
"""The graph that represents the dependencies of the files to be uploaded.
112+
113+
Attributes:
114+
path_to_dependencies: A dictionary where the key is the path of the file and
115+
the value is a list of paths that need to be uploaded before the key can
116+
be uploaded.
117+
path_to_upload_item: A dictionary where the key is the path of the file and
118+
the value is the upload item that is associated with the file.
119+
path_to_file_check: A dictionary where the key is the path of the file and
120+
the value is a boolean that represents if the file is a file or not.
121+
"""
122+
123+
path_to_dependencies: dict[str, list[str]]
124+
path_to_upload_item: dict[str, SyncUploadItem]
125+
path_to_file_check: dict[str, bool]
126+
127+
def _build_dependency_graph(
128+
self, items: Iterable[SyncUploadItem]
129+
) -> DependencyGraph:
130+
"""Determine the order in which the files should be uploaded based on their
131+
dependencies. This will also verify that the dependencies are valid and that
132+
there are no cycles in the graph.
133+
134+
Arguments:
135+
items: The list of items to upload.
136+
137+
Return:
138+
A graph that represents information about how to upload the graph of items
139+
into Synapse.
140+
"""
141+
items_by_path = {i.entity.path: i for i in items}
142+
graph: dict[str, list[str]] = {}
143+
resolved_file_checks: dict[str, bool] = {}
144+
145+
for item in items:
146+
item_file_provenance: list[str] = []
147+
for provenance_dependency in item.used + item.executed:
148+
# File objects (already in Synapse) are not local-path
149+
# dependencies — skip them in the dependency graph.
150+
if not isinstance(provenance_dependency, str):
151+
continue
152+
if provenance_dependency in resolved_file_checks:
153+
is_file = resolved_file_checks[provenance_dependency]
154+
else:
155+
is_file = os.path.isfile(provenance_dependency)
156+
resolved_file_checks[provenance_dependency] = is_file
157+
if is_file:
158+
if provenance_dependency not in items_by_path:
159+
raise ValueError(
160+
f"{item.entity.path} depends on"
161+
f" {provenance_dependency} which is not being uploaded"
162+
)
163+
item_file_provenance.append(provenance_dependency)
164+
165+
graph[item.entity.path] = item_file_provenance
166+
167+
graph_sorted = topolgical_sort(graph)
168+
path_to_dependencies_sorted: dict[str, list[str]] = {}
169+
path_to_upload_items_sorted: dict[str, SyncUploadItem] = {}
170+
for path, dependency_paths in graph_sorted:
171+
path_to_dependencies_sorted[path] = dependency_paths
172+
path_to_upload_items_sorted[path] = items_by_path.get(path)
173+
174+
return self.DependencyGraph(
175+
path_to_dependencies=path_to_dependencies_sorted,
176+
path_to_upload_item=path_to_upload_items_sorted,
177+
path_to_file_check=resolved_file_checks,
178+
)
179+
180+
def _build_tasks_from_dependency_graph(
181+
self, dependency_graph: DependencyGraph
182+
) -> list[asyncio.Task]:
183+
"""Build the asyncio tasks that will be used to upload the files in the correct
184+
order based on their dependencies.
185+
186+
Arguments:
187+
dependency_graph: The graph that represents the dependencies of the files to
188+
be uploaded.
189+
190+
Return:
191+
A list of asyncio tasks that will upload the files in the correct order.
192+
"""
193+
created_tasks_by_path: dict[str, asyncio.Task] = {}
194+
195+
for (
196+
file_path,
197+
dependent_file_paths,
198+
) in dependency_graph.path_to_dependencies.items():
199+
dependent_tasks: list[asyncio.Task] = []
200+
for dependent_file in dependent_file_paths:
201+
task = created_tasks_by_path.get(dependent_file)
202+
if task is not None:
203+
dependent_tasks.append(task)
204+
205+
upload_item = dependency_graph.path_to_upload_item.get(file_path)
206+
file_task = asyncio.create_task(
207+
self._upload_item_async(
208+
item=upload_item.entity,
209+
used=upload_item.used,
210+
executed=upload_item.executed,
211+
activity_name=upload_item.activity_name,
212+
activity_description=upload_item.activity_description,
213+
dependent_futures=dependent_tasks,
214+
)
215+
)
216+
created_tasks_by_path[file_path] = file_task
217+
218+
return list(created_tasks_by_path.values())
219+
220+
async def upload(self, items: Iterable[SyncUploadItem]) -> list[File]:
221+
"""Upload a number of files to Synapse as provided in the manifest file. This
222+
will handle ordering the files based on their dependency graph.
223+
224+
Arguments:
225+
items: The list of items to upload.
226+
227+
Returns:
228+
List of File entities that were created or updated, in the same
229+
order as the dependency-graph task execution.
230+
"""
231+
dependency_graph = self._build_dependency_graph(items=list(items))
232+
tasks = self._build_tasks_from_dependency_graph(
233+
dependency_graph=dependency_graph
234+
)
235+
results = await asyncio.gather(*tasks)
236+
return list(results)
237+
238+
def _build_activity_linkage(
239+
self,
240+
used_or_executed: Iterable[str | File],
241+
resolved_file_ids: dict[str, str],
242+
) -> list[UsedEntity | UsedURL]:
243+
"""Loop over the incoming list of used or executed items and build the
244+
appropriate UsedEntity or UsedURL objects.
245+
246+
Arguments:
247+
used_or_executed: The list of used or executed items.
248+
resolved_file_ids: A dictionary that maps the path of a file to its Synapse
249+
ID.
250+
251+
Returns:
252+
A list of UsedEntity or UsedURL objects.
253+
"""
254+
from synapseclient.models import UsedEntity, UsedURL
255+
256+
returned_linkage: list[UsedEntity | UsedURL] = []
257+
for item in used_or_executed:
258+
if not isinstance(item, str):
259+
# item is a File object resolved from provenance — use its ID
260+
returned_linkage.append(UsedEntity(target_id=item.id))
261+
continue
262+
resolved_file_id = resolved_file_ids.get(item, None)
263+
if resolved_file_id:
264+
returned_linkage.append(UsedEntity(target_id=resolved_file_id))
265+
elif is_url(item):
266+
returned_linkage.append(UsedURL(url=item))
267+
else:
268+
if not is_synapse_id_str(item):
269+
raise ValueError(f"{item} is not a valid Synapse id")
270+
synid, version = get_synid_and_version(item)
271+
target_version = int(version) if version else None
272+
returned_linkage.append(
273+
UsedEntity(target_id=synid, target_version_number=target_version)
274+
)
275+
return returned_linkage
276+
277+
async def _upload_item_async(
278+
self,
279+
item: File,
280+
used: Iterable[str | File],
281+
executed: Iterable[str | File],
282+
activity_name: str,
283+
activity_description: str,
284+
dependent_futures: list[asyncio.Future],
285+
) -> File:
286+
"""Upload a single file, waiting for any provenance dependencies to finish first.
287+
288+
Arguments:
289+
item: The File entity to upload.
290+
used: Provenance ``used`` references (paths, URLs, Synapse IDs, or File
291+
objects).
292+
executed: Provenance ``executed`` references.
293+
activity_name: Name for the provenance Activity.
294+
activity_description: Description for the provenance Activity.
295+
dependent_futures: Futures for files that must be uploaded before this one.
296+
297+
Returns:
298+
The stored File entity.
299+
"""
300+
from synapseclient.models import Activity
301+
302+
resolved_file_ids: dict[str, str] = {}
303+
if dependent_futures:
304+
finished_dependencies, pending = await asyncio.wait(dependent_futures)
305+
if pending:
306+
raise RuntimeError(
307+
f"There were {len(pending)} dependencies left when storing {item}"
308+
)
309+
for finished_dependency in finished_dependencies:
310+
result = finished_dependency.result()
311+
resolved_file_ids[result.path] = result.id
312+
313+
used_activity = self._build_activity_linkage(
314+
used_or_executed=used, resolved_file_ids=resolved_file_ids
315+
)
316+
executed_activity = self._build_activity_linkage(
317+
used_or_executed=executed, resolved_file_ids=resolved_file_ids
318+
)
319+
320+
if used_activity or executed_activity:
321+
item.activity = Activity(
322+
name=activity_name,
323+
description=activity_description,
324+
used=used_activity,
325+
executed=executed_activity,
326+
)
327+
await item.store_async(synapse_client=self.syn)
328+
return item
329+
330+
101331
async def read_manifest_for_upload(
102332
manifest_path: str,
103333
syn: Synapse,
@@ -260,6 +490,11 @@ def _check_path_and_normalize(f: str) -> str:
260490
return path_normalized
261491

262492

493+
def _expand_path(path: str) -> str:
494+
"""Expand ``~`` and environment variables, then return the absolute path."""
495+
return os.path.abspath(os.path.expandvars(os.path.expanduser(path)))
496+
497+
263498
def _check_file_names(df: DataFrame) -> None:
264499
"""Validate that each file name is acceptable for Synapse and that all
265500
(name, parentId) pairs are unique.

synapseutils/sync.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,12 @@ async def _sync(
364364
return root_entity
365365

366366

367+
@deprecated(
368+
version="4.13.0",
369+
reason=(
370+
"To be removed in 5.0.0. " "Moved to synapseclient.models.services.manifest.py"
371+
),
372+
)
367373
class _SyncUploadItem(NamedTuple):
368374
"""Represents a single file being uploaded.
369375
@@ -383,6 +389,12 @@ class _SyncUploadItem(NamedTuple):
383389
activity_description: str
384390

385391

392+
@deprecated(
393+
version="4.13.0",
394+
reason=(
395+
"To be removed in 5.0.0. " "Moved to synapseclient.models.services.manifest.py"
396+
),
397+
)
386398
@dataclass
387399
class _SyncUploader:
388400
"""
@@ -721,6 +733,12 @@ def _get_file_entity_provenance_dict(syn, entity):
721733
raise # unexpected error so we re-raise the exception
722734

723735

736+
@deprecated(
737+
version="4.13.0",
738+
reason=(
739+
"To be removed in 5.0.0. " "Moved to synapseclient.models.services.manifest.py"
740+
),
741+
)
724742
def generate_manifest(all_files: List[File], path: str) -> None:
725743
"""Generates a manifest file based on a list of entities objects.
726744

0 commit comments

Comments
 (0)