Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ log:

run_one_path: /usr/bin/run-one

metadata_archive: /path/to/metadata/archive

transfer_details:
user: username
host: remote.host.com
Expand All @@ -102,8 +104,10 @@ sequencers:
- RunParameters.xml
ignore_folders:
- nosync
rsync_options:
remote_rsync_options:
- --chmod=Dg+s,g+rw
metadata_rsync_options:
- "--include=InterOp"
# ... additional sequencer configurations
```

Expand All @@ -113,7 +117,7 @@ sequencers:
2. **Validation**: Confirms run ID matches expected format for the sequencer type
3. **Transfer Phases**:
- **Sequencing Phase**: Starts continuous background rsync transfer while sequencing is ongoing (when the final sequencing file doesn't exist). Uploads status and metadata files (specified for each sequencer type in the config with `metadata_for_statusdb`) to database.
- **Final Transfer**: After sequencing completes (final sequencing file appears), initiates final rsync transfer and captures exit code.
- **Final Transfer**: After sequencing completes (final sequencing file appears), syncs specified metadata file to archive location, initiates final rsync transfer and captures exit codes.
- **Completion**: Updates database when transfer was successful.

### Status Tracking
Expand Down Expand Up @@ -145,14 +149,15 @@ Run status is tracked in CouchDB with events including:
- Final completion is indicated by the presence of a sequencer-specific final file (e.g., `RTAComplete.txt` for Illumina)
- Remote storage is accessible via rsync over SSH
- CouchDB is accessible and the database exists
- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates
- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates and sync to metadata archive location

### Status Files

The logic of the script relies on the following status files:

- `run.final_file` - The final file written by each sequencing machine. Used to indicate when the sequencing has completed.
- `final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob.
- `.final_rsync_exitcode` - Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob.
- `.metadata_rsync_exitcode` - Used to indicate when rsync of metadata to the metadata archive is done, so that the rsync can be run in the background. This is useful when there are I/O issue with the disks.

## Development

Expand Down
5 changes: 5 additions & 0 deletions dataflow_transfer/dataflow_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ def process_run(run_dir, sequencer, config):
run.start_transfer(final=False)
return

## Sequencing finished. Copy metadata in the background if not already done.
if run.has_status("sequencing_finished"):
if not run.metadata_synced:
run.sync_metadata()
Comment thread
ssjunnebo marked this conversation as resolved.

## Sequencing finished but transfer not complete. Start final transfer.
if not run.final_sync_successful:
if run.has_status("sequencing_finished"):
Expand Down
84 changes: 68 additions & 16 deletions dataflow_transfer/run_classes/generic_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def __init__(self, run_dir, configuration):
)
self.final_file = ""
self.transfer_details = self.configuration.get("transfer_details", {})
self.metadata_rsync_exitcode_file = os.path.join(
self.run_dir, ".metadata_rsync_exitcode"
)
self.metadata_destination = os.path.join(
self.configuration.get("metadata_archive"),
getattr(self, "run_type", None),
self.run_id,
)
self.final_rsync_exitcode_file = os.path.join(
self.run_dir, ".final_rsync_exitcode"
)
Expand All @@ -42,36 +50,80 @@ def sequencing_ongoing(self):
return False
return True

