Skip to content

Commit 299c8a5

Browse files
Merge pull request #29 from CAAI/dev/cleanup_rhprocess
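Small cleanup and manager load estimation. When a node is not available locally, the manager asks every other server that hosts the node for its load (the highest ratio of required to available GPU memory, memory, or threads across its active and queued jobs) and assigns the job to the server with the lowest load.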
2 parents 9c5f474 + 6fcdaee commit 299c8a5

6 files changed

Lines changed: 260 additions & 190 deletions

File tree

nodes/manager/manager.py

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,46 @@ def get_active_jobs_info(self):
154154
)
155155
return active_jobs_info
156156

157+
def get_queued_jobs_info(self):
    """Describe every job currently held in ``self.job_queue``.

    Queue entries are read positionally: index 0 is the stored priority,
    index 1 the job id, and indices 2, 3 and 4 the required GPU memory,
    threads and memory respectively.

    Returns:
        dict: ``{"queued_jobs": [...]}`` containing one dict per queue
        entry, in the iteration order of ``self.job_queue``.
    """
    queued = []
    for entry in self.job_queue:
        queued.append(
            {
                # The stored priority is sign-flipped before it is reported.
                "priority": entry[0] * -1,
                "job_id": entry[1],
                "required_gpu_mem": entry[2],
                "required_threads": entry[3],
                "required_memory": entry[4],
            }
        )
    return {"queued_jobs": queued}
170+
171+
def get_load(self):
    """Estimate how heavily this server is loaded.

    The demand of every active and queued job is summed per resource (GPU
    memory, memory, threads) and divided by the corresponding capacity
    (``self.gpu_devices_mem_available``, ``self.memory_max``,
    ``self.threads_max``). The load is the largest of the three ratios, so
    the scarcest resource determines the result.

    A resource with no capacity (for example a server without GPUs) no
    longer raises ``ZeroDivisionError``: it contributes ``0.0`` when nothing
    requests it, and the largest finite float when jobs do request it.

    Returns:
        float: the highest demand/capacity ratio across the three resources.
    """
    # Local import: only needed for the saturation value below.
    import sys

    def _utilisation(required, capacity):
        """Return required/capacity, tolerating a missing or zero capacity."""
        if capacity and capacity > 0:
            return required / capacity
        # A finite sentinel is used instead of float("inf") because strict
        # JSON encoders reject infinity, and this value is served over HTTP.
        return sys.float_info.max if required > 0 else 0.0

    jobs = list(self.get_active_jobs_info()) + list(
        self.get_queued_jobs_info()["queued_jobs"]
    )

    gpu_capacity = self.gpu_devices_mem_available
    if isinstance(gpu_capacity, (list, tuple)):
        # One entry per GPU device: the total is what jobs compete for.
        gpu_capacity = sum(gpu_capacity)

    return max(
        _utilisation(sum(job["required_gpu_mem"] for job in jobs), gpu_capacity),
        _utilisation(sum(job["required_memory"] for job in jobs), self.memory_max),
        _utilisation(sum(job["required_threads"] for job in jobs), self.threads_max),
    )
196+
157197

158198
templates = Jinja2Templates(
159199
directory=os.path.dirname(__file__) + "/resources/templates"
@@ -194,15 +234,28 @@ def get_addr_to_run_node(self, node_name):
194234
if node_name in self.nodes.keys():
195235
return "localhost:8000"
196236

237+
lowest_load = None
238+
addr_with_lowest_load = None
197239
# query other hosts for available gpu
198240
for addr in self.other_addrs:
199241
url = f"http://{addr}/manager/dispatcher/has_node/{node_name}"
200242
response = requests.get(url)
201243
response.raise_for_status()
202244
has_node = response.json()
245+
assert isinstance(has_node, bool)
203246
if has_node:
204-
return addr
205-
raise Exception("No servers were found with the node")
247+
url = f"http://{addr}/manager/get_load"
248+
response = requests.get(url)
249+
response.raise_for_status()
250+
node_load = response.json()
251+
if lowest_load == None or node_load < lowest_load:
252+
lowest_load = node_load
253+
addr_with_lowest_load = addr
254+
255+
if addr_with_lowest_load is not None:
256+
return addr_with_lowest_load
257+
else:
258+
raise Exception("No servers were found with the node")
206259

207260
def setup_routes(self):
208261
@self.post("/manager/register_node")
@@ -251,18 +304,11 @@ async def get_active_jobs():
251304

252305
@self.get("/manager/get_queued_jobs")
async def get_queued_jobs():
    # Serve the queue's description of its pending jobs. The value must be
    # returned explicitly; a bare call makes the endpoint respond with null.
    return self.queue.get_queued_jobs_info()
308+
309+
@self.get("/manager/get_load")
async def get_load():
    # Expose the queue's load estimate so other managers can compare servers.
    current_load = self.queue.get_load()
    return current_load
266312

267313
@self.get("/manager/get_resource_info")
268314
async def get_resource_info():

rhnode/common.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,22 @@ def validate_input_output_spec(input_spec, output_spec):
134134
raise ValueError(
135135
"Input spec and output spec must not have overlapping key names"
136136
)
137+
138+
139+
def convert_string_to_type(text, type_):
    """Parse ``text`` into a value of ``type_``.

    Used to parse the inputs to the process function. ``str`` and
    ``FilePath`` targets get the text back unchanged, ``bool`` is delegated
    to ``text_to_bool``, and every other type is built by calling
    ``type_(text)``.
    """
    # The ``str`` comparison stays first so it short-circuits the check.
    if type_ == str or type_ == FilePath:
        return text
    if type_ == bool:
        return text_to_bool(text)
    # Fallback: rely on the target type's own constructor (e.g. int, float).
    return type_(text)
147+
148+
149+
def text_to_bool(value):
    """Interpret a textual flag as a boolean.

    Matching is case-insensitive and ignores surrounding whitespace, so
    values such as ``" True\n"`` are accepted.

    Args:
        value: text such as "yes", "true", "t", "y", "1" (true) or
            "no", "false", "f", "n", "0" (false).

    Returns:
        bool: the parsed value.

    Raises:
        ValueError: if ``value`` is not one of the recognised tokens.
    """
    # Normalise once instead of lower-casing for every comparison.
    token = value.strip().lower()
    if token in ("yes", "true", "t", "y", "1"):
        return True
    if token in ("no", "false", "f", "n", "0"):
        return False
    raise ValueError(f"Could not convert {value} to bool")

rhnode/rhjob.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from requests.exceptions import HTTPError
99

1010

11-
1211
class RHJob:
1312
def __init__(
1413
self,
@@ -269,7 +268,9 @@ def wait_for_finish(self):
269268
if self.strict_output_dir:
270269
output_path = self.output_directory
271270
else:
272-
output_path = _create_output_directory_name(self.output_directory, self.node_identifier)
271+
output_path = _create_output_directory_name(
272+
self.output_directory, self.node_identifier
273+
)
273274

274275
output = None
275276
while output is None:

rhnode/rhnode.py

Lines changed: 6 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
from fastapi.responses import FileResponse, JSONResponse
1111
from fastapi import FastAPI, File, Form, UploadFile, BackgroundTasks
1212
from .rhprocess import RHProcess
13-
from .frontend import setup_frontend_routes
13+
from .routing.frontend import setup_frontend_routes
14+
from .routing.backend import setup_api_routes
1415
import traceback
1516
from fastapi import Response
1617
from fastapi import HTTPException
@@ -70,11 +71,10 @@ def __init__(self):
7071
else:
7172
self.email_sender = None
7273

73-
self.setup_api_routes()
74+
setup_api_routes(self)
7475
setup_frontend_routes(self)
7576

7677
def _delete_job(self, job_id):
77-
pass
7878
job = self.jobs[job_id]
7979
job.delete_files()
8080
del self.jobs[job_id]
@@ -207,7 +207,9 @@ def _get_output_with_download_links(self, job_id):
207207
else val
208208
for key, val in job.output.dict(exclude_unset=True).items()
209209
# Only return links for non FilePath and FilePath with values that are not None unless required
210-
if not self.output_spec.__fields__[key].type_ == FilePath or val is not None or (val is None and self.output_spec.__fields__[key].required)
210+
if not self.output_spec.__fields__[key].type_ == FilePath
211+
or val is not None
212+
or (val is None and self.output_spec.__fields__[key].required)
211213
}
212214
)
213215

@@ -225,153 +227,6 @@ def _ensure_job_status(self, status, valid_statuses):
225227
),
226228
)
227229

228-
def setup_api_routes(self):
229-
"""Setup the API routes for the node. See frontend.py for the frontend routes."""
230-
231-
@self.post(self._create_url("/jobs"))
232-
async def _post_new_job(inputs: self.input_spec_no_file) -> str:
233-
job_id = self.CREATE_JOB(inputs)
234-
return job_id
235-
236-
@self.post(self._create_url("/jobs/{job_id}/start"))
237-
async def START_JOB(
238-
job_id: str, job_meta_data: JobMetaData, background_tasks: BackgroundTasks
239-
) -> Response:
240-
job_obj = self.get_job_by_id(job_id)
241-
self._ensure_job_status(job_obj.status, JobStatus.Preparing)
242-
if not job_obj.is_ready_to_run():
243-
raise HTTPException(
244-
status_code=400,
245-
detail="Job is not ready to run. Some files are likely missing.",
246-
)
247-
background_tasks.add_task(job_obj.run, job_meta_data)
248-
return Response(status_code=204)
249-
250-
@self.get(self._create_url("/jobs/{job_id}/status"))
251-
async def _get_job_status(job_id: str) -> JobStatus:
252-
return self.get_job_by_id(job_id).status
253-
254-
@self.get(self._create_url("/jobs/{job_id}/data"))
255-
async def _get_job_data_download_urls(job_id: str) -> self.output_spec_url:
256-
return self._get_output_with_download_links(job_id)
257-
258-
@self.get(self._create_url("/jobs/{job_id}/error"))
259-
def _get_job_error(job_id: str):
260-
job = self.get_job_by_id(job_id)
261-
self._ensure_job_status(job.status, [JobStatus.Cancelled, JobStatus.Error])
262-
return job.error
263-
264-
@self.post(self._create_url("/jobs/{job_id}/stop"))
265-
def _stop_task(job_id: str):
266-
job = self.get_job_by_id(job_id)
267-
if job.status not in [
268-
JobStatus.Finished,
269-
JobStatus.Cancelled,
270-
JobStatus.Cancelling,
271-
JobStatus.Error,
272-
]:
273-
job.stop()
274-
return Response(status_code=204)
275-
276-
return Response(status_code=200, content="Job is already in terminal phase")
277-
278-
@self.post(self._create_url("/jobs/{job_id}/delete"))
279-
async def _delete_job(job_id: str):
280-
"""Delete a job from the node."""
281-
job = self.get_job_by_id(job_id)
282-
self._delete_job(job_id)
283-
284-
@self.get(self._create_url("/jobs/{job_id}/download/{filename}"))
285-
def _get_file(job_id, filename):
286-
job = self.get_job_by_id(job_id)
287-
self._ensure_job_status(job.status, JobStatus.Finished)
288-
if not filename in self.output_spec.__fields__:
289-
raise HTTPException(
290-
status_code=404,
291-
detail="The requested file key {} is invalid.".format(filename),
292-
)
293-
try:
294-
fname = job.output.dict()[filename]
295-
except KeyError:
296-
raise HTTPException(
297-
status_code=404,
298-
detail="The requested file corresponding to key {} could not be found.".format(
299-
filename
300-
),
301-
)
302-
return FileResponse(
303-
fname, filename=create_file_name_from_key(filename, fname)
304-
)
305-
306-
@self.get(self._create_url("/filename_keys"))
307-
async def _get_file_keys():
308-
return self.input_file_keys
309-
310-
@self.get(self._create_url("/keys"))
311-
async def _get_keys():
312-
return {
313-
"output_keys": list(self.output_spec.__fields__.keys()),
314-
"input_keys": list(self.input_spec.__fields__.keys()),
315-
}
316-
317-
@self.post(self._create_url("/cli/parse"))
318-
async def _parse_cli_args(cli: list):
319-
try:
320-
return self.parse_cli_args(cli)
321-
except Exception as e:
322-
raise HTTPException(status_code=400, detail=str(e))
323-
324-
@self.get(self._create_url("/cli/help"))
325-
async def _get_job_input():
326-
return self.help_cli_args()
327-
328-
@self.post(self._create_url("/jobs/{job_id}/upload"))
329-
async def _upload(
330-
job_id: str,
331-
file: UploadFile = File(...),
332-
key: str = Form(...),
333-
):
334-
job = self.get_job_by_id(job_id)
335-
self._ensure_job_status(job.status, JobStatus.Preparing)
336-
337-
"""Upload an input file to a job. The key must be one of the file keys."""
338-
339-
if not key in self.input_file_keys:
340-
raise HTTPException(
341-
status_code=404,
342-
detail="The requested file key {} is invalid.".format(key),
343-
)
344-
# The outer with statement is to ensure that the file is validated after upload
345-
with job.upload_file(key, file.filename) as fpath:
346-
with open(fpath, "wb") as f:
347-
f.write(await file.read())
348-
349-
return Response(status_code=204)
350-
351-
### OTHER
352-
@self.on_event("startup")
353-
async def register_on_startup():
354-
# print(multiprocessing.get_start_method())
355-
"""Looks for manager node at startup and initializes multiprocessing module"""
356-
# if not os.environ.get("PYTEST", False):
357-
multiprocessing.set_start_method("spawn")
358-
asyncio.create_task(self._register_with_manager())
359-
360-
@self.exception_handler(500)
361-
async def internal_exception_handler(request: Request, exc: Exception):
362-
if self.email_sender:
363-
self.email_sender.send_email_exception(
364-
self.name, self.host_name, datetime.datetime.now()
365-
)
366-
367-
return JSONResponse(
368-
status_code=500, content={"code": 500, "msg": "Internal Server Error"}
369-
)
370-
371-
@self.on_event("startup")
372-
async def start_cleaning_loop():
373-
asyncio.create_task(self._delete_expired_jobs_loop())
374-
375230
@classmethod
376231
def process_wrapper(cls, inputs, job, result_queue):
377232
"""Wrapper for the process function. It has two purposes: catching errors and packing the output of the process function into the "queue" object."""
@@ -423,22 +278,3 @@ def help_cli_args(self):
423278
for key, val in self.output_spec.__fields__.items():
424279
help_string += f"\t{key}: {val.type_.__name__}\n"
425280
return help_string
426-
427-
428-
def convert_string_to_type(text, type_):
429-
"""Convert a string to a type. This is used to parse the inputs to the process function."""
430-
if type_ == str or type_ == FilePath:
431-
return text
432-
elif type_ == bool:
433-
return text_to_bool(text)
434-
else:
435-
return type_(text)
436-
437-
438-
def text_to_bool(value):
439-
if value.lower() in ("yes", "true", "t", "y", "1"):
440-
return True
441-
elif value.lower() in ("no", "false", "f", "n", "0"):
442-
return False
443-
else:
444-
raise ValueError(f"Could not convert {value} to bool")

0 commit comments

Comments
 (0)