Skip to content

Commit ce5ed61

Browse files
committed
added screen capture and support for /linux/capture API
1 parent 69ad124 commit ce5ed61

2 files changed

Lines changed: 294 additions & 10 deletions

File tree

Framework/Built_In_Automation/Desktop/Linux/BuiltInFunctions.py

Lines changed: 293 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,123 @@ def get_latest_app_name() -> str | None:
124124
return None
125125

126126

127-
def capture_screenshot(file_path: str) -> bool:
128-
"""Capture screenshot using available tools (scrot, gnome-screenshot, import)."""
127+
def _get_window_id_for_app(app_name: str | None) -> str | None:
    """Return an X11 window id for ``app_name``, or None if none can be determined.

    When ``app_name`` is given, the following lookups are tried in order and
    the first hit wins:

    1. visible windows whose title matches the name,
    2. visible windows whose class matches the name,
    3. visible windows owned by processes of the exec command that
       ``find_best_app_match`` resolves for the name,
    4. visible windows owned by processes matching the name itself,
    5. every visible window, comparing titles by case-insensitive substring.

    If nothing matches, or ``app_name`` is empty/None, the currently active
    window is returned instead, so the result is NOT guaranteed to belong to
    the requested application.

    Args:
        app_name: Application name to look for, or None to use the active window.

    Returns:
        The window id exactly as printed by xdotool, or None when no window
        was found or the lookup raised (e.g. xdotool is not installed).
    """

    def first_visible_window(*criteria: str) -> str | None:
        """Run ``xdotool search --onlyvisible <criteria>`` and return the first id printed."""
        res = subprocess.run(
            ['xdotool', 'search', '--onlyvisible', *criteria],
            capture_output=True,
            text=True,
        )
        for line in res.stdout.splitlines():
            if line.strip():
                return line.strip()
        return None

    try:
        if app_name:
            # xdotool compiles search patterns as POSIX extended regexes, matched
            # case-insensitively and unanchored. A Python-style "(?i)" prefix is
            # not valid there (it is rejected or misparsed depending on the libc)
            # and the surrounding ".*" is redundant, so pass only the escaped name.
            pattern = re.escape(app_name)
            for selector in ('--name', '--class'):
                winid = first_visible_window(selector, pattern)
                if winid:
                    return winid

            # Resolve the launcher command for this app (from its desktop entry);
            # a failed resolution just skips the exec-command based lookup.
            try:
                _, _, exec_cmd = find_best_app_match(app_name) or (None, None, None)
            except Exception:
                exec_cmd = None

            # Look for windows owned by matching processes: first by the
            # resolved exec command, then by the raw application name.
            process_queries = [exec_cmd, app_name] if exec_cmd else [app_name]
            for query in process_queries:
                for pid in get_process_ids(query):
                    winid = first_visible_window('--pid', str(pid))
                    if winid:
                        return winid

            # Last resort: list every visible window and compare titles ourselves.
            res = subprocess.run(
                ['xdotool', 'search', '--onlyvisible', '--name', '.*'],
                capture_output=True,
                text=True,
            )
            needle = app_name.lower()
            for raw_id in res.stdout.splitlines():
                wid = raw_id.strip()
                if not wid:
                    continue
                try:
                    title = subprocess.run(
                        ['xdotool', 'getwindowname', wid],
                        capture_output=True,
                        text=True,
                    ).stdout.strip()
                except Exception:
                    # A window may vanish between listing and querying; skip it.
                    continue
                if needle in title.lower():
                    return wid

        # Nothing matched (or no name given): fall back to the active window.
        res = subprocess.run(['xdotool', 'getactivewindow'], capture_output=True, text=True)
        winid = res.stdout.strip()
        if winid:
            # Say explicitly that this is a fallback so callers' logs are not
            # misread as a successful match for the requested application.
            CommonUtil.ExecLog(
                MODULE_NAME,
                f"Falling back to active window id {winid} (requested app: '{app_name}')",
                1,
            )
            return winid
    except Exception as e:
        CommonUtil.ExecLog(MODULE_NAME, f"Window lookup error: {e}", 3)
    return None
