-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathprocess_utils.py
More file actions
178 lines (143 loc) · 5.8 KB
/
process_utils.py
File metadata and controls
178 lines (143 loc) · 5.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Thanks to https://stackoverflow.com/questions/452969/does-python-have-an-equivalent-to-java-class-forname
# This should be moved to a utils library
import importlib.util
import os.path
import time
import sys
from subprocess import Popen
import subprocess
import zipfile
import base64
import psutil
from robotics_application_manager import LogManager
def get_class(kls):
    """
    Resolve a dotted path such as "package.module.ClassName" to the object it names.

    Everything before the last dot is imported as a module; the last
    component is then read from that module as an attribute.

    Parameters:
    - kls (str): Fully qualified dotted name of the object to load.

    Returns:
    - The attribute named by the last component of `kls`.

    Raises:
    - ValueError: If `kls` has no module part (there is no dot in it).
    - ImportError: If the module part cannot be imported.
    - AttributeError: If the module has no attribute with that name.
    """
    module_path, _, attribute = kls.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected a dotted 'module.Class' path, got {kls!r}")
    # import_module returns the leaf module directly, so no getattr walk
    # through the parent packages is needed.
    return getattr(importlib.import_module(module_path), attribute)
def get_class_from_file(file_path: str, class_name: str):
    """
    Execute a Python source file as a module and return one attribute from it.

    The file is loaded under the module name "application" and the module
    object is registered in `sys.modules` under the key `class_name`.

    Parameters:
    - file_path (str): Path of the Python file to load.
    - class_name (str): Name of the class (attribute) to fetch from the module.

    Returns:
    - The attribute `class_name` of the loaded module.

    Raises:
    - ImportError: If no import spec/loader can be built for `file_path`.
    - AttributeError: If the module does not define `class_name`.
    - Any exception raised while executing the file is propagated.
    """
    spec = importlib.util.spec_from_file_location("application", file_path)
    if spec is None or spec.loader is None:
        # spec_from_file_location returns None when it cannot pick a loader
        raise ImportError(f"Cannot load a Python module from {file_path!r}")
    module = importlib.util.module_from_spec(spec)

    previous = sys.modules.get(class_name)
    sys.modules[class_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module registered on failure.
        if previous is None:
            sys.modules.pop(class_name, None)
        else:
            sys.modules[class_name] = previous
        raise
    return getattr(module, class_name)
def class_from_module(module: str):
    """
    Derive a class name from a module name.

    The module name is split on underscores, each piece is capitalized
    (first character upper-cased, the rest lower-cased) and the pieces are
    concatenated, e.g. "my_robot_app" -> "MyRobotApp".
    """
    words = module.split("_")
    return "".join(map(str.capitalize, words))
def stop_process_and_children(process: Popen, signal: int = 9, timeout: int = None):
    """
    Signal a process and all of its descendants, then wait for them to exit.

    Parameters:
    - process (Popen): The parent process; its `pid` is used to look it up.
    - signal (int, optional): Signal number sent to every process. Default is 9.
    - timeout (int, optional): Passed to `psutil.wait_procs`. Default is None.

    Returns:
    - tuple: `(gone, alive)` as returned by `psutil.wait_procs`. If the
      parent process no longer exists, `([], [])` is returned.
    """
    # collect processes to stop
    try:
        root = psutil.Process(process.pid)
        targets = root.children(recursive=True)
    except psutil.NoSuchProcess:
        # The parent already exited, so there is nothing left to signal.
        return [], []
    targets.append(root)

    # send signal to processes
    for target in targets:
        try:
            target.send_signal(signal)
        except psutil.NoSuchProcess:
            # It exited between being listed and being signalled.
            pass

    gone, alive = psutil.wait_procs(targets, timeout=timeout)
    return gone, alive
class classproperty(property):
    """Property whose getter is called with the owning class instead of an instance."""

    def __get__(self, cls, owner):
        # Wrap the getter as a classmethod, bind it to the owner class and
        # call it, so the value is available on the class and its instances.
        bound_getter = classmethod(self.fget).__get__(None, owner)
        return bound_getter()
def is_xserver_running(display):
    """
    Check whether the X server is running on a given display.

    This function checks the existence of a Unix domain socket which X server
    typically creates to communicate with clients. If the socket exists,
    it is assumed that X server is running.

    Parameters:
    - display (str): X display name such as ":0", ":1.0" or "host:0".

    Returns:
    - bool: True if "/tmp/.X11-unix/X<number>" exists, False otherwise.
    """
    # A display name is "[host]:number[.screen]"; only the number names the
    # socket, so drop any host prefix and any screen suffix.
    display_number = display.rpartition(":")[2].partition(".")[0]
    x_socket_path = os.path.join("/tmp/.X11-unix/", f"X{display_number}")
    return os.path.exists(x_socket_path)
def wait_for_xserver(display, timeout=120):
    """
    Wait for the X server to start within a specified timeout period.

    Polls `is_xserver_running(display)` every 0.1 seconds until it reports
    success or `timeout` seconds have elapsed, printing the outcome either way.

    Parameters:
    - display (str): The X display to check.
    - timeout (int, optional): The number of seconds to wait before giving up. Default is 120.

    Returns:
    - bool: True if the X server became available in time, False on timeout.
    """
    # Use the monotonic clock so that system clock adjustments can neither
    # stretch nor cut short the waiting period.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_xserver_running(display):
            print(f"Xserver on {display} is running!")
            return True
        time.sleep(0.1)
    print(f"Timeout: Xserver on {display} is not available after {timeout} seconds.")
    return False
def is_process_running(process_name):
    """
    Check if a process with the specified name is currently running.

    Runs `pgrep -f <process_name>` and treats any output as a match.

    Parameters:
    - process_name (str): The name of the process to check for.

    Returns:
    - bool: True if pgrep printed at least one PID, False otherwise.

    Raises:
    - FileNotFoundError: If the `pgrep` executable is not available.
    """
    # pgrep prints the PID of every match and prints nothing when there is
    # none; its non-zero exit status is deliberately not treated as an error.
    result = subprocess.run(
        ["pgrep", "-f", process_name], stdout=subprocess.PIPE, check=False
    )
    return result.stdout != b""
def wait_for_process_to_start(process_name, timeout=60):
    """
    Wait for a specified process to start for up to a defined timeout.

    Polls `is_process_running(process_name)` once per second and prints the
    outcome.

    Parameters:
    - process_name (str): The name of the process to wait for.
    - timeout (int, optional): The number of seconds to wait before giving up. Default is 60.

    Returns:
    - bool: True if the process was seen running in time, False on timeout.
    """
    # Use the monotonic clock so that system clock adjustments can neither
    # stretch nor cut short the waiting period.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_process_running(process_name):
            print(f"{process_name} is running!")
            return True
        time.sleep(1)
    print(f"Timeout: {process_name} did not start within {timeout} seconds.")
    return False
def check_gpu_acceleration():
    """
    Report which GPU vendor is available for direct rendering.

    Returns:
    - str: The upper-cased value of the DRI_VENDOR environment variable when
      "/dev/dri" exists, or "OFF" when "/dev/dri" is missing or any error
      occurs (for example DRI_VENDOR is not set).
    """
    try:
        if os.path.exists("/dev/dri"):
            # NOTE: a glxinfo-based "direct rendering: Yes" check was sketched
            # here in the past; the vendor is read from the environment instead.
            return os.environ["DRI_VENDOR"].upper()
        # Without /dev/dri there is no direct GPU access.
        LogManager.logger.error("/dev/dri does not exist. No direct GPU access.")
        return "OFF"
    except Exception as err:
        print(f"Error: {err}")
        return "OFF"
def get_ros_version():
    """
    Return the first character of the ROS_VERSION environment variable.

    The variable is read directly from this process' environment instead of
    spawning a `bash` subprocess to echo it.

    Returns:
    - str: The first non-blank character of ROS_VERSION (e.g. "1" or "2"),
      or an empty string when the variable is unset or blank.
    """
    return os.environ.get("ROS_VERSION", "").strip()[:1]
def get_user_world(launch_file):
    """
    Processes a provided base64 encoded string representing a zip file, decodes it,
    saves it as a zip file, and then extracts its contents.

    The function takes a base64 encoded string (`launch_file`) as input. It first decodes this
    string into binary format, then saves this binary content as a zip file in a predetermined
    directory ('workspace/binaries/'). After saving, it extracts the contents of the zip file into
    another specified directory ('workspace/worlds/'). Both paths are relative to the current
    working directory.

    Parameters:
    - launch_file (str or bytes): Base64 encoded content of the zip archive.

    Any error is logged and swallowed; the function always returns None.
    """
    try:
        # Convert base64 to binary
        binary_content = base64.b64decode(launch_file)

        # Save the binary content as a file; make sure its directory exists
        os.makedirs("workspace/binaries", exist_ok=True)
        with open("workspace/binaries/user_worlds.zip", "wb") as file:
            file.write(binary_content)

        # Unzip the file
        with zipfile.ZipFile("workspace/binaries/user_worlds.zip", "r") as zip_ref:
            zip_ref.extractall("workspace/worlds/")
    except Exception as e:
        # NOTE(review): uses `LogManager.logger`, the attribute that
        # check_gpu_acceleration in this module uses; confirm against LogManager.
        LogManager.logger.error(f"An error occurred getting user world: {e}")