1+ """
2+ LLM-based translator scaffolding for WFCommons WFBench.
3+
This translator follows the interface of the abstract Translator class
from wfcommons.wfbench.translator (note: it does not currently subclass it).
6+ """
7+
import json
import re
from os import path
from typing import Any, Dict, List, Optional

import requests

from wfcommons.wfbench.bench import WorkflowBenchmark
from wfcommons.wfbench.translator.utils.llm_client import LLMClient
13+
14+
class LLMTranslator:
    """LLM-backed translator that converts arbitrary workflow traces to WfFormat.

    Workflow:
      * uses existing WfFormat instances (fetched from GitHub) as grounding examples,
      * accepts a trace from a NEW workflow system,
      * sends all of this as grounding context to an LLM,
      * produces a new WfFormat document automatically.

    The user does not implement translation logic.

    NOTE(review): this class currently has no base class, so
    ``super().__init__(**kwargs)`` resolves to ``object.__init__`` — confirm
    whether it was meant to subclass the abstract ``Translator``.
    """

    def __init__(self,
                 llm_client: "LLMClient",
                 examples_instances: Optional[List[str]],
                 num_examples: int = 3,
                 system_prompt: Optional[str] = None,
                 **kwargs):
        """
        Parameters
        ----------
        llm_client : LLMClient
            An object with ``.complete(prompt: str, **kwargs) -> str``.
        examples_instances : Optional[List[str]]
            Repository paths (within ``wfcommons/WfInstances``) from which
            grounding examples (JSON traces) are fetched.
        num_examples : int, optional
            Number of example instances to include in the prompt.
        system_prompt : str, optional
            Override the default system instructions for the LLM.
        kwargs : dict
            Additional parameters forwarded to the parent class, if any.
        """
        super().__init__(**kwargs)

        self.llm = llm_client
        self.examples_instances = examples_instances
        self.num_examples = num_examples
        # DEFAULT_SYSTEM_PROMPT is a module-level constant defined below.
        self.system_prompt = system_prompt or DEFAULT_SYSTEM_PROMPT

    def _load_examples(self,
                       path_list: List[str],
                       ref: str = "main") -> Dict[str, List[Dict[str, Any]]]:
        """Fetch example files for every repository path in ``path_list``.

        Parameters
        ----------
        path_list : List[str]
            List of paths within the repository to fetch files from.
        ref : str, optional
            Git reference (branch, tag, or commit SHA). Defaults to "main".

        Returns
        -------
        Dict[str, List[Dict[str, Any]]]
            Mapping from repository path to the list of fetched examples,
            each a dict with 'url', 'filename', and 'content' keys.
        """
        # Loop variable named ``repo_path`` so it does not shadow the
        # ``os.path`` name imported at module level.
        return {
            repo_path: self._fetch_examples_from_path(path=repo_path, ref=ref)
            for repo_path in path_list
        }

    def _fetch_examples_from_path(self,
                                  path: str,
                                  ref: str = "main") -> List[Dict[str, Any]]:
        """Fetch JSON instance files from one path of the WfInstances repository.

        Parameters
        ----------
        path : str
            Path within the repository to fetch files from.
        ref : str, optional
            Git reference (branch, tag, or commit SHA). Defaults to "main".

        Returns
        -------
        List[Dict[str, Any]]
            List of dictionaries with 'url', 'filename', and 'content' keys.
        """
        # NOTE(review): consider the logging module instead of print.
        print(f"Fetching examples from path: {path} at ref: {ref}")
        url = f"https://api.github.com/repos/wfcommons/WfInstances/contents/{path}?ref={ref}"
        # Timeout + status check: fail fast with a clear HTTP error instead of
        # hanging forever or crashing later on an error-payload dict.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        listing = response.json()

        examples = []
        for item in listing:
            # The original also excluded names ending in ".md", but a name that
            # ends in ".json" can never end in ".md" — that check was dead code.
            if item["type"] == "file" and item["name"].endswith(".json"):
                raw = requests.get(item["download_url"], timeout=30).text
                examples.append({
                    "url": item["download_url"],
                    "filename": item["name"],
                    "content": raw,
                })
        return examples

    def translate(self, trace, metadata=None, json_schema: Optional[dict] = None, **kwargs):
        """Translate ``trace`` into a WfFormat document via the LLM.

        Parameters
        ----------
        trace : dict | list | tuple | str
            The source workflow trace; dicts are JSON-serialized, sequences are
            joined line-by-line, anything else is passed through as text.
        metadata : Optional[Dict[str, Any]]
            Extra key/value context appended to the prompt.
        json_schema : Optional[dict]
            JSON schema used to constrain the LLM output, when provided.

        Returns
        -------
        The raw LLM completion (expected to be WfFormat JSON text).
        """
        # --- Normalize the trace into a string ---
        if isinstance(trace, dict):
            trace_text = json.dumps(trace, indent=2)
        elif isinstance(trace, (list, tuple)):
            trace_text = "\n".join(map(str, trace))
        else:
            # Assume Python code or any raw text.
            trace_text = str(trace)

        # grounding_examples = self._retrieve_examples(trace_text)

        prompt = self._build_prompt(
            trace=trace_text,
            # examples=grounding_examples,
            metadata=metadata,
        )

        # Only request structured output when a schema was actually supplied;
        # the original always sent {"schema": None}, which providers reject.
        if json_schema is not None:
            return self.llm.complete(
                prompt,
                response_format={
                    "type": "json_schema",
                    "json_schema": {
                        "name": "WfFormat",
                        "schema": json_schema,
                    },
                },
            )
        return self.llm.complete(prompt)

    def _retrieve_examples(self, trace_text: str):
        """Return the top ``num_examples`` examples most similar to the trace.

        Simple lexical-overlap scoring; replace with embeddings if desired.
        """
        examples = self._load_examples(self.examples_instances)
        flat_examples: List[Dict[str, Any]] = []
        if isinstance(examples, dict):
            for per_path in examples.values():
                flat_examples.extend(per_path)
        else:
            flat_examples = list(examples)

        scored = [
            (self._similarity(trace_text, example["content"]), example)
            for example in flat_examples
        ]
        scored.sort(reverse=True, key=lambda pair: pair[0])
        return [example for _, example in scored[:self.num_examples]]

    @staticmethod
    def _similarity(a: str, b: str) -> float:
        """Count whitespace-delimited tokens shared by ``a`` and ``b``.

        Very naive similarity; replace with embeddings for real use.
        """
        return len(set(a.split()) & set(b.split()))

    def _build_prompt(
        self,
        trace: str,
        examples: Optional[List[Dict[str, Any]]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Assemble the grounded prompt: system text, examples, trace, metadata."""
        # ``None`` default instead of a mutable ``[]`` default argument.
        examples = examples or []

        # Build the prompt with a list + join rather than repeated ``+=``.
        parts = [self.system_prompt.strip(), "\n\n"]

        parts.append("=== EXAMPLE TRANSLATORS (FROM URLS) ===\n")
        for i, ex in enumerate(examples, 1):
            parts.append(f"\n--- Example {i} ---\n")
            parts.append(f"Source URL: {ex['url']}\n")
            parts.append("Content:\n")
            parts.append(ex["content"][:5000])  # safety truncation
            parts.append("\n")

        parts.append("\n=== NEW WORKFLOW TRACE TO TRANSLATE (e.g., dispel4py) ===\n")
        parts.append(trace + "\n")

        if metadata:
            parts.append("\n=== ADDITIONAL METADATA ===\n")
            for k, v in metadata.items():
                parts.append(f"- {k}: {v}\n")

        parts.append(
            "\n=== OUTPUT REQUIREMENTS ===\n"
            "Produce ONLY a JSON object compatible with WorkflowBenchmark.from_dict().\n"
            "Infer tasks, dependencies, runtimes, and workflow structure.\n"
            "Do not include explanations.\n"
        )

        return "".join(parts)

    def _parse_llm_output(self, output: str) -> Dict[str, Any]:
        """Parse LLM text into a dict, trying progressively looser strategies.

        1. Direct ``json.loads``.
        2. JSON inside a markdown code fence.
        3. The outermost ``{...}`` span in the text.

        Raises
        ------
        ValueError
            If no strategy yields valid JSON.
        """
        try:
            return json.loads(output)
        except json.JSONDecodeError:
            pass

        fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
        if fenced:
            try:
                return json.loads(fenced.group(1))
            except json.JSONDecodeError:
                pass

        first_brace = output.find('{')
        last_brace = output.rfind('}')
        if first_brace != -1 and last_brace > first_brace:
            try:
                return json.loads(output[first_brace:last_brace + 1])
            except json.JSONDecodeError:
                pass

        raise ValueError("Could not extract valid JSON from LLM output.")
233+
# Default system instructions used by LLMTranslator when the caller does not
# supply ``system_prompt``. The string pins the exact WfFormat 1.5 JSON
# structure the LLM must emit; do not edit casually — the translator's output
# contract depends on it verbatim.
DEFAULT_SYSTEM_PROMPT = """
You are an expert software engineer specializing in workflow systems.
Translate workflow definitions/traces into WfCommons WfFormat 1.5 JSON.

OUTPUT THIS EXACT STRUCTURE:
{
  "name": "<workflow_name - REQUIRED>",
  "schemaVersion": "1.5",
  "workflow": {
    "specification": {
      "tasks": [
        {
          "name": "<task_name>",
          "id": "<task_id>",
          "parents": [],
          "children": [],
          "inputFiles": [],
          "outputFiles": []
        }
      ],
      "files": []
    },
    "execution": {
      "makespanInSeconds": <number or 0 if unknown>,
      "executedAt": "<timestamp or "1970-01-01T00:00:00Z" if unknown>",
      "tasks": [
        {
          "id": "<task_id matching specification>",
          "runtimeInSeconds": <number or 0 if unknown>,
          "executedAt": "<timestamp or "1970-01-01T00:00:00Z" if unknown>",
          "command": {
            "program": "<program name>",
            "arguments": []
          },
          "coreCount": <number or 1>,
          "avgCPU": <percentage or 0>,
          "readBytes": <number or 0>,
          "writtenBytes": <number or 0>,
          "memoryInBytes": <number or 0>,
          "machines": ["unknown"]
        }
      ],
      "machines": [
        {
          "nodeName": "unknown"
        }
      ]
    }
  }
}

RULES:
1. Use EXACTLY this structure - do not add or rename fields
2. "name" is REQUIRED - use the workflow name from the file, or the filename from metadata
3. "schemaVersion" is always "1.5"
4. Do NOT include optional top-level fields like "description", "createdAt", "author", "runtimeSystem" unless explicitly provided in source
5. For arrays not found, use empty array []
6. For numbers not found, use 0
7. For timestamp strings not found, use "1970-01-01T00:00:00Z"
8. Each task in specification MUST have: name, id, parents, children
9. Each task in execution MUST have: id (matching specification), runtimeInSeconds
10. Infer task dependencies from data flow (channels, inputs/outputs)
11. Only populate execution fields if runtime data exists in the source - otherwise use 0 or placeholder values
12. Output ONLY valid JSON - no explanations or markdown
"""