187+
188+
189+
def capture_screenshot(file_path: str, app_name: str | None = None) -> bool:
190+
"""Capture screenshot of the desired window using xwd (and ImageMagick convert),
191+
falling back to scrot/gnome-screenshot/import if necessary.
192+
193+
The function will try to capture the latest opened application window if available
194+
(via `get_latest_app_name()`); otherwise it will capture the currently active window.
195+
"""
196+
desired_app = app_name or get_latest_app_name()
197+
# Attempt xwd + convert flow first (capture only the target window)
198+
try:
199+
winid = _get_window_id_for_app(desired_app)
200+
if winid:
201+
# Try to use xwd + convert (ImageMagick) to create the requested file
202+
# If convert is not available, xwd will produce an .xwd output (which may not be desired)
203+
# We'll attempt convert and if it fails fall back to writing xwd file then try to convert
204+
convert_available = subprocess.run(['which', 'convert'], capture_output=True, text=True).returncode == 0
205+
if convert_available:
206+
# Run xwd and pipe to convert which will write the final file
207+
p1 = subprocess.Popen(['xwd', '-silent', '-id', winid], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
208+
p2 = subprocess.Popen(['convert', 'xwd:-', file_path], stdin=p1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
209+
if p1.stdout:
210+
p1.stdout.close()
211+
out, err = p2.communicate()
212+
if p2.returncode == 0 and os.path.exists(file_path) and os.path.getsize(file_path) > 0:
213+
return True
214+
else:
215+
# convert not available, write xwd to file and then optionally convert using import
216+
tmp_xwd = file_path if file_path.endswith('.xwd') else f"{file_path}.xwd"
217+
exit_code = subprocess.run(['xwd', '-silent', '-id', winid, '-out', tmp_xwd], capture_output=True).returncode
218+
if exit_code == 0 and os.path.exists(tmp_xwd) and os.path.getsize(tmp_xwd) > 0:
219+
# If desired output wasn't .xwd and ImageMagick 'convert' exists, try to convert
220+
if not file_path.endswith('.xwd') and subprocess.run(['which', 'convert'], capture_output=True).returncode == 0:
221+
conv_exit = subprocess.run(['convert', tmp_xwd, file_path], capture_output=True).returncode
222+
if conv_exit == 0 and os.path.exists(file_path) and os.path.getsize(file_path) > 0:
223+
# remove the temporary xwd file
224+
try:
225+
os.remove(tmp_xwd)
226+
except Exception:
227+
pass
228+
return True
229+
# If the caller wanted .xwd (or conversion not possible), move or rename temporary file
230+
if tmp_xwd != file_path:
231+
try:
232+
os.replace(tmp_xwd, file_path)
233+
except Exception:
234+
pass
235+
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
236+
return True
237+
except FileNotFoundError:
238+
# xwd not present; fallback to previously supported tools
239+
pass
240+
except Exception as e:
241+
CommonUtil.ExecLog(MODULE_NAME, f"xwd/convert screenshot failed: {e}", 3)
242+
243+
# Fallback: try scrot / gnome-screenshot / import (root window capture) like before
129244
tools = [
130245
["scrot", file_path],
131246
["gnome-screenshot", "-f", file_path],
@@ -139,8 +254,8 @@ def capture_screenshot(file_path: str) -> bool:
139254
return True
140255
except (subprocess.CalledProcessError, FileNotFoundError):
141256
continue
142-
143-
CommonUtil.ExecLog(MODULE_NAME, "Failed to capture screenshot. Ensure scrot, gnome-screenshot, or imagemagick is installed.", 3)
257+
258+
CommonUtil.ExecLog(MODULE_NAME, "Failed to capture screenshot. Ensure xwd/xdotool and at least one screenshot tool like scrot, gnome-screenshot, or imagemagick are installed.", 3)
144259
return False
145260

146261

@@ -218,13 +333,23 @@ def get_extended_info(accessible):
218333
return info_str
219334

220335
def get_position_info(accessible):
336+
"""Return position string for XML with coordinates relative to:
337+
- 'desktop' (default): absolute coordinates using DESKTOP_COORDS
338+
- 'app': relative to the application's window origin
339+
- 'parent': relative to the immediate parent's origin
340+
341+
If computing a relative coordinate fails, falls back to desktop coordinates.
342+
"""
221343
position_str = ''
222344
try:
223345
component_iface = accessible.queryComponent()
224346
if component_iface:
225-
x, y = component_iface.getPosition(pyatspi.DESKTOP_COORDS)
226-
position_str += f' x="{x}" y="{y}"'
347+
# Get absolute (desktop) position first
348+
x_abs, y_abs = component_iface.getPosition(pyatspi.DESKTOP_COORDS)
227349
width, height = component_iface.getSize()
350+
x, y = x_abs, y_abs
351+
352+
position_str += f' x="{x}" y="{y}"'
228353
position_str += f' width="{width}" height="{height}"'
229354
except Exception:
230355
pass
@@ -283,7 +408,7 @@ def dump_node(node: Accessible, indent_level=0, path=[], recursive=True) -> list
283408
for i in range(child_count):
284409
child = node.get_child_at_index(i)
285410
if recursive:
286-
dump_node(child, indent_level + 1, path + [i])
411+
dump_node(child, indent_level + 1, path + [i], recursive=recursive)
287412
ui_xml_strings.append(f'{indent}</{role}>')
288413
else:
289414
ui_xml_strings.append(f'{indent}<{role} name="{safe_name}"{attributes}{path_attr}{position_info}{iface_attrs}{text_content_attr}/>')
@@ -446,6 +571,11 @@ def click_element_by_node(node: Accessible | None) -> Literal["passed", "zeuz_fa
446571
CommonUtil.ExecLog(sModuleInfo, "Element not found", 3)
447572
return "zeuz_failed"
448573

574+
original_node = node
575+
# Use module-level helper get_node_center_coords
576+
577+
# Use module-level helper click_coords_with_xdotool
578+
449579
while node:
450580
try:
451581
action_iface = node.queryAction()
@@ -463,8 +593,24 @@ def click_element_by_node(node: Accessible | None) -> Literal["passed", "zeuz_fa
463593
CommonUtil.ExecLog(sModuleInfo, f"Clicked element using action: {action_name}", 1)
464594
return "passed"
465595
else:
466-
node = node.parent
467-
continue
596+
# No action found on this node: consider clicking via xdotool using node coordinates
597+
# Attempt to compute center coords for the current node
598+
coords = get_node_center_coords(node)
599+
if coords:
600+
# Attempt to get the application name if available
601+
app_name = None
602+
try:
603+
app_acc = node.get_application()
604+
if app_acc and getattr(app_acc, 'name', None):
605+
app_name = app_acc.name
606+
except Exception:
607+
app_name = None
608+
if click_coords_with_xdotool(coords, app_name=app_name):
609+
CommonUtil.ExecLog(sModuleInfo, f"Clicked element using xdotool at: {coords}", 1)
610+
return "passed"
611+
else:
612+
CommonUtil.ExecLog(sModuleInfo, f"xdotool could not activate the application '{app_name}', aborting click", 3)
613+
return "zeuz_failed"
468614
else:
469615
node = node.parent
470616
continue
@@ -473,22 +619,160 @@ def click_element_by_node(node: Accessible | None) -> Literal["passed", "zeuz_fa
473619
continue
474620
except Exception as e:
475621
CommonUtil.ExecLog(sModuleInfo, f"Failed to click element: {e}", 3)
622+
# try a final attempt using xdotool on the original node
623+
coords = get_node_center_coords(original_node)
624+
if coords:
625+
app_name = None
626+
try:
627+
app_acc = original_node.get_application()
628+
if app_acc and getattr(app_acc, 'name', None):
629+
app_name = app_acc.name
630+
except Exception:
631+
app_name = None
632+
if click_coords_with_xdotool(coords, app_name=app_name):
633+
CommonUtil.ExecLog(sModuleInfo, f"Clicked element using xdotool at: {coords}", 1)
634+
return "passed"
476635
return "zeuz_failed"
477636

478637

638+
def get_node_center_coords(node: Accessible) -> tuple[int, int] | None:
    """Compute the center point of ``node`` in desktop coordinates.

    The node's Component interface is queried for its desktop position and
    its size; the center is position plus half the size, truncated to ints.

    Returns:
        ``(cx, cy)``, or None when the node has no usable Component interface
        or any of the queries raises.
    """
    try:
        component = node.queryComponent()
        if not component:
            return None

        # Look the accessors up defensively; either may be absent.
        get_position = getattr(component, 'getPosition', None)
        get_size = getattr(component, 'getSize', None)
        if not (get_position and get_size):
            return None

        left, top = get_position(pyatspi.DESKTOP_COORDS)
        width, height = get_size()
        return int(left + (width / 2)), int(top + (height / 2))
    except Exception:
        return None
654+
655+
656+
def _activate_window_for_click(winid: str, timeout: float = 5.0) -> bool:
    """Try to focus and raise window ``winid``; return True if that appears to have worked.

    Activation counts as successful when an activation command exits with
    status 0, or when ``xdotool getactivewindow`` reports ``winid`` within a
    short polling window (5 polls, 0.1 s apart).

    Args:
        winid: Window id as printed by xdotool.
        timeout: Seconds allowed for each activation command; bounds
            ``windowactivate --sync``, which otherwise waits until the window
            really becomes active and may never return.
    """
    import shutil

    def succeeded(cmd: list[str]) -> bool:
        # subprocess.run() does not raise on a non-zero exit status, so the
        # return code must be inspected to know whether the command worked.
        try:
            return subprocess.run(cmd, capture_output=True, timeout=timeout).returncode == 0
        except Exception:
            # Tool missing, timed out, etc. -- treat as "did not work".
            return False

    # Prefer --sync; fall back to the plain form if it fails or times out.
    activated = (
        succeeded(['xdotool', 'windowactivate', '--sync', winid])
        or succeeded(['xdotool', 'windowactivate', winid])
    )

    # Raising is a nicety; its outcome does not affect the result.
    succeeded(['xdotool', 'windowraise', winid])

    # wmctrl is more reliable on some window managers; use it when present.
    if shutil.which('wmctrl') and succeeded(['wmctrl', '-i', '-a', winid]):
        activated = True

    # Give the window manager a moment and confirm the window became active.
    for _ in range(5):
        try:
            active = subprocess.run(
                ['xdotool', 'getactivewindow'], capture_output=True, text=True
            ).stdout.strip()
            if active and active == winid:
                return True
        except Exception:
            pass
        time.sleep(0.1)
    return activated


def click_coords_with_xdotool(coords: tuple[int, int], app_name: str | None = None) -> bool:
    """Left-click at desktop coordinates via xdotool, optionally focusing an app first.

    Args:
        coords: ``(x, y)`` position to click.
        app_name: If given, a window is looked up for this application and
            activated/raised before clicking. The click is aborted when no
            window is found or when activation fails.

    Returns:
        True if the pointer was moved and the click was sent; False if the
        window lookup/activation failed or any xdotool command errored.
    """
    try:
        x, y = coords
        if app_name:
            try:
                winid = _get_window_id_for_app(app_name)
                # If we couldn't find the desired window id for the app, do not click
                if not winid:
                    CommonUtil.ExecLog(MODULE_NAME, f"Could not find a window for app '{app_name}'", 3)
                    return False
                if not _activate_window_for_click(winid):
                    CommonUtil.ExecLog(MODULE_NAME, f"Failed to activate/raise window {winid} for app '{app_name}'", 3)
                    return False
            except Exception as e:
                # Activation is best-effort: an unexpected error here still lets
                # the click proceed, but it is logged instead of being swallowed.
                CommonUtil.ExecLog(
                    MODULE_NAME,
                    f"Unexpected error while activating window for app '{app_name}': {e}; attempting click anyway",
                    3,
                )
        subprocess.run(['xdotool', 'mousemove', '--sync', str(x), str(y)], check=True, capture_output=True)
        # Brief pause so the pointer move is processed before the click is sent.
        time.sleep(0.05)
        subprocess.run(['xdotool', 'click', '1'], check=True, capture_output=True)
        return True
    except Exception as e:
        CommonUtil.ExecLog(MODULE_NAME, f"xdotool click failed: {e}", 3)
        return False
716+
717+
718+
@logger
def click_element_xdotool(data_set: DataSet) -> Literal["passed", "zeuz_failed"]:
    """Locate the element described by ``data_set`` and click its center with xdotool.

    The element's owning application is looked up so its window can be
    brought to the front before clicking; without an application name the
    click is refused.

    Returns:
        "passed" when the click was sent. "zeuz_failed" when the element or
        its coordinates cannot be resolved, no application name is available,
        or the xdotool click fails.
    """
    frame = inspect.currentframe()
    function_name = frame.f_code.co_name if frame else "unknown"
    sModuleInfo = function_name + " : " + MODULE_NAME

    target = get_node(convert_data_set_to_dict(data_set))
    if target is None:
        CommonUtil.ExecLog(sModuleInfo, "Element not found", 3)
        return "zeuz_failed"

    coords = get_node_center_coords(target)
    if not coords:
        CommonUtil.ExecLog(sModuleInfo, "Could not determine coordinates for node", 3)
        return "zeuz_failed"

    # Resolve the owning application's name; any failure leaves it unset.
    app_name = None
    try:
        application = target.get_application()
        if application and getattr(application, 'name', None):
            app_name = application.name
    except Exception:
        app_name = None

    # The app must be brought to the front before clicking, so a missing name is fatal.
    if not app_name:
        CommonUtil.ExecLog(sModuleInfo, "No application context found for xdotool click; aborting", 3)
        return "zeuz_failed"

    if not click_coords_with_xdotool(coords, app_name=app_name):
        return "zeuz_failed"

    CommonUtil.ExecLog(sModuleInfo, f"Clicked element using xdotool at: {coords}", 1)
    return "passed"
751+
752+
479753
@logger
480754
def click_element(data_set: DataSet) -> Literal["passed", "zeuz_failed"]:
481755
""" Click using element, first get the element then click"""
482756
frame = inspect.currentframe()
483757
sModuleInfo = (frame.f_code.co_name if frame else "unknown") + " : " + MODULE_NAME
484758

485759
data_dict = convert_data_set_to_dict(data_set)
760+
# Check for explicit xdotool method in the dataset
761+
use_xdotool = False
762+
for left, mid, right in data_set:
763+
try:
764+
if mid.strip().lower() == "action" and left.strip().lower() in ("click method", "method", "click using") and right.strip().lower() == "xdotool":
765+
use_xdotool = True
766+
except Exception:
767+
continue
486768
node = get_node(data_dict)
487769
if node is None:
488770
CommonUtil.ExecLog(sModuleInfo, "Element not found", 3)
489771
return "zeuz_failed"
490772

491773
try:
774+
if use_xdotool:
775+
return click_element_xdotool(data_set)
492776
return click_element_by_node(node)
493777
except NotImplementedError:
494778
CommonUtil.ExecLog(sModuleInfo, "This node does not support the Action interface.", 3)

server/linux.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def inspect(app_name: str | None = None):
5050
full_screenshot_path = os.path.abspath(SCREENSHOT_PATH)
5151

5252
screenshot_base64 = None
53-
if BuiltInFunctions.capture_screenshot(full_screenshot_path):
53+
if BuiltInFunctions.capture_screenshot(full_screenshot_path, target_app):
5454
try:
5555
with open(full_screenshot_path, 'rb') as img_file:
5656
screenshot_bytes = img_file.read()

0 commit comments

Comments
 (0)