Skip to content

Commit 59fa495

Browse files
Merge pull request #112 from wfcommons/taskvine_logger
TaskVine logger improvements (via validation on large execution logs)
2 parents d5090cf + 4547ebd commit 59fa495

1 file changed

Lines changed: 48 additions & 44 deletions

File tree

wfcommons/wfinstances/logs/taskvine.py

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def __init__(self,
8080
self.task_runtimes = {}
8181
self.task_input_files = {}
8282
self.task_output_files = {}
83+
self.known_task_ids = []
8384

8485
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
8586
"""
@@ -101,16 +102,35 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
101102

102103
# Construct the task command-line array
103104
self._construct_task_command_lines()
104-
# sys.stderr.write(str(self.task_command_lines) + "\n")
105+
# sys.stderr.write(str(self.task_command_lines.keys()) + "\n")
106+
107+
# At this point, the ONLY tasks we care about are the ones for which we have a command-line
108+
self.known_task_ids = sorted(self.task_command_lines.keys())
105109

106110
# Construct file map
107111
self._construct_file_map()
108112
# sys.stderr.write("FILEMAP: " + str(self.files_map) + "\n")
113+
for file_key in self.files_map.keys():
114+
if not "size" in self.files_map[file_key]:
115+
sys.stderr.write(f"Warning: Could not determine size for file with key {file_key}: assuming zero bytes.\n")
116+
self.files_map[file_key]["size"] = 0
117+
sys.stderr.write(f"Identified {len(self.files_map)} valid files\n")
109118

110119
# Construct the task runtimes
111120
self._construct_task_runtimes()
112121
# sys.stderr.write("TASK RUN TIMES: " + str(self.task_runtimes) + "\n")
113122

123+
# Check whether every known task has a runtime, and if not forget it :(
124+
to_remove = []
125+
for task_id in self.known_task_ids:
126+
if task_id not in self.task_runtimes.keys():
127+
sys.stderr.write(f"Warning: Ignoring task {task_id} because runtime could not be determined.\n")
128+
to_remove.append(task_id)
129+
for victim in to_remove:
130+
self.known_task_ids.remove(victim)
131+
132+
sys.stderr.write(f"Identified {len(self.known_task_ids)} valid tasks\n")
133+
114134
# Construct the input and output file for each task
115135
self._construct_task_input_output_files()
116136
# print("TASK INPUT FILES: " + str(self.task_input_files))
@@ -130,6 +150,8 @@ def _construct_task_command_lines(self) -> None:
130150
command_line = previous_line[previous_line.find("busy on '") + len("busy on '"):-2]
131151
self.task_command_lines[int(task_index)] = command_line
132152
executable = command_line.split()[0]
153+
# May not be full-proof in case of commands like "export A=b; executable ..." but
154+
# may help.....
133155
self.filenames_to_ignore.add(executable)
134156
previous_line = line
135157

@@ -142,78 +164,60 @@ def _construct_file_map(self) -> None:
142164
for line in f:
143165
if "__vine_env_task" in line: # Ignore that weird task/file
144166
continue
145-
# 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0
146-
if "infile " in line:
147-
[file_key, filename] = line[line.find("infile ") + len("infile "):].split()[0:2]
148-
# 2025/09/09 21:12:42.12 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): outfile file-rnd-jpnzrjrjnxxqhej blastall_00000017_outfile_0017 0
149-
elif "outfile " in line:
150-
[file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[0:2]
167+
if "infile " in line :
168+
# 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0
169+
[file_key, filename] = line[line.find("infile ") + len("infile "):].split()[:2]
170+
elif "outfile " in line and "completed with outfile " not in line and "outfile =" not in line:
171+
# 2025/09/30 18:37:19.74 vine_manager[1849324]vine: tx to d64cepyc028.crc.nd.edu (10.32.94.18:47558): outfile temp-rnd-pidiwheippcwbeu fde2b5eb-9713-423a-8bc6-f4f9263ad20b.pkl 0 3
172+
[file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[:2]
151173
else:
152174
continue
153175
if filename in self.filenames_to_ignore:
154176
continue
155-
self.files_map[file_key] = {"filename": filename}
177+
# NOTE THAT THE FILENAME MAY NOT BE UNIQUE IN TASKVINE WORKFLOWS, SO
178+
# WE ADD THE KEY
179+
self.files_map[file_key] = {"filename": filename + "." + file_key}
156180
filename_to_key_map[filename] = file_key
157181

158-
# Another pass through the debug file to get the actual file paths
159-
with open(self.debug_file) as f:
160-
for line in f:
161-
# 2025/09/09 21:12:48.01 vine_manager[239]vine: dab178765b01 (127.0.0.1:34382) needs file data/blastall_00000003_outfile_0003 as blastall_00000003_outfile_0003
162-
if "needs file " in line:
163-
[full_path, ignore, filename] = line[line.find("needs file ") + len("needs file "):].split()[0:3]
164-
file_key = filename_to_key_map.get(filename)
165-
# 2025/09/09 21:12:47.92 vine_manager[239]vine: dab178765b01 (127.0.0.1:34382) sending back file-rnd-jajwzwsrtyzbkfs to data/blastall_00000020_outfile_0020
166-
elif "sending back " in line:
167-
[file_key, ignore, full_path] = line[line.find("sending back ") + len("sending back "):].split()[0:3]
168-
filename = self.files_map[file_key]["filename"]
169-
else:
170-
continue
171-
if filename in self.filenames_to_ignore:
172-
continue
173-
self.files_map[file_key]["path"] = full_path
174-
175182
# Pass through the transactions file to get the file sizes
176-
with open(self.transactions_file) as f:
183+
with open(self.debug_file) as f:
177184
for line in f:
178-
# 1757452362084671 239 WORKER worker-50dc215f08057f4005f3b65dee08592f TRANSFER OUTPUT file-rnd-wzkjcrgiivvzbci 227273 1327 1757452362083301
179-
# 1757452358704968 239 WORKER worker-50dc215f08057f4005f3b65dee08592f TRANSFER INPUT file-meta-9b84b334875319e856f72be634aae964 17648 1129 1757452358703835
180-
if "TRANSFER INPUT " in line:
181-
[file_key, file_size] = line[line.find("TRANSFER INPUT ") + len("TRANSFER INPUT "):].split()[0:2]
182-
elif "TRANSFER OUTPUT " in line:
183-
[file_key, file_size] = line[line.find("TRANSFER OUTPUT ") + len("TRANSFER OUTPUT "):].split()[0:2]
184-
elif "CACHE_UPDATE " in line:
185-
[file_key, file_size] = line[line.find("CACHE_UPDATE ") + len("CACHE_UPDATE "):].split()[0:2]
185+
if "): file " in line:
186+
[file_key, file_size] = line[line.find("): file ") + len("): file "):].split()[0:2]
186187
else:
187188
continue
188189
if file_key in self.files_map:
189190
self.files_map[file_key]["size"] = int(file_size)
190191

191-
# print(str(self.files_map))
192-
193192

194193
def _construct_task_runtimes(self) -> None:
195194
task_start_times = {}
196195
task_end_times = {}
197196

197+
# This method consists only of
198+
198199
with open(self.transactions_file) as f:
199200
for line in f:
200201
if line[0] == "#":
201202
continue
202203
if "RUNNING" in line:
203204
[start_date, ignore, ignore, task_index] = line.split()[0:4]
204-
task_start_times[int(task_index)] = int(start_date)
205+
if int(task_index) in self.known_task_ids:
206+
task_start_times[int(task_index)] = int(start_date)
205207
elif "DONE" in line:
206208
[end_date, ignore, ignore, task_index] = line.split()[0:4]
207-
task_end_times[int(task_index)] = int(end_date)
209+
if int(task_index) in self.known_task_ids:
210+
task_end_times[int(task_index)] = int(end_date)
208211

209212
for task_index in task_start_times:
210-
self.task_runtimes[task_index] = (
213+
if task_index in task_end_times:
214+
self.task_runtimes[task_index] = (
211215
float(task_end_times[task_index] - task_start_times[task_index]) / 1_000_000.0)
212216

213217
def _construct_task_input_output_files(self) -> None:
214218

215219
# Initialize all entries
216-
for task_id in self.task_runtimes.keys():
220+
for task_id in self.known_task_ids:
217221
self.task_input_files[task_id] = []
218222
self.task_output_files[task_id] = []
219223

@@ -238,7 +242,7 @@ def _construct_task_input_output_files(self) -> None:
238242
if destination.startswith("file-"):
239243
destination = destination[len("file-"):]
240244

241-
if "task" in source and "file" not in source:
245+
if "task-" in source and "file-" not in source:
242246
try:
243247
task_id = int(source.split("-")[1])
244248
except ValueError as e:
@@ -273,15 +277,15 @@ def _construct_workflow(self) -> None:
273277
for file_key in self.files_map:
274278
filename = self.files_map[file_key]["filename"]
275279
file_size = self.files_map[file_key]["size"]
276-
# file_path = self.files_map[file_key]["path"]
277280
file_object_map[filename] = File(file_id=filename,
278281
size=file_size,
279282
logger=self.logger)
280283

281284
# Create all tasks
282285
task_map = {}
283-
for task_id in self.task_runtimes:
284-
task_name = "Task %d" % task_id
286+
# print(self.task_runtimes[16])
287+
for task_id in self.known_task_ids:
288+
task_name = "Task_%d" % task_id
285289
task = Task(name=task_name,
286290
task_id=task_name,
287291
task_type=TaskType.COMPUTE,

0 commit comments

Comments
 (0)