Skip to content

Commit 9170850

Browse files
committed
Sync metadata
1 parent 41780b0 commit 9170850

3 files changed

Lines changed: 75 additions & 18 deletions

File tree

dataflow_transfer/dataflow_transfer.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,11 @@ def process_run(run_dir, sequencer, config):
3434
run.start_transfer(final=False)
3535
return
3636

37+
## Sequencing finished. Copy metadata in the background if not already done.
38+
if run.has_status("sequencing_finished"):
39+
if not run.metadata_synced:
40+
run.sync_metadata()
41+
3742
## Sequencing finished but transfer not complete. Start final transfer.
3843
if not run.final_sync_successful:
3944
if run.has_status("sequencing_finished"):

dataflow_transfer/run_classes/generic_runs.py

Lines changed: 67 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,14 @@ def __init__(self, run_dir, configuration):
2121
)
2222
self.final_file = ""
2323
self.transfer_details = self.configuration.get("transfer_details", {})
24+
self.metadata_rsync_exitcode_file = os.path.join(
25+
self.run_dir, ".metadata_rsync_exitcode"
26+
)
27+
self.metadata_destination = os.path.join(
28+
self.configuration.get("metadata_archive"),
29+
getattr(self, "run_type", None),
30+
self.run_id,
31+
)
2432
self.final_rsync_exitcode_file = os.path.join(
2533
self.run_dir, ".final_rsync_exitcode"
2634
)
@@ -42,36 +50,80 @@ def sequencing_ongoing(self):
4250
return False
4351
return True
4452

45-
def generate_rsync_command(self, is_final_sync=False):
46-
"""Generate an rsync command string."""
47-
destination = (
48-
self.transfer_details.get("user")
49-
+ "@"
50-
+ self.transfer_details.get("host")
51-
+ ":"
52-
+ self.miarka_destination
53+
@property
54+
def metadata_synced(self):
55+
"""Check if the metadata rsync was successful by reading the exit code file."""
56+
return fs.check_exit_status(self.metadata_rsync_exitcode_file)
57+
58+
def sync_metadata(self):
59+
"""Start background rsync transfer for metadata files."""
60+
# make metadata destination path if it doesn't exist
61+
if not os.path.exists(self.metadata_destination):
62+
os.makedirs(self.metadata_destination)
63+
metadata_rsync_command = self.generate_rsync_command(
64+
remote=False, with_exit_code_file=True
5365
)
66+
67+
if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination):
68+
logger.info(
69+
f"Metadata rsync is already running for {self.run_dir} to destination {self.metadata_destination}. Skipping background metadata sync initiation."
70+
)
71+
return
72+
try:
73+
fs.submit_background_process(metadata_rsync_command)
74+
logger.info(
75+
f"{self.run_id}: Started metadata rsync to {self.metadata_destination}"
76+
+ f" with the following command: '{metadata_rsync_command}'"
77+
)
78+
except Exception as e:
79+
logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}")
80+
raise e
81+
82+
def generate_rsync_command(self, remote=False, with_exit_code_file=False):
83+
"""Generate an rsync command string."""
84+
if remote:
85+
destination = (
86+
self.transfer_details.get("user")
87+
+ "@"
88+
+ self.transfer_details.get("host")
89+
+ ":"
90+
+ self.miarka_destination
91+
)
92+
log_file_option = "--log-file=" + os.path.join(
93+
self.run_dir, "rsync_remote_log.txt"
94+
)
95+
rsync_options = self.sequencer_config.get("remote_rsync_options", [])
96+
exit_code_file = self.final_rsync_exitcode_file
97+
else:
98+
destination = self.metadata_destination
99+
log_file_option = "--log-file=" + os.path.join(
100+
self.run_dir, "rsync_metadata_log.txt"
101+
)
102+
rsync_options = self.sequencer_config.get("metadata_rsync_options", [])
103+
exit_code_file = self.metadata_rsync_exitcode_file
54104
run_one_bin = self.configuration.get("run_one_path", "run-one")
55105
command = [
56106
run_one_bin,
57107
"rsync",
58108
"-au",
59-
"--log-file=" + os.path.join(self.run_dir, "rsync_remote_log.txt"),
60-
*(self.sequencer_config.get("rsync_options", [])),
109+
log_file_option,
110+
*(rsync_options),
61111
self.run_dir,
62112
destination,
63113
]
64114
command_str = " ".join(command)
65-
if is_final_sync:
66-
command_str += f"; echo $? > {self.final_rsync_exitcode_file}"
115+
if with_exit_code_file:
116+
command_str += f"; echo $? > {exit_code_file}"
67117
return command_str
68118

69119
def start_transfer(self, final=False):
70120
"""Start background rsync transfer to storage."""
71-
transfer_command = self.generate_rsync_command(is_final_sync=final)
72-
if fs.rsync_is_running(src=self.run_dir):
121+
transfer_command = self.generate_rsync_command(
122+
remote=True, with_exit_code_file=final
123+
)
124+
if fs.rsync_is_running(src=self.run_dir, dst=self.miarka_destination):
73125
logger.info(
74-
f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation."
126+
f"Rsync is already running for {self.run_dir} to destination {self.miarka_destination}. Skipping background transfer initiation."
75127
)
76128
return
77129
try:

dataflow_transfer/utils/filesystem.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,9 +28,9 @@ def find_runs(base_dir, ignore_folders=[]):
2828
return runs
2929

3030

31-
def rsync_is_running(src):
32-
"""Check if rsync is already running for given src."""
33-
pattern = f"rsync.*{src}"
31+
def rsync_is_running(src, dst):
32+
"""Check if rsync is already running for given src and destination."""
33+
pattern = f"rsync.*{src}.*{dst}"
3434
try:
3535
subprocess.check_output(["pgrep", "-f", pattern])
3636
return True

0 commit comments

Comments
 (0)