Skip to content

Commit 203f238

Browse files
committed
updating TaskVine parser to support version 2
1 parent e0d5dea commit 203f238

3 files changed

Lines changed: 73 additions & 79 deletions

File tree

tests/test_helpers.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,13 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2025-2026 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
111
import pathlib
212
import shutil
313
import tarfile

tests/translators_loggers/test_translators_loggers.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2025 The WfCommons Team.
4+
# Copyright (c) 2025-2026 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -289,7 +289,7 @@ def test_translator(self, backend) -> None:
289289

290290
if parser:
291291
sys.stderr.write(f"\n[{backend}] Parsing the logs...\n")
292-
reconstructed_workflow : Workflow = parser.build_workflow("reconstructed_workflow")
292+
reconstructed_workflow : Workflow = parser.build_workflow(f"reconstructed_workflow_{backend}")
293293
reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json"))
294294

295295
original_workflow : Workflow = benchmark.workflow

wfcommons/wfinstances/logs/taskvine.py

Lines changed: 61 additions & 77 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2021 The WfCommons Team.
4+
# Copyright (c) 2021-2026 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -138,8 +138,6 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
138138

139139
# Construct the input and output file for each task
140140
self._construct_task_input_output_files()
141-
# print("TASK INPUT FILES: " + str(self.task_input_files))
142-
# print("TASK OUTPUT FILES: " + str(self.task_output_files))
143141

144142
# Construct the workflow
145143
self._construct_workflow()
@@ -162,38 +160,41 @@ def _construct_task_command_lines(self) -> None:
162160

163161

164162
def _construct_file_map(self) -> None:
165-
166163
filename_to_key_map = {}
167-
# One pass through the debug file to create the initial file key -> filename mapping
168-
with open(self.debug_file) as f:
164+
165+
with open(self.taskgraph_file) as f:
169166
for line in f:
170-
if "__vine_env_task" in line: # Ignore that weird task/file
171-
continue
172-
if "infile " in line :
173-
# 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0
174-
[file_key, filename] = line[line.find("infile ") + len("infile "):].split()[:2]
175-
elif "outfile " in line and "completed with outfile " not in line and "outfile =" not in line:
176-
# 2025/09/30 18:37:19.74 vine_manager[1849324]vine: tx to d64cepyc028.crc.nd.edu (10.32.94.18:47558): outfile temp-rnd-pidiwheippcwbeu fde2b5eb-9713-423a-8bc6-f4f9263ad20b.pkl 0 3
177-
[file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[:2]
178-
else:
179-
continue
180-
if filename in self.filenames_to_ignore:
167+
if line[0] == "#":
181168
continue
182-
# NOTE THAT THE FILENAME MAY NOT BE UNIQUE IN TASKVINE WORKFLOWS, SO
183-
# WE ADD THE KEY
184-
self.files_map[file_key] = {"filename": filename + "." + file_key}
185-
filename_to_key_map[filename] = file_key
186-
187-
# Pass through the transactions file to get the file sizes
188-
with open(self.debug_file) as f:
169+
170+
if line.startswith("FILE"):
171+
# FILE file-meta-df1e8b0d0e056c4aedb917abe198a2ff "taskvine_poncho.tar.gz" 0
172+
[ignore, file_key, filename, ignore] = line.split()[:4]
173+
if len(filename) > 1 and filename[0] == "\"" and filename[-1] == "\"":
174+
filename = filename[1:-1]
175+
if self.filenames_to_ignore and any(
176+
ignore_string in filename for ignore_string in self.filenames_to_ignore
177+
):
178+
continue
179+
if file_key not in self.files_map:
180+
self.files_map[file_key] = {"filename": filename}
181+
182+
with open(self.transactions_file) as f:
189183
for line in f:
190-
if "): file " in line:
191-
[file_key, file_size] = line[line.find("): file ") + len("): file "):].split()[0:2]
192-
else:
184+
if line[0] == "#":
193185
continue
194-
if file_key in self.files_map:
195-
self.files_map[file_key]["size"] = int(file_size)
196-
186+
if "TRANSFER INPUT" in line:
187+
#1769907492293772 245 WORKER worker-8ac3adb4bf1e86b025d0df194b115b8c TRANSFER INPUT file-meta-04276d901d8d096cf981f7ab55f6d1a5 147667979 72409 1769907492221291
188+
[ignore, ignore, ignore, ignore, ignore, ignore, file_key, file_size_in_mb] = line.split()[0:8]
189+
if file_key not in self.files_map:
190+
continue
191+
self.files_map[file_key]["size"] = int(file_size_in_mb)
192+
elif "TRANSFER OUTPUT" in line:
193+
#1769907502293773 245 WORKER worker-8ac3adb4bf1e86b025d0df194b115b8c TRANSFER OUTPUT file-meta-04276d901d8d096cf981f7ab55f6d1a5 147667979 72409 1769907502221292
194+
[ignore, ignore, ignore, ignore, ignore, ignore, file_key, file_size_in_mb] = line.split()[0:8]
195+
if file_key not in self.files_map:
196+
continue
197+
self.files_map[file_key]["size"] = int(file_size_in_mb)
197198

198199
def _construct_task_runtimes(self) -> None:
199200
task_start_times = {}
@@ -220,61 +221,45 @@ def _construct_task_runtimes(self) -> None:
220221
float(task_end_times[task_index] - task_start_times[task_index]) / 1_000_000.0)
221222

