1+ """
2+ LLM-based backward translator for WFCommons WFBench.
3+
4+ Translates WMS traces/logs INTO WfFormat JSON using an LLM.
5+ This is the reverse direction of the standard translators.
6+ """
7+
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from wfcommons.wfbench.translator.skills.loader import SkillLoader
from wfcommons.wfbench.translator.utils.llm_client import (
    LLMClient,
    client_from_yaml,
)
from wfcommons.wfbench.translator.utils.wfinstances import (
    load_instances,
    retrieve_instances,
    _word_overlap,
)
19+
20+ load_dotenv () # loads .env from cwd (project root)
21+
22+ BACKWARD_SKILLS_DIR = Path (__file__ ).resolve ().parent / "skills" / "backward"
23+
24+
class LLMBackwardTranslator:
    """
    LLM-based backward translator: WMS trace/log -> WfFormat JSON.

    • Uses existing WfFormat instances as few-shot examples.
    • Accepts a trace from a NEW workflow system.
    • Sends all of this as grounding context to an LLM.
    • Produces a new trace in WfFormat automatically.

    The user does not implement translation logic.
    """

    def __init__(self,
                 llm_client: LLMClient | None = None,
                 model_name: str | None = None,
                 models_file: str | Path | None = None,
                 examples_instances: Optional[List[str]] = None,
                 num_examples: int = 3,
                 system_prompt: Optional[str] = None,
                 skill_name: Optional[str] = None,
                 **kwargs):
        """
        Parameters
        ----------
        llm_client : LLMClient, optional
            A pre-configured LLMClient instance. Either this or
            ``model_name`` must be provided (but not both).
        model_name : str, optional
            Key from models.yaml (e.g. "qwen3", "ollama/llama3").
            The matching config is used to build an LLMClient automatically.
        models_file : str or Path, optional
            Path to a custom models YAML file. Defaults to the
            ``models.yaml`` shipped alongside this module.
        examples_instances : str or List[str], optional
            URLs pointing to translator examples or benchmarks:
            - raw GitHub links
            - JSON traces
            - scripts
            A single string is accepted and treated as a one-element list.
        num_examples : int, optional
            Number of example instances to include in the prompt.
        system_prompt : str, optional
            Override the default system instructions for the LLM.
            When provided, the skills system is bypassed entirely.
        skill_name : str, optional
            Explicit skill to use (e.g. "nextflow", "cwl").
            Auto-detected from trace metadata if not specified.
        kwargs : dict
            Extra parameters forwarded to cooperative base classes, if any.

        Raises
        ------
        ValueError
            If neither or both of ``llm_client`` / ``model_name`` are given.
        """
        try:
            super().__init__(**kwargs)
        except TypeError:
            # Plain ``object`` base rejects unexpected keyword arguments;
            # swallow them instead of crashing when callers pass extras
            # (the original forwarded **kwargs unconditionally, which
            # raised TypeError for any non-empty kwargs).
            super().__init__()

        if llm_client is None and model_name is None:
            raise ValueError("Provide either llm_client or model_name.")
        if llm_client is not None and model_name is not None:
            raise ValueError("Provide only one of llm_client or model_name, not both.")

        if model_name is not None:
            llm_client = client_from_yaml(model_name, models_file)

        self.llm = llm_client
        # Convenience: accept a single URL string as a one-element list.
        if isinstance(examples_instances, str):
            examples_instances = [examples_instances]
        self.examples_instances = examples_instances
        self.num_examples = num_examples
        self.skill_name = skill_name
        self._skill_loader = SkillLoader(skills_dir=BACKWARD_SKILLS_DIR)
        self._system_prompt_override = system_prompt

    @staticmethod
    def available_skills() -> list[str]:
        """Return the list of available backward skill names."""
        return SkillLoader(skills_dir=BACKWARD_SKILLS_DIR).available_skills()

    def translate(self, trace, metadata=None, json_schema: dict | None = None,
                  parse_output: bool = False, **kwargs):
        """
        Translate a WMS trace into WfFormat using the LLM.

        Parameters
        ----------
        trace : dict, list, tuple or str
            The source trace. Dicts are serialized to JSON; sequences are
            joined line-by-line; anything else is passed through ``str()``.
        metadata : dict, optional
            Extra key/value context appended to the prompt. A
            ``source_system`` key is also used to auto-detect the skill.
        json_schema : dict, optional
            JSON schema forwarded to the LLM as a structured
            ``response_format`` to constrain the output.
        parse_output : bool, optional
            When True, parse the LLM response into a dict via
            ``_parse_llm_output`` instead of returning the raw text.
            Defaults to False for backward compatibility.

        Returns
        -------
        str or dict
            Raw LLM output (default), or the parsed JSON object when
            ``parse_output`` is True.
        """
        trace_text = self._normalize_trace(trace)

        examples = self._retrieve_examples(trace_text)

        prompt = self._build_prompt(
            trace=trace_text,
            examples=examples,
            metadata=metadata,
        )

        response_format = None
        if json_schema is not None:
            response_format = {
                "type": "json_schema",
                "json_schema": {
                    "name": "WfFormat",
                    "schema": json_schema,
                },
            }

        output = self.llm.complete(prompt, response_format=response_format)
        return self._parse_llm_output(output) if parse_output else output

    @staticmethod
    def _normalize_trace(trace) -> str:
        """Render any supported trace representation as plain text."""
        if isinstance(trace, dict):
            return json.dumps(trace, indent=2)
        if isinstance(trace, (list, tuple)):
            return "\n".join(map(str, trace))
        # Assume Python code or any raw text.
        return str(trace)

    def _retrieve_examples(self, trace_text: str) -> List[Dict[str, Any]]:
        """
        Fetch and rank WfInstances examples by relevance to the trace.

        Returns an empty list if no examples_instances were provided or
        none of the paths exist in the WfInstances repository.
        """
        return retrieve_instances(
            self.examples_instances or [],
            num_examples=self.num_examples,
            score_against=trace_text,
        )

    def _build_prompt(
        self,
        trace: str,
        examples: Optional[List[Dict[str, Any]]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Assemble the full prompt: system instructions, few-shot examples,
        the trace to translate, optional metadata, and output requirements.

        ``examples`` defaults to None (treated as empty) rather than a
        mutable ``[]`` default shared across calls.
        """
        if self._system_prompt_override is not None:
            # Explicit override: use as-is (backward compat).
            system_prompt = self._system_prompt_override
        else:
            # Compose from skill files; let metadata hint at the skill.
            skill_hint = self.skill_name
            if not skill_hint and metadata and "source_system" in metadata:
                skill_hint = metadata["source_system"].lower()
            system_prompt = self._skill_loader.compose_prompt(
                trace_text=trace,
                skill_name=skill_hint,
            )

        prompt = system_prompt.strip() + "\n\n"

        prompt += "=== EXAMPLE WORKFLOW INSTANCES (WFFORMAT) ===\n"
        for i, example in enumerate(examples or [], 1):
            prompt += f"\n--- Example {i} ---\n"
            prompt += f"Source URL: {example['url']}\n"
            prompt += "Content:\n"
            prompt += example["content"][:5000]  # safety truncation
            prompt += "\n"

        prompt += "\n=== NEW WORKFLOW TRACE TO TRANSLATE (e.g., dispel4py) ===\n"
        prompt += trace + "\n"

        if metadata:
            prompt += "\n=== ADDITIONAL METADATA ===\n"
            for key, value in metadata.items():
                prompt += f"- {key}: {value}\n"

        prompt += (
            "\n=== OUTPUT REQUIREMENTS ===\n"
            "Produce ONLY a JSON object compatible with WorkflowBenchmark.from_dict().\n"
            "Infer tasks, dependencies, runtimes, and workflow structure.\n"
            "Do not include explanations.\n"
        )

        return prompt

    def _parse_llm_output(self, output: str) -> Dict[str, Any]:
        """
        Extract a JSON object from raw LLM output.

        Tries, in order: direct parse, a fenced ```json``` code block,
        and the substring between the first '{' and the last '}'.

        Raises
        ------
        ValueError
            If no strategy yields valid JSON.
        """
        # Try direct parse first.
        try:
            return json.loads(output)
        except json.JSONDecodeError:
            pass

        # Extract JSON from markdown code blocks.
        code_block_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
        if code_block_match:
            try:
                return json.loads(code_block_match.group(1))
            except json.JSONDecodeError:
                pass

        # Fall back to the span between the first '{' and the last '}'.
        first_brace = output.find('{')
        last_brace = output.rfind('}')
        if first_brace != -1 and last_brace > first_brace:
            try:
                return json.loads(output[first_brace:last_brace + 1])
            except json.JSONDecodeError:
                pass

        raise ValueError("Could not extract valid JSON from LLM output.")