1+ #!/usr/bin/env python
2+ # -*- coding: utf-8 -*-
3+ #
4+ # Copyright (c) 2024 The WfCommons Team.
5+ #
6+ # This program is free software: you can redistribute it and/or modify
7+ # it under the terms of the GNU General Public License as published by
8+ # the Free Software Foundation, either version 3 of the License, or
9+ # (at your option) any later version.
10+
11+ import pytest
12+
13+ from wfcommons .common import Task , Workflow
14+ from wfcommons .version import __version__ , __schema_version__
15+
16+
17+ tasks_list = [
18+ ([Task (name = "task_1" , task_id = "task_1" , runtime = 15.0 ), Task (name = "task_2" , task_id = "task_2" , runtime = 30.0 )]),
19+ ([Task (name = "task_1" , task_id = "task_1" , runtime = 15.0 ), Task (name = "task_2" , task_id = "task_2" , runtime = 30.0 ), Task (name = "task_3" , task_id = "task_3" , runtime = 60.0 )]),
20+ ]
21+
22+
23+ class TestWorkflow :
24+
25+ @pytest .fixture
26+ def workflow (self ) -> Workflow :
27+ return Workflow (
28+ name = "Workflow Test" ,
29+ makespan = 100.0
30+ )
31+
32+ @pytest .mark .unit
33+ def test_workflow_creation (self , workflow : Workflow ) -> None :
34+ workflow .write_json ("/tmp/workflow_test.json" )
35+
36+ workflow_json = {
37+ "name" : "Workflow Test" ,
38+ "description" : "Instance generated with WfCommons - https://wfcommons.org" ,
39+ "createdAt" : workflow .created_at ,
40+ "schemaVersion" : f"{ __schema_version__ } " ,
41+ "author" : {
42+ "name" : "8uf" ,
43+ "email" : "support@wfcommons.org"
44+ },
45+ "workflow" : {
46+ "specification" : {
47+ "tasks" : [],
48+ "files" : []
49+ },
50+ "execution" : {
51+ "makespanInSeconds" : 100.0 ,
52+ "executedAt" : workflow .executed_at ,
53+ "tasks" : []
54+ }
55+ },
56+ "runtimeSystem" : {
57+ "name" : "WfCommons" ,
58+ "version" : f"{ __version__ } " ,
59+ "url" : "https://docs.wfcommons.org/en/v1.1.dev/"
60+ }
61+ }
62+
63+ assert (workflow .workflow_json == workflow_json )
64+
65+ @pytest .mark .unit
66+ def test_workflow_empty_tasks (self , workflow : Workflow ) -> None :
67+ assert (not workflow .tasks )
68+
69+ @pytest .mark .parametrize (("tasks" ), tasks_list )
70+ @pytest .mark .unit
71+ def test_workflow_add_tasks (self , workflow : Workflow , tasks : list [Task ]) -> None :
72+ tasks_list = []
73+ for task in tasks :
74+ workflow .add_task (task )
75+ tasks_list .append (task .task_id )
76+ assert (workflow .leaves () == tasks_list )
77+
78+ @pytest .mark .parametrize (("tasks" ), tasks_list )
79+ @pytest .mark .unit
80+ def test_workflow_add_dependencies_roots (self , workflow : Workflow , tasks : list [Task ]) -> None :
81+ previous_task = None
82+ for task in tasks :
83+ workflow .add_task (task )
84+ if previous_task :
85+ workflow .add_dependency (previous_task .task_id , task .task_id )
86+ previous_task = task
87+ assert (workflow .roots () == [tasks [0 ].task_id ])
88+
89+ @pytest .mark .parametrize (("tasks" ), tasks_list )
90+ @pytest .mark .unit
91+ def test_workflow_add_dependencies_leaves (self , workflow : Workflow , tasks : list [Task ]) -> None :
92+ previous_task = None
93+ for task in tasks :
94+ workflow .add_task (task )
95+ if previous_task :
96+ workflow .add_dependency (previous_task .task_id , task .task_id )
97+ previous_task = task
98+ assert (workflow .leaves () == [previous_task .task_id ])
0 commit comments