Skip to content

Commit dd5d998

Browse files
committed
replica sort f unction
1 parent b8ffb42 commit dd5d998

2 files changed

Lines changed: 19 additions & 14 deletions

File tree

irods/data_object.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def _DEFAULT_SORT_KEY_FN(row):
5757

5858

5959
class iRODSDataObject:
60-
def __init__(self, manager, parent=None, results=None):
60+
def __init__(self, manager, parent=None, results=None, replica_sort_function=None):
6161
self.manager = manager
6262
if parent and results:
6363
self.collection = parent
@@ -69,7 +69,7 @@ def __init__(self, manager, parent=None, results=None):
6969
# backward compatibility with older schema versions
7070
pass
7171
self.path = self.collection.path + "/" + self.name
72-
replicas = sorted(results, key=_DEFAULT_SORT_KEY_FN)
72+
replicas = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN))
7373

7474
# The status quo before iRODS 5
7575

irods/manager/data_object_manager.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,12 @@ def _download(self, obj, local_path, num_threads, updatables=(), **options):
233233
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
234234

235235
data_open_returned_values_ = {}
236-
with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o:
236+
with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o:
237237
if self.should_parallelize_transfer(num_threads, o, open_options=options.items()):
238238
error = RuntimeError("parallel get failed")
239239
try:
240240
if not self.parallel_get(
241-
(obj, o),
241+
(obj_path, o),
242242
local_file,
243243
num_threads=num_threads,
244244
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
@@ -265,6 +265,8 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
265265
"""
266266
parent = self.sess.collections.get(irods_dirname(path))
267267

268+
replica_sort_function = options.pop('replica_sort_function', None)
269+
268270
# TODO: optimize
269271
if local_path:
270272
self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options)
@@ -284,7 +286,7 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
284286
results = query.all() # get up to max_rows replicas
285287
if len(results) <= 0:
286288
raise ex.DataObjectDoesNotExist()
287-
return iRODSDataObject(self, parent, results)
289+
return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function)
288290

289291
@staticmethod
290292
def _resolve_force_put_option(options, default_setting=None, true_value=""):
@@ -317,23 +319,25 @@ def put(
317319
self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default)
318320

319321
if self.sess.collections.exists(irods_path):
320-
obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
322+
obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
321323
else:
322-
obj = irods_path
323-
if kw.FORCE_FLAG_KW not in options and self.exists(obj):
324+
obj_path = irods_path
325+
if kw.FORCE_FLAG_KW not in options and self.exists(obj_path):
324326
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
325327
options.pop(kw.FORCE_FLAG_KW, None)
326328

329+
replica_sort_function = options.pop('replica_sort_function',None)
330+
327331
with open(local_path, "rb") as f:
328332
sizelist = []
329333
if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options):
330-
o = deferred_call(self.open, (obj, "w"), options)
334+
o = deferred_call(self.open, (obj_path, "w"), options)
331335
f.close()
332336
error = RuntimeError("parallel put failed")
333337
try:
334338
if not self.parallel_put(
335339
local_path,
336-
(obj, o),
340+
(obj_path, o),
337341
total_bytes=sizelist[0],
338342
num_threads=num_threads,
339343
target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""),
@@ -346,7 +350,7 @@ def put(
346350
except BaseException as e:
347351
raise error from e
348352
else:
349-
with self.open(obj, "w", **options) as o:
353+
with self.open(obj_path, "w", **options) as o:
350354
# Set operation type to trigger acPostProcForPut
351355
if kw.OPR_TYPE_KW not in options:
352356
options[kw.OPR_TYPE_KW] = 1 # PUT_OPR
@@ -360,10 +364,10 @@ def put(
360364
# Requested to register checksum without verifying, but source replica has a checksum. This can result
361365
# in multiple replicas being marked good with different checksums, which is an inconsistency.
362366
del repl_options[kw.REG_CHKSUM_KW]
363-
self.replicate(obj, **repl_options)
367+
self.replicate(obj_path, **repl_options)
364368

365369
if return_data_object:
366-
return self.get(obj)
370+
return self.get(obj_path, replica_sort_function=replica_sort_function)
367371

368372
def chksum(self, path, **options):
369373
"""
@@ -480,6 +484,7 @@ def create(
480484
raise ex.DataObjectExistsAtLogicalPath
481485

482486
options = {**options, kw.DATA_TYPE_KW: "generic"}
487+
replica_sort_function = options.pop('replica_sort_function',None)
483488

484489
if resource:
485490
options[kw.DEST_RESC_NAME_KW] = resource
@@ -508,7 +513,7 @@ def create(
508513
desc = response.int_info
509514
conn.close_file(desc)
510515

511-
return self.get(path)
516+
return self.get(path, replica_sort_function=replica_sort_function)
512517

513518
def open_with_FileRaw(self, *arg, **kw_options):
514519
holder = []

0 commit comments

Comments
 (0)