Skip to content

Commit c9ce7e6

Browse files
Merge pull request #140 from wfcommons/ro-crate-parser
Created RO-Crate parser for RO-Crate files (generated by Streamflow)
2 parents 96f104e + 18eb8ef commit c9ce7e6

3 files changed

Lines changed: 218 additions & 1 deletion

File tree

wfcommons/wfinstances/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11-
from .logs import MakeflowLogsParser, NextflowLogsParser, PegasusLogsParser, HierarchicalPegasusLogsParser
11+
from .logs import MakeflowLogsParser, NextflowLogsParser, PegasusLogsParser, HierarchicalPegasusLogsParser, ROCrateLogsParser
1212
from .schema import SchemaValidator
1313
from .instance import Instance
1414
from .instance_analyzer import InstanceAnalyzer, InstanceElement

wfcommons/wfinstances/logs/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@
1313
from .nextflow import NextflowLogsParser
1414
from .pegasus import PegasusLogsParser
1515
from .pegasusrec import HierarchicalPegasusLogsParser
16+
from .ro_crate import ROCrateLogsParser
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2021 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import json
12+
import itertools
13+
import math
14+
import os
15+
import pathlib
16+
17+
from datetime import datetime, timezone
18+
from logging import Logger
19+
from typing import List, Optional
20+
21+
from .abstract_logs_parser import LogsParser
22+
from ...common.file import File
23+
from ...common.machine import Machine
24+
from ...common.task import Task, TaskType
25+
from ...common.workflow import Workflow
26+
27+
28+
class ROCrateLogsParser(LogsParser):
    """
    Parse RO Crate directory to generate workflow instance. This parser has some limitations when it comes to non-file
    dependencies between tasks. It determines these via ParameterConnection type objects in the ro-crate-metadata.json,
    which contain the "instrument" (the cwl file they execute) of the parent and child task. However, since tasks
    can share an "instrument", the parser creates dependencies between every task pair matching the parent and child
    "instrument"s, assuming they're all related.

    :param crate_dir: RO crate directory (contains ro-crate-metadata.json).
    :type crate_dir: pathlib.Path
    :param description: Workflow instance description.
    :type description: Optional[str]
    :param logger: The logger where to log information/warning or errors (optional).
    :type logger: Optional[Logger]
    """

    def __init__(self,
                 crate_dir: pathlib.Path,
                 description: Optional[str] = None,
                 logger: Optional[Logger] = None) -> None:
        """Create an object of the RO crate parser."""
        # TODO: Decide if these should be RO crate or Streamflow or whatev
        super().__init__('Streamflow-ROCrate',
                         'https://w3id.org/workflowhub/workflow-ro-crate/1.0',
                         description, logger)

        # Sanity check: the crate must be an existing directory containing the metadata file.
        if not crate_dir.is_dir():
            raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}')

        metadata: pathlib.Path = crate_dir / 'ro-crate-metadata.json'
        if not metadata.is_file():
            raise OSError(f'Unable to find ro-crate-metadata.json file in: {crate_dir}')
        self.metadata: pathlib.Path = metadata

        self.crate_dir: pathlib.Path = crate_dir

        # Cache of File objects keyed by RO-crate "@id", so each file is created only once
        # even when it appears as input/output of several tasks.
        self.file_objects: dict = {}

    def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
        """
        Create workflow instance based on the workflow execution logs.

        :param workflow_name: The workflow name.
        :type workflow_name: Optional[str]

        :return: A workflow instance object.
        :rtype: Workflow
        """
        self.workflow_name = workflow_name

        # create base workflow instance object
        self.workflow = Workflow(name=self.workflow_name,
                                 description=self.description,
                                 runtime_system_name=self.wms_name,
                                 runtime_system_url=self.wms_url)

        with open(self.metadata, 'r') as f:
            self.data = json.load(f)
        self.graph_data = self.data.get('@graph', [])

        # Dictionary of ro-crate objects by "@id"
        self.lookup = {item["@id"]: item for item in self.graph_data}

        # Find id of the main workflow (the crate root "./" points at it via "mainEntity")
        overview = self.lookup.get("./")
        main_workflow_id = overview.get("mainEntity").get("@id")

        # Each CreateAction in the graph is one task execution (plus one for the whole workflow).
        create_actions = [item for item in self.graph_data if item.get('@type') == "CreateAction"]
        self._create_tasks(create_actions, main_workflow_id)

        return self.workflow

    def _create_tasks(self, create_actions, main_workflow_id):
        """Create a Task per CreateAction and record file/instrument usage for dependency inference."""
        # Maps file "@id" -> {'in': [task ids that read it], 'out': [task ids that wrote it]}
        files = {}
        # Maps instrument "@id" -> [task ids that executed it], for ParameterConnection dependencies
        instruments = {}

        for create_action in create_actions:

            # Handle overall workflow create_action then skip
            if create_action["name"] == f"Run of workflow/{main_workflow_id}":
                self._process_main_workflow(create_action)
                continue

            # Get all input & output ids for the create_action
            # (renamed from "input"/"output" to avoid shadowing builtins)
            input_ids = [obj['@id'] for obj in create_action['object']]
            output_ids = [obj['@id'] for obj in create_action['result']]

            # Filter for actual files
            input_files = self._filter_file_ids(input_ids)
            output_files = self._filter_file_ids(output_ids)

            create_action['name'] = create_action['name'].removeprefix("Run of workflow/")

            task = Task(name=create_action['name'],
                        task_id=create_action['@id'],
                        task_type=TaskType.COMPUTE,
                        runtime=self._time_diff(create_action['startTime'], create_action['endTime']),
                        executed_at=create_action['startTime'],
                        input_files=self._get_file_objects(input_files),
                        output_files=self._get_file_objects(output_files),
                        logger=self.logger)
            self.workflow.add_task(task)

            task_id = create_action['@id']

            # For each file, track which task(s) it is in/output for
            for infile in input_files:
                files.setdefault(infile, {}).setdefault('in', []).append(task_id)

            for outfile in output_files:
                files.setdefault(outfile, {}).setdefault('out', []).append(task_id)

            # For each task, track which 'instrument' it uses
            instrument = create_action['instrument']['@id']
            instruments.setdefault(instrument, []).append(task_id)

        self._add_dependencies(files, instruments)

    def _add_dependencies(self, files, instruments):
        """Add workflow edges: producer->consumer for shared files, then instrument-based links."""
        for file in files.values():
            for parent in file.get('out', []):
                for child in file.get('in', []):
                    self.workflow.add_dependency(parent, child)

        # Assumes every task pair whose instruments match a ParameterConnection's
        # source/target is related (see class docstring for this limitation).
        parameter_connections = [item for item in self.graph_data
                                 if item.get('@type') == "ParameterConnection"]
        for parameter_connection in parameter_connections:
            source = parameter_connection["sourceParameter"]["@id"]
            source = source.rsplit("#", 1)[0]  # Trim to get instrument

            target = parameter_connection["targetParameter"]["@id"]
            target = target.rsplit("#", 1)[0]  # Trim to get instrument

            for parent in instruments.get(source, []):
                for child in instruments.get(target, []):
                    self.workflow.add_dependency(parent, child)

    def _time_diff(self, start_time, end_time):
        """Return the number of seconds between two ISO-8601 timestamps."""
        diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)
        return diff.total_seconds()

    def _get_file_objects(self, files):
        """Given a list of file "@id"s, return the corresponding File objects (created lazily)."""
        output = []
        for file_id in files:
            if file_id not in self.file_objects:
                # File "@id"s are paths relative to the crate directory.
                self.file_objects[file_id] = File(file_id=file_id,
                                                  size=(self.crate_dir / file_id).stat().st_size,
                                                  logger=self.logger)
            output.append(self.file_objects[file_id])
        return output

    def _filter_file_ids(self, ids):
        """Given a list of "@id"s, return those with the File type, unpacking PropertyValue entries into Files."""
        # `.get(i, {})` guards against ids missing from the lookup (the original raised
        # an opaque TypeError in that case); such ids are simply skipped.
        file_ids = [i for i in ids if self.lookup.get(i, {}).get('@type') == 'File']

        property_value_ids = [i for i in ids if self.lookup.get(i, {}).get('@type') == 'PropertyValue']
        for property_value_id in property_value_ids:
            property_values = self.lookup[property_value_id]['value']

            # Filter out values without "@id"s (i.e. int values, etc.)
            pv_contained_ids = [obj["@id"] for obj in property_values
                                if isinstance(obj, dict) and "@id" in obj]

            # Recurse to verify everything's a file
            file_ids.extend(self._filter_file_ids(pv_contained_ids))

        # Remove duplicates while preserving first-seen order (the original used
        # set(), which made the returned order nondeterministic).
        return list(dict.fromkeys(file_ids))

    def _process_main_workflow(self, main_workflow):
        """Fill workflow-level fields (makespan, start time) from the whole-run CreateAction."""
        self.workflow.makespan = self._time_diff(main_workflow['startTime'], main_workflow['endTime'])
        self.workflow.executed_at = main_workflow['startTime']

0 commit comments

Comments
 (0)