def generate_rsync_command(self, is_final_sync=False):
"""Generate an rsync command string."""
destination = (
self.transfer_details.get("user")
+ "@"
+ self.transfer_details.get("host")
+ ":"
+ self.miarka_destination
@property
def metadata_synced(self):
"""Check if the metadata rsync was successful by reading the exit code file."""
return fs.check_exit_status(self.metadata_rsync_exitcode_file)

def sync_metadata(self):
Comment thread
ssjunnebo marked this conversation as resolved.
"""Start background rsync transfer for metadata files."""
metadata_rsync_command = self.generate_rsync_command(
remote=False, with_exit_code_file=True
)

if fs.rsync_is_running(src=self.run_dir, dst=self.metadata_destination):
logger.info(
f"Metadata rsync is already running for {self.run_dir} to destination {self.metadata_destination}. Skipping background metadata sync initiation."
)
return
try:
fs.submit_background_process(metadata_rsync_command)
logger.info(
f"{self.run_id}: Started metadata rsync to {self.metadata_destination}"
+ f" with the following command: '{metadata_rsync_command}'"
)
except Exception as e:
logger.error(f"Failed to start metadata rsync for {self.run_id}: {e}")
raise e

def generate_rsync_command(self, remote=False, with_exit_code_file=False):
"""Generate an rsync command string."""
Comment thread
ssjunnebo marked this conversation as resolved.
if remote:
source = self.run_dir
destination = (
self.transfer_details.get("user")
+ "@"
+ self.transfer_details.get("host")
+ ":"
+ self.miarka_destination
Comment thread
ssjunnebo marked this conversation as resolved.
Outdated
)
log_file_option = "--log-file=" + os.path.join(
self.run_dir, "rsync_remote_log.txt"
)
rsync_options = self.sequencer_config.get("remote_rsync_options", [])
exit_code_file = self.final_rsync_exitcode_file
else:
source = self.run_dir + "/"
destination = self.metadata_destination + "/"
log_file_option = "--log-file=" + os.path.join(
self.run_dir, "rsync_metadata_log.txt"
)
rsync_options = self.sequencer_config.get("metadata_rsync_options", [])
exit_code_file = self.metadata_rsync_exitcode_file
run_one_bin = self.configuration.get("run_one_path", "run-one")
command = [
run_one_bin,
"rsync",
"-au",
"--log-file=" + os.path.join(self.run_dir, "rsync_remote_log.txt"),
*(self.sequencer_config.get("rsync_options", [])),
self.run_dir,
log_file_option,
*(rsync_options),
"--exclude='*'" if not remote else "",
source,
destination,
]
command_str = " ".join(command)
if is_final_sync:
command_str += f"; echo $? > {self.final_rsync_exitcode_file}"
if with_exit_code_file:
command_str += f"; echo $? > {exit_code_file}"
return command_str

def start_transfer(self, final=False):
"""Start background rsync transfer to storage."""
transfer_command = self.generate_rsync_command(is_final_sync=final)
if fs.rsync_is_running(src=self.run_dir):
transfer_command = self.generate_rsync_command(
remote=True, with_exit_code_file=final
)
if fs.rsync_is_running(src=self.run_dir, dst=self.miarka_destination):
logger.info(
f"Rsync is already running for {self.run_dir}. Skipping background transfer initiation."
f"Rsync is already running for {self.run_dir} to destination {self.miarka_destination}. Skipping background transfer initiation."
)
return
try:
Expand Down
4 changes: 2 additions & 2 deletions dataflow_transfer/tests/test_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ class TestRsyncIsRunning:
@patch("subprocess.check_output")
def test_rsync_running(self, mock_check_output):
mock_check_output.return_value = b"12345"
assert rsync_is_running("/some/path") is True
assert rsync_is_running("/some/path", "/dst/path") is True

@patch("subprocess.check_output")
def test_rsync_not_running(self, mock_check_output):
mock_check_output.side_effect = CalledProcessError(1, "pgrep")
assert rsync_is_running("/some/path") is False
assert rsync_is_running("/some/path", "/dst/path") is False


class TestSubmitBackgroundProcess:
Expand Down
127 changes: 108 additions & 19 deletions dataflow_transfer/tests/test_run_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def novaseqxplus_testobj(tmp_path):
config = {
"log": {"file": "test.log"},
"transfer_details": {"user": "testuser", "host": "testhost"},
"metadata_archive": "/data/metadata_archive",
"statusdb": {
"username": "dbuser",
"password": "dbpass",
Expand All @@ -23,7 +24,12 @@ def novaseqxplus_testobj(tmp_path):
"miarka_destination": "/data/NovaSeqXPlus",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"ignore_folders": ["nosync"],
"rsync_options": ["--chmod=Dg+s,g+rw"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
"metadata_rsync_options": [
"--include=RunInfo.xml",
"--include=RunParameters.xml",
"--exclude=*",
],
}
},
}
Expand All @@ -38,6 +44,7 @@ def nextseq_testobj(tmp_path):
config = {
"log": {"file": "test.log"},
"transfer_details": {"user": "testuser", "host": "testhost"},
"metadata_archive": "/data/metadata_archive",
"statusdb": {
"username": "dbuser",
"password": "dbpass",
Expand All @@ -49,7 +56,12 @@ def nextseq_testobj(tmp_path):
"miarka_destination": "/data/NextSeq",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"ignore_folders": ["nosync"],
"rsync_options": ["--chmod=Dg+s,g+rw"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
"metadata_rsync_options": [
"--include=RunInfo.xml",
"--include=RunParameters.xml",
"--exclude=*",
],
}
},
}
Expand All @@ -64,6 +76,7 @@ def miseqseq_testobj(tmp_path):
config = {
"log": {"file": "test.log"},
"transfer_details": {"user": "testuser", "host": "testhost"},
"metadata_archive": "/data/metadata_archive",
"statusdb": {
"username": "dbuser",
"password": "dbpass",
Expand All @@ -75,7 +88,12 @@ def miseqseq_testobj(tmp_path):
"miarka_destination": "/data/MiSeq",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"ignore_folders": ["nosync"],
"rsync_options": ["--chmod=Dg+s,g+rw"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
"metadata_rsync_options": [
"--include=RunInfo.xml",
"--include=RunParameters.xml",
"--exclude=*",
],
}
},
}
Expand All @@ -90,6 +108,7 @@ def miseqseqi100_testobj(tmp_path):
config = {
"log": {"file": "test.log"},
"transfer_details": {"user": "testuser", "host": "testhost"},
"metadata_archive": "/data/metadata_archive",
"statusdb": {
"username": "dbuser",
"password": "dbpass",
Expand All @@ -101,7 +120,12 @@ def miseqseqi100_testobj(tmp_path):
"miarka_destination": "/data/MiSeqi100",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"ignore_folders": ["nosync"],
"rsync_options": ["--chmod=Dg+s,g+rw"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
"metadata_rsync_options": [
"--include=RunInfo.xml",
"--include=RunParameters.xml",
"--exclude=*",
],
}
},
}
Expand Down Expand Up @@ -166,27 +190,32 @@ def test_sequencing_ongoing(run_fixture, request):


@pytest.mark.parametrize(
"run_fixture, final_sync",
"run_fixture, remote, with_exit_code_file",
[
("novaseqxplus_testobj", False),
("novaseqxplus_testobj", True),
("nextseq_testobj", False),
("nextseq_testobj", True),
("miseqseq_testobj", False),
("miseqseq_testobj", True),
("miseqseqi100_testobj", False),
("miseqseqi100_testobj", True),
("novaseqxplus_testobj", False, True),
("novaseqxplus_testobj", True, False),
("novaseqxplus_testobj", True, True),
("nextseq_testobj", False, True),
("nextseq_testobj", True, False),
("nextseq_testobj", True, True),
("miseqseq_testobj", False, True),
("miseqseq_testobj", True, False),
("miseqseq_testobj", True, True),
("miseqseqi100_testobj", False, True),
("miseqseqi100_testobj", True, False),
("miseqseqi100_testobj", True, True),
],
)
def test_generate_rsync_command(run_fixture, final_sync, request):
def test_generate_rsync_command(run_fixture, remote, with_exit_code_file, request):
run_obj = request.getfixturevalue(run_fixture)
rsync_command = run_obj.generate_rsync_command(is_final_sync=final_sync)
rsync_command = run_obj.generate_rsync_command(
remote=remote, with_exit_code_file=with_exit_code_file
)
assert "run-one rsync" in rsync_command
assert "--log-file=" in rsync_command
assert "--chmod=Dg+s,g+rw" in rsync_command
assert run_obj.run_dir in rsync_command
if final_sync:
assert f"; echo $? > {run_obj.final_rsync_exitcode_file}" in rsync_command
if with_exit_code_file:
assert "; echo $? >" in rsync_command


@pytest.mark.parametrize(
Expand All @@ -213,7 +242,7 @@ def test_generate_rsync_command(run_fixture, final_sync, request):
def test_start_transfer(run_fixture, rsync_running, final, request, monkeypatch):
run_obj = request.getfixturevalue(run_fixture)

def mock_rsync_is_running(src):
def mock_rsync_is_running(src, dst):
return rsync_running

def mock_submit_background_process(command_str):
Expand Down Expand Up @@ -270,6 +299,66 @@ def test_final_sync_successful(run_fixture, sync_successful, request):
assert run_obj.final_sync_successful == sync_successful


@pytest.mark.parametrize(
"run_fixture, sync_successful",
[
("novaseqxplus_testobj", True),
("novaseqxplus_testobj", False),
("nextseq_testobj", True),
("nextseq_testobj", False),
("miseqseq_testobj", True),
("miseqseq_testobj", False),
("miseqseqi100_testobj", True),
("miseqseqi100_testobj", False),
],
)
def test_metadata_synced(run_fixture, sync_successful, request):
run_obj = request.getfixturevalue(run_fixture)
if sync_successful:
# Create the final rsync exit code file with a success code
with open(run_obj.metadata_rsync_exitcode_file, "w") as f:
f.write("0")
else:
# Create the final rsync exit code file with a failure code
with open(run_obj.metadata_rsync_exitcode_file, "w") as f:
f.write("1")
assert run_obj.metadata_synced == sync_successful


@pytest.mark.parametrize(
"run_fixture",
[
"novaseqxplus_testobj",
"nextseq_testobj",
"miseqseq_testobj",
"miseqseqi100_testobj",
],
)
def test_sync_metadata(run_fixture, request, monkeypatch):
run_obj = request.getfixturevalue(run_fixture)

def mock_generate_rsync_command(remote, with_exit_code_file):
return "rsync command"

def mock_rsync_is_running(src, dst):
return False

def mock_submit_background_process(command_str):
mock_submit_background_process.called = True
mock_submit_background_process.command_str = command_str

monkeypatch.setattr(run_obj, "generate_rsync_command", mock_generate_rsync_command)
monkeypatch.setattr(generic_runs.fs, "rsync_is_running", mock_rsync_is_running)
monkeypatch.setattr(
generic_runs.fs, "submit_background_process", mock_submit_background_process
)

run_obj.sync_metadata()

assert hasattr(mock_submit_background_process, "called")
assert mock_submit_background_process.command_str == "rsync command"


@pytest.mark.parametrize(
"run_fixture, status_to_check, expected_result",
[
Expand Down
Loading
Loading