Skip to content

Commit 1d1a91f

Browse files
Donglai Wei and claude
committed
Add --chunk-index for direct chunk assignment (no orchestrator competition)
sbatch --array=0-25 now directly assigns chunk N to worker N. No task claiming, no file locks, no race conditions. Auto-detects SLURM_ARRAY_TASK_ID when --chunk-index is not set. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8f50580 commit 1d1a91f

2 files changed

Lines changed: 32 additions & 9 deletions

File tree

scripts/decode_large.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ def main():
4444
parser.add_argument("--config", required=True, help="YAML config file")
4545
parser.add_argument("--init-only", action="store_true", help="Initialize workflow and exit")
4646
parser.add_argument("--worker", action="store_true", help="Run as a worker (claim tasks)")
47+
parser.add_argument("--chunk-index", type=int, default=None,
48+
help="Decode a specific chunk by index (for sbatch --array)")
49+
parser.add_argument("--chunk-range", type=str, default=None,
50+
help="Decode chunk range 'start-end' (inclusive)")
4751
parser.add_argument("--wait", action="store_true", help="Wait for all tasks to complete")
4852
parser.add_argument("--assemble", action="store_true", help="Assemble final output volume")
4953
parser.add_argument("--parallel", type=int, default=None,
@@ -115,6 +119,30 @@ def main():
115119
print("Workflow initialized. Launch workers to execute tasks.")
116120
return
117121

122+
# Direct chunk assignment (no orchestrator competition)
123+
chunk_index = args.chunk_index
124+
if chunk_index is None and os.environ.get("SLURM_ARRAY_TASK_ID"):
125+
# Auto-detect from SLURM array index
126+
chunk_index = int(os.environ["SLURM_ARRAY_TASK_ID"])
127+
128+
if chunk_index is not None or args.chunk_range is not None:
129+
if args.chunk_range:
130+
start, end = args.chunk_range.split("-")
131+
indices = list(range(int(start), int(end) + 1))
132+
else:
133+
indices = [chunk_index]
134+
for idx in indices:
135+
if idx >= len(chunks):
136+
print(f"Chunk index {idx} out of range (0-{len(chunks)-1}), skipping")
137+
continue
138+
chunk = chunks[idx]
139+
print(f"Decoding chunk {idx}/{len(chunks)}: {chunk.key}")
140+
from waterz.orchestrator import TaskRecord, TaskSpec
141+
record = TaskRecord(spec=TaskSpec(stage="decode", key=chunk.key))
142+
result = runner.handle_decode_chunk(record)
143+
print(f" Done: {result}")
144+
return
145+
118146
if args.worker:
119147
worker_id = args.worker_id or os.environ.get("SLURM_JOB_ID", None)
120148
job_id = args.job_id or os.environ.get("SLURM_ARRAY_TASK_ID", None)

scripts/decode_large_worker.sh

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
#!/bin/bash
22
#SBATCH --job-name=waterz_worker
33
#SBATCH --mem=64G
4-
#SBATCH --cpus-per-task=4
4+
#SBATCH --cpus-per-task=2
55
#SBATCH --time=12:00:00
66
#SBATCH --output=slurm_outputs/waterz_worker_%A_%a.out
77
#SBATCH --error=slurm_outputs/waterz_worker_%A_%a.err
88

99
# Usage:
10-
# sbatch --array=0-7 scripts/decode_large_worker.sh tutorials/waterz_decoding_large.yaml
10+
# sbatch --array=0-25 scripts/decode_large_worker.sh tutorials/waterz_decoding_large.yaml
1111
#
12-
# Each array task is an independent worker that claims and executes
13-
# tasks from the shared workflow directory. Workers coordinate via
14-
# file locks — no central scheduler needed.
12+
# Worker N decodes chunk N directly — no task competition, no race conditions.
1513

1614
CONFIG=${1:-tutorials/waterz_decoding_large.yaml}
1715

@@ -26,9 +24,6 @@ echo "Start: $(date)"
2624

2725
python scripts/decode_large.py \
2826
--config ${CONFIG} \
29-
--worker \
30-
--worker-id "$(hostname)-${SLURM_ARRAY_TASK_ID}" \
31-
--job-id "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}" \
32-
--idle-timeout 120
27+
--chunk-index ${SLURM_ARRAY_TASK_ID}
3328

3429
echo "End: $(date)"

0 commit comments

Comments (0)