Skip to content

Commit 4143ed9

Browse files
authored
Merge pull request #14 from ssjunnebo/sync_metadata
Sync metadata
2 parents 3945017 + 43dc7ca commit 4143ed9

7 files changed

Lines changed: 216 additions & 56 deletions

File tree

README.md

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ log:
8383

8484
run_one_path: /usr/bin/run-one
8585

86+
metadata_archive: /path/to/metadata/archive
87+
8688
transfer_details:
8789
user: username
8890
host: remote.host.com
@@ -96,14 +98,16 @@ statusdb:
9698
sequencers:
9799
NovaSeqXPlus:
98100
sequencing_path: /sequencing/NovaSeqXPlus
99-
miarka_destination: /Illumina/NovaSeqXPlus
101+
remote_destination: /Illumina/NovaSeqXPlus
100102
metadata_for_statusdb:
101103
- RunInfo.xml
102104
- RunParameters.xml
103105
ignore_folders:
104106
- nosync
105-
rsync_options:
107+
remote_rsync_options:
106108
- --chmod=Dg+s,g+rw
109+
metadata_rsync_options:
110+
- "--include=InterOp"
107111
# ... additional sequencer configurations
108112
```
109113

@@ -113,7 +117,7 @@ sequencers:
113117
2. **Validation**: Confirms run ID matches expected format for the sequencer type
114118
3. **Transfer Phases**:
115119
- **Sequencing Phase**: Starts continuous background rsync transfer while sequencing is ongoing (when the final sequencing file doesn't exist). Uploads status and metadata files (specified for each sequencer type in the config with `metadata_for_statusdb`) to database.
116-
- **Final Transfer**: After sequencing completes (final sequencing file appears), initiates final rsync transfer and captures exit code.
120+
- **Final Transfer**: After sequencing completes (final sequencing file appears), syncs specified metadata file to archive location, initiates final rsync transfer and captures exit codes.
117121
- **Completion**: Updates database when transfer was successful.
118122

119123
### Status Tracking
@@ -145,14 +149,15 @@ Run status is tracked in CouchDB with events including:
145149
- Final completion is indicated by the presence of a sequencer-specific final file (e.g., `RTAComplete.txt` for Illumina)
146150
- Remote storage is accessible via rsync over SSH
147151
- CouchDB is accessible and the database exists
148-
- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates
152+
- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates and sync to metadata archive location
149153

150154
### Status Files
151155

152156
The logic of the script relies on the following status files:
153157

154158
- `run.final_file` - The final file written by each sequencing machine. Used to indicate when the sequencing has completed.
155-
- `final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob.
159+
- `.final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob.
160+
- `.metadata_rsync_exitcode` - Used to indicate when rsync of metadata to the metadata archive is done, so that the rsync can be run in the background. This is useful when there are I/O issue with the disks.
156161

157162
## Development
158163

dataflow_transfer/dataflow_transfer.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ def process_run(run_dir, sequencer, config):
2222
run.confirm_run_type()
2323

2424
## Transfer already completed. Do nothing.
25-
if run.final_sync_successful and run.has_status("transferred_to_hpc"):
25+
if (
26+
run.final_sync_successful
27+
and run.has_status("transferred_to_hpc")
28+
and run.metadata_synced
29+
):
2630
# Check transfer success both in statusdb and via exit code file
2731
# To restart transfer, remove the exit code file
2832
logger.info(f"Transfer of {run_dir} is finished. No action needed.")
@@ -34,6 +38,12 @@ def process_run(run_dir, sequencer, config):
3438
run.start_transfer(final=False)
3539
return
3640

41+
## Sequencing finished. Copy metadata in the background if not already done.
42+
if run.has_status("sequencing_finished"):
43+
if not run.metadata_synced:
44+
run.sync_metadata()
45+
# We don't return here since metadata sync is somewhat independent of the real data sync.
46+
3747
## Sequencing finished but transfer not complete. Start final transfer.
3848
if not run.final_sync_successful:
3949
if run.has_status("sequencing_finished"):
@@ -47,8 +57,9 @@ def process_run(run_dir, sequencer, config):
4757

4858
## Final transfer completed successfully. Update statusdb.
4959
if run.final_sync_successful:
50-
logger.info(f"Final transfer completed successfully for {run_dir}.")
51-
run.update_statusdb(status="transferred_to_hpc")
60+
if not run.has_status("transferred_to_hpc"):
61+
logger.info(f"Final transfer completed successfully for {run_dir}.")
62+
run.update_statusdb(status="transferred_to_hpc")
5263
return
5364

5465
## Unknown status of run. Log error and raise exception.

dataflow_transfer/run_classes/generic_runs.py

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,18 @@ def __init__(self, run_dir, configuration):
2121
)
2222
self.final_file = ""
2323
self.transfer_details = self.configuration.get("transfer_details", {})
24+
self.metadata_rsync_exitcode_file = os.path.join(
25+
self.run_dir, ".metadata_rsync_exitcode"
26+
)
27+
self.metadata_destination = os.path.join(
28+
self.configuration.get("metadata_archive"),
29+
getattr(self, "run_type", None),
30+
self.run_id,
31+
)
2432
self.final_rsync_exitcode_file = os.path.join(
2533
self.run_dir, ".final_rsync_exitcode"
2634
)
27-
self.miarka_destination = self.sequencer_config.get("miarka_destination")
35+
self.remote_destination = self.sequencer_config.get("remote_destination")
2836
self.db = StatusdbSession(self.configuration.get("statusdb"))
2937

3038
def confirm_run_type(self):
@@ -42,50 +50,95 @@ def sequencing_ongoing(self):
4250
return False
4351
return True
4452

45-
def generate_rsync_command(self, is_final_sync=False):
46-
"""Generate an rsync command string."""
47-
destination = (
48-
self.transfer_details.get("user")
49-
+ "@"
50-
+ self.transfer_details.get("host")
51-
+ ":"
52-
+ self.miarka_destination
53+
@property
54+
def metadata_synced(self):
55+
"""Check if the metadata rsync was successful by reading the exit code file."""
56+
return fs.check_exit_status(self.metadata_rsync_exitcode_file)
57+
58+
def sync_metadata(self):
59+
"""Start background rsync transfer for metadata files."""
60+
metadata_rsync_command = self.generate_rsync_command(
61+
metadata_only=True, with_exit_code_file=True
5362
)
63+
64+
if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination):
65+
logger.info(
66+
f"Metadata rsync is already running for {self.run_dir} to destination {self.metadata_destination}. Skipping background metadata sync initiation."
67+
)
68+
return
69+
try:
70+
fs.submit_background_process(metadata_rsync_command)
71+
logger.info(
72+
f"{self.run_id}: Started metadata rsync to {self.metadata_destination}"
73+
+ f" with the following command: '{metadata_rsync_command}'"
74+
)
75+
except Exception as e:
76+
logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}")
77+
raise e
78+
79+
def generate_rsync_command(self, metadata_only=False, with_exit_code_file=False):
80+
"""Generate an rsync command string."""
81+
if metadata_only:
82+
source = self.run_dir + "/"
83+
destination = self.metadata_destination + "/"
84+
log_file_option = "--log-file=" + os.path.join(
85+
self.run_dir, "rsync_metadata_log.txt"
86+
)
87+
rsync_options = self.sequencer_config.get("metadata_rsync_options", [])
88+
exit_code_file = self.metadata_rsync_exitcode_file
89+
else:
90+
source = self.run_dir
91+
destination = (
92+
self.transfer_details.get("user")
93+
+ "@"
94+
+ self.transfer_details.get("host")
95+
+ ":"
96+
+ self.remote_destination
97+
)
98+
log_file_option = "--log-file=" + os.path.join(
99+
self.run_dir, "rsync_remote_log.txt"
100+
)
101+
rsync_options = self.sequencer_config.get("remote_rsync_options", [])
102+
exit_code_file = self.final_rsync_exitcode_file
103+
54104
run_one_bin = self.configuration.get("run_one_path", "run-one")
55105
command = [
56106
run_one_bin,
57107
"rsync",
58108
"-au",
59-
"--log-file=" + os.path.join(self.run_dir, "rsync_remote_log.txt"),
60-
*(self.sequencer_config.get("rsync_options", [])),
61-
self.run_dir,
109+
log_file_option,
110+
*(rsync_options),
111+
"--exclude='*'" if metadata_only else "",
112+
source,
62113
destination,
63114
]
64115
command_str = " ".join(command)
65-
if is_final_sync:
66-
command_str += f"; echo $? > {self.final_rsync_exitcode_file}"
116+
if with_exit_code_file:
117+
command_str += f"; echo $? > {exit_code_file}"
67118
return command_str
68119

69120
def start_transfer(self, final=False):
70121
"""Start background rsync transfer to storage."""
71-
transfer_command = self.generate_rsync_command(is_final_sync=final)
72-
if fs.rsync_is_running(src=self.run_dir):
122+
transfer_command = self.generate_rsync_command(
123+
metadata_only=False, with_exit_code_file=final
124+
)
125+
if fs.rsync_is_running(src=self.run_dir, dst=self.remote_destination):
73126
logger.info(
74-
f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation."
127+
f"Rsync is already running for {self.run_dir} to destination {self.remote_destination}. Skipping background transfer initiation."
75128
)
76129
return
77130
try:
78131
fs.submit_background_process(transfer_command)
79132
logger.info(
80-
f"{self.run_id}: Started rsync to {self.miarka_destination}"
133+
f"{self.run_id}: Started rsync to {self.remote_destination}"
81134
+ f" with the following command: '{transfer_command}'"
82135
)
83136
except Exception as e:
84137
logger.error(f"Failed to start rsync for {self.run_id}: {e}")
85138
raise e
86139
rsync_info = {
87140
"command": transfer_command,
88-
"destination_path": self.miarka_destination,
141+
"destination_path": self.remote_destination,
89142
}
90143
if final:
91144
self.update_statusdb(

dataflow_transfer/tests/test_filesystem.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,12 @@ class TestRsyncIsRunning:
6969
@patch("subprocess.check_output")
7070
def test_rsync_running(self, mock_check_output):
7171
mock_check_output.return_value = b"12345"
72-
assert rsync_is_running("/some/path") is True
72+
assert rsync_is_running("/some/path", "/dst/path") is True
7373

7474
@patch("subprocess.check_output")
7575
def test_rsync_not_running(self, mock_check_output):
7676
mock_check_output.side_effect = CalledProcessError(1, "pgrep")
77-
assert rsync_is_running("/some/path") is False
77+
assert rsync_is_running("/some/path", "/dst/path") is False
7878

7979

8080
class TestSubmitBackgroundProcess:

0 commit comments

Comments
 (0)