222223
def _construct_task_input_output_files(self) -> None:
223-
224224
# Initialize all entries
225225
for task_id in self.known_task_ids:
226226
self.task_input_files[task_id] = []
227227
self.task_output_files[task_id] = []
228228

229229
with open(self.taskgraph_file) as f:
230230
for line in f:
231-
if "->" not in line:
232-
continue
233-
if "file-task" in line: # Ignoring what I think are taskvine internal/specific things
231+
if line[0] == "#":
234232
continue
235-
line = line[:-1]
236-
# print(f"LINE: {line}")
237-
[source, ignore, destination] = line.split()
238-
# Remove quotes
239-
source = source [1:-1]
240-
destination = destination [1:-2]
241-
# Remove weird file- prefix
242-
source = source.replace("--", "-") # Sometimes there is an unexpected "--"!!
243-
destination = destination.replace("--", "-") # Sometimes there is an unexpected "--"!!
244-
# print(f"source: {source} destination: {destination}")
245-
if source.startswith("file-"):
246-
source = source[len("file-"):]
247-
if destination.startswith("file-"):
248-
destination = destination[len("file-"):]
249-
250-
if "task-" in source and "file-" not in source:
251-
try:
252-
task_id = int(source.split("-")[1])
253-
except ValueError as e:
254-
raise Exception(f"The source was {source} and the split around '-' failed!")
255-
256-
if task_id not in self.task_runtimes:
233+
234+
if line.startswith("TASK"):
235+
# TASK T23 "__vine_env_task-rnd-twtxpejwzsyiebf/bin/run_in_env" INPUTS task-rnd-twtxpejwzsyiebf file-meta-d7504c061a7afd9401c612b4ac7d6be6 file-meta-baab4e4516c4d93a8fcdcbba1a680af7 file-meta-693cb61fedd032b4ddec444b8cce6c89 file-rnd-fnrudlxsaqmlpqq OUTPUTS file-rnd-pdtdqayfmmxyxyp
236+
parts = line.split()
237+
task_key = parts[1] # T23
238+
task_id = int(task_key[1:]) # Remove the T
239+
if task_id not in self.known_task_ids:
257240
continue
258-
file_key = destination
259-
if file_key not in self.files_map:
260-
continue
261-
output_file = self.files_map[file_key]["filename"]
262-
self.task_output_files[task_id].append(output_file)
263-
elif "task" in destination and "file" not in destination:
264-
try:
265-
task_id = int(destination.split("-")[1])
266-
except ValueError as e:
267-
raise Exception(f"The destination was {destination} and the split around '-' failed!")
268-
if task_id not in self.task_runtimes:
269-
continue
270-
file_key = source
271-
if file_key not in self.files_map:
272-
continue
273-
input_file = self.files_map[file_key]["filename"]
274-
self.task_input_files[task_id].append(input_file)
275-
else:
276-
raise ValueError("Error in the taskgraph file")
277-
241+
input_section = False
242+
output_section = False
243+
for part in parts[2:]:
244+
if part == "INPUTS":
245+
input_section = True
246+
output_section = False
247+
continue
248+
elif part == "OUTPUTS":
249+
input_section = False
250+
output_section = True
251+
continue
252+
else:
253+
if input_section:
254+
file_key = part
255+
if file_key not in self.files_map:
256+
continue
257+
self.task_input_files[task_id].append(self.files_map[file_key]["filename"])
258+
elif output_section:
259+
file_key = part
260+
if file_key not in self.files_map:
261+
continue
262+
self.task_output_files[task_id].append(self.files_map[file_key]["filename"])
278263

279264
def _construct_workflow(self) -> None:
280265
# Create files and put them in a map
@@ -288,7 +273,6 @@ def _construct_workflow(self) -> None:
288273

289274
# Create all tasks
290275
task_map = {}
291-
# print(self.task_runtimes[16])
292276
for task_id in self.known_task_ids:
293277
task_name = "Task_%d" % task_id
294278
task = Task(name=task_name,

0 commit comments

Comments
 (0)