Skip to content

Commit 4400ba0

Browse files
committed
Nextflow with Flowcept
1 parent 262e9af commit 4400ba0

2 files changed

Lines changed: 98 additions & 0 deletions

File tree

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2025 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import ast
12+
import logging
13+
import pathlib
14+
import sys
15+
import time
16+
from flowcept.flowcept_api.flowcept_controller import Flowcept
17+
18+
logging.basicConfig(
19+
level=logging.INFO,
20+
format="[WfBench][%%(asctime)s][%%(levelname)s] %%(message)s",
21+
datefmt="%%H:%%M:%%S",
22+
handlers=[logging.StreamHandler()]
23+
)
24+
25+
workflow_name = sys.argv[1]
26+
workflow_id = sys.argv[2]
27+
out_files = ast.literal_eval(sys.argv[3])
28+
29+
logging.info("Flowcept Starting")
30+
flowcept_agent = Flowcept(workflow_id=workflow_id, workflow_name=workflow_name, bundle_exec_id=workflow_id, start_persistence=False, save_workflow=True)
31+
32+
try:
33+
flowcept_agent.start()
34+
except Exception:
35+
import traceback
36+
traceback.print_exc()
37+
38+
remaining_files = set(out_files)
39+
40+
while remaining_files:
41+
found_files = set()
42+
for f in remaining_files:
43+
if pathlib.Path(f).exists():
44+
found_files.add(f)
45+
remaining_files -= found_files
46+
if not remaining_files:
47+
break
48+
time.sleep(1)
49+
50+
try:
51+
flowcept_agent.stop()
52+
except Exception:
53+
import traceback
54+
traceback.print_exc()
55+
56+
logging.info("Flowcept Completed")
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
params.simulate = false
2+
params.pwd = null
3+
params.help = null
4+
pwd = null
5+
6+
def printUsage(error_msg, exit_code) {
7+
def usage_string = """
8+
Usage: nextflow run workflow.nf --pwd /path/to/directory [--simulate] [--help]
9+
10+
Required parameters:
11+
--pwd Working directory (where the workflow.nf file is located)
12+
13+
Optional parameters:
14+
--help Show this message and exit.
15+
--simulate Use a "sleep 1" for all tasks instead of the WfBench benchmark.
16+
"""
17+
if (error_msg) {
18+
def RED = '\u001B[31m'
19+
def RESET = '\u001B[0m'
20+
System.err.println "${RED}Error: ${RESET}" + error_msg
21+
}
22+
System.err.println usage_string
23+
exit exit_code
24+
}
25+
26+
def validateParams() {
27+
if (params.help) {
28+
printUsage(msg = "", exit_code=0)
29+
}
30+
if (params.pwd == null) {
31+
printUsage(msg = "Missing required parameter: --pwd", exit_code=1)
32+
}
33+
pwd = file(params.pwd).toAbsolutePath().toString()
34+
if (!file(pwd).exists()) {
35+
printUsage(msg = "Directory not found: ${pwd}", exit_code=1)
36+
}
37+
}
38+
39+
// Call validation at the start
40+
validateParams()
41+
42+
# Generated code goes here

0 commit comments

Comments
 (0)