Skip to content

Commit 29e6ac0

Browse files
authored
Merge branch 'ErikOsinga:main' into main
2 parents a36fe65 + 414e127 commit 29e6ac0

9 files changed

Lines changed: 132 additions & 25 deletions

automation/canfar_polling.py

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,24 @@ async def submit_canfar_job():
3333
this will depend on the type of CANFAR job that we were running though.
3434
3535
"""
36-
37-
3836
from __future__ import annotations
37+
import os
38+
import pprint
3939

40+
from canfar.sessions import Session
4041
from dataclasses import dataclass
4142

42-
from prefect import task
43+
from prefect import task, flow
4344
from prefect.client.orchestration import get_client
4445
from prefect.server.schemas.filters import FlowRunFilter, FlowRunFilterState
4546
from prefect.server.schemas.states import StateType
4647
from prefect.states import Failed
4748

4849
from print_all_open_sessions import get_open_sessions
50+
from vos import Client
4951

5052
TAG_PREFIX = "canfar_session:"
53+
VOS_LOG_FOLDER = "arc:projects/CIRADA/polarimetry/ASKAP/Pipeline_logs/canfar_logs"
5154

5255

5356
@dataclass(frozen=True)
@@ -206,3 +209,85 @@ async def reconcile_running_prefect_with_canfar_task(limit: int = 200) -> dict:
206209
"skipped_untagged": result.skipped_untagged,
207210
}
208211

212+
@flow(name="Get completed sessions logs and push to Canfar directory")
213+
def get_completed_session_logs():
214+
"""
215+
Get recently completed CANFAR sessions and push their logs to Canfar directory.
216+
This includes Succeeded, Completed, Failed or Error sessions.
217+
The directory is /arc/projects/CIRADA/polarimetry/ASKAP/Pipeline_logs/canfar_logs.
218+
"""
219+
session = Session()
220+
statuses = ["Succeeded", "Completed", "Failed", "Error"]
221+
# Get all session ids of completed sessions
222+
all_session_ids = [
223+
s["id"]
224+
for status in statuses
225+
for s in (session.fetch(kind="headless", status=status) or [])
226+
]
227+
for session_id in all_session_ids:
228+
# Get logs for each session
229+
get_logs(session, session_id)
230+
231+
def push_logs_to_canfar(tmp_file):
232+
"""
233+
Copy Canfar session logs to Canfar directory and remove local file
234+
Args:
235+
tmp_file: Local log file to copy and delete
236+
"""
237+
client = Client()
238+
239+
# Ensure remote directory exists
240+
try:
241+
client.mkdir(VOS_LOG_FOLDER)
242+
except:
243+
pass # Directory might already exist
244+
245+
try:
246+
remote_file = f"{VOS_LOG_FOLDER}/{tmp_file}"
247+
client.copy(tmp_file, remote_file)
248+
print(f"Upload Canfar log to {remote_file} completed successfully!\n")
249+
except Exception as e:
250+
print(f"Failed to upload {tmp_file} to Canfar: {e}\n")
251+
return
252+
253+
# Clean up local file
254+
os.remove(tmp_file)
255+
256+
def log_exists_in_canfar_dir(session_id):
257+
"""
258+
Check if log already exists in Canfar directory.
259+
Args:
260+
session_id: ID of the session
261+
Returns:
262+
True if log exists, False otherwise
263+
"""
264+
client = Client()
265+
file_name = f"{VOS_LOG_FOLDER}/{session_id}.log"
266+
try:
267+
return client.isfile(file_name)
268+
except:
269+
# Folder doesn't exist yet
270+
return False
271+
272+
273+
def get_logs(session, session_id):
274+
"""
275+
Get logs for a Canfar session.
276+
277+
Args:
278+
session: Canfar session object
279+
session_id: ID of the session to monitor
280+
output_file: File to write logs to
281+
"""
282+
if log_exists_in_canfar_dir(session_id):
283+
return
284+
285+
tmp_file = f"{session_id}.log"
286+
287+
with open(tmp_file, "w") as f:
288+
# Fetch logs before exiting
289+
logs_dict = session.logs(session_id).get(session_id)
290+
f.write(pprint.pformat(logs_dict, depth=4))
291+
f.flush
292+
push_logs_to_canfar(tmp_file)
293+

automation/prefect/prefect-git-clone.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,10 @@ deployments:
6262
work_pool:
6363
name: local-pool
6464
work_queue_name: default
65+
66+
- name: push-session-logs-to-canfar
67+
entrypoint: automation/canfar_polling.py:get_completed_session_logs
68+
description: 'Schedule this to run after each parent flow to push Canfar session logs to Canfar directory.'
69+
work_pool:
70+
name: local-pool
71+
work_queue_name: default

possum_pipeline_control/check_ingest_3Dpipeline.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,14 @@ def launch_ingest(tilenumber, band):
9191
replicas=1,
9292
)
9393

94+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
95+
9496
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
9597
print(
96-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
98+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
9799
)
98100

99-
return session_id[0]
101+
return session_id_str
100102

101103
def update_status(tile_number, band, status, conn):
102104
"""

possum_pipeline_control/check_status_and_launch_3Dpipeline_v2.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,11 +280,14 @@ def launch_download_session(jobname="3dtile-dl"):
280280
replicas=1,
281281
)
282282

283+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
284+
283285
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
284286
print(
285-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
287+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
286288
)
287-
return session_id[0]
289+
290+
return session_id_str
288291

289292
def launch_create_symlinks(jobname="3dsymlinks"):
290293
"""
@@ -317,11 +320,14 @@ def launch_create_symlinks(jobname="3dsymlinks"):
317320
replicas=1,
318321
)
319322

323+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
324+
320325
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
321326
print(
322-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
327+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
323328
)
324-
return session_id[0]
329+
330+
return session_id_str
325331

326332
def needs_prefect_db_backup(
327333
home_dir: str | Path,

possum_pipeline_control/launch_1Dpipeline_PartialTiles_band1.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@ def launch_session(run_name, field_ID, tilenumbers, SBnumber, image, cores, ram)
4242
replicas=1,
4343
)
4444

45+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
46+
4547
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
4648
print(
47-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
49+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
4850
)
4951

50-
return session_id[0]
52+
return session_id_str
5153

5254

5355
@flow(name="launch_1d_partialtiles", log_prints=True)

possum_pipeline_control/launch_1Dpipeline_PartialTiles_band1_pre_or_post.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import os
44
from datetime import datetime
55

6-
# from skaha.session import Session
76
from canfar.sessions import Session
8-
97
from possum_pipeline_control.control_1D_pipeline_PartialTiles import get_open_sessions
108

119
"""
@@ -81,12 +79,14 @@ def launch_session(
8179
replicas=1,
8280
)
8381

82+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
83+
8484
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
8585
print(
86-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
86+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
8787
)
8888

89-
return session_id[0]
89+
return session_id_str
9090

9191
if __name__ == "__main__":
9292
parser = argparse.ArgumentParser(

possum_pipeline_control/launch_3Dpipeline_band1.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ def launch_session(run_name, tilenumber, image, cores, ram):
2828
replicas=1,
2929
)
3030

31+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
32+
3133
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
3234
print(
33-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
35+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
3436
)
3537

36-
return session_id[0]
38+
return session_id_str
3739

3840

3941
def main_launch3D(tilenumber: str):

possum_pipeline_control/submit_download.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,13 @@ def launch_download():
5454
replicas=1,
5555
)
5656

57+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
58+
5759
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
5860
print(
59-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
61+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
6062
)
61-
62-
return session_id[0]
63+
return session_id_str
6364

6465

6566
if __name__ == "__main__":

possum_pipeline_control/update_partialtile_google_sheet.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from pathlib import Path
2626

2727
import astropy.table as at
28+
from canfar.sessions import Session
2829
import gspread
2930
import matplotlib.dates as mdates
3031
import matplotlib.pyplot as plt
@@ -37,6 +38,7 @@
3738
from possum_pipeline_control import util
3839

3940

41+
4042
def get_sheet_table(band):
4143
"""
4244
Connects to the POSSUM Status Monitor Google Sheet and returns a sub-table
@@ -461,9 +463,6 @@ def launch_collate_job():
461463
"""
462464
Launches the collate job for the 1D pipeline. Once per day
463465
"""
464-
# from skaha.session import Session
465-
from canfar.sessions import Session
466-
467466
session = Session()
468467

469468

@@ -495,11 +494,14 @@ def launch_collate_job():
495494
replicas=1,
496495
)
497496

497+
session_id_str = session_id_str = session_id[0] if len(session_id) > 0 else None
498+
498499
print("Check sessions at https://ws-uv.canfar.net/skaha/v1/session")
499500
print(
500-
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id[0]}?view=logs"
501+
f"Check logs at https://ws-uv.canfar.net/skaha/v1/session/{session_id_str}?view=logs"
501502
)
502-
return session_id[0]
503+
504+
return session_id_str
503505

504506
if __name__ == "__main__":
505507
parser = argparse.ArgumentParser(description="Update Partial Tile Google Sheet")

0 commit comments

Comments
 (0)