3232from utils .data import make_url_sassy
3333from utils .email import codalab_send_markdown_email
3434
35+ from channels .layers import get_channel_layer
36+ from asgiref .sync import async_to_sync
37+
3538import logging
3639logger = logging .getLogger (__name__ )
3740
@@ -771,9 +774,66 @@ def submission_status_cleanup():
771774 submissions = Submission .objects .filter (status = Submission .RUNNING , has_children = False ).select_related ('phase' , 'parent' )
772775
773776 for sub in submissions :
774- # Check if the submission has been running for 24 hours longer than execution_time_limit
775777 if sub .started_when < now () - timedelta (milliseconds = (3600000 * 24 ) + sub .phase .execution_time_limit ):
776778 if sub .parent is not None :
777779 sub .parent .cancel (status = Submission .FAILED )
778780 else :
779781 sub .cancel (status = Submission .FAILED )
782+
783+
def _broadcast_worker_state(payload):
    """Send one worker's state to the "compute_workers" channel group.

    Returns without sending anything when ``get_channel_layer()`` yields a
    falsy value.

    Args:
        payload: Worker state, forwarded unchanged under the "worker" key of
            the group message.
    """
    layer = get_channel_layer()
    if layer:
        message = {
            "type": "worker.health",
            "worker": payload,
        }
        # async_to_sync wraps group_send so it can be called from this
        # synchronous code path.
        send_to_group = async_to_sync(layer.group_send)
        send_to_group("compute_workers", message)
796+
797+
@app.task(queue="site-worker", soft_time_limit=60)
def refresh_compute_worker_health():
    """Inspect Celery workers and publish the state of each compute worker.

    For every worker whose name starts with "compute-worker", this task:

    * counts its active plus reserved tasks and derives a "busy" or
      "available" status from that count,
    * stores the resulting payload under a per-worker heartbeat key with
      ``ex=35``,
    * records the hostname and last-seen timestamp in the hash at
      ``WORKERS_REGISTRY_KEY``,
    * broadcasts the payload through ``_broadcast_worker_state``.

    The task logs and returns early if the inspector is ``None`` or if any of
    the inspect calls raises.
    """
    inspector = app_or_default().control.inspect(timeout=1)
    if inspector is None:
        logger.warning("Celery inspect returned None")
        return

    try:
        # Each inspect call may return None; normalise to an empty mapping.
        stats = inspector.stats() or {}
        active = inspector.active() or {}
        reserved = inspector.reserved() or {}
    except Exception:
        logger.exception("Unable to inspect Celery workers")
        return

    # Only workers reported by stats() are considered, and of those only the
    # ones named like compute workers.
    compute_workers = [
        name for name in stats if name.startswith("compute-worker")
    ]

    for hostname in compute_workers:
        # Reserved (prefetched) tasks are counted together with active ones.
        job_count = sum(
            len(tasks.get(hostname, [])) for tasks in (active, reserved)
        )
        if job_count > 0:
            state = "busy"
        else:
            state = "available"

        payload = {
            "hostname": hostname,
            "status": state,
            "running_jobs": job_count,
            "timestamp": now().timestamp(),
        }

        # NOTE(review): the key is reproduced exactly as written, including
        # the space before ":heartbeat" -- confirm readers of this key use
        # the same format.
        # `r` is presumably a Redis client, in which case ex=35 is a TTL in
        # seconds -- TODO confirm.
        heartbeat_key = f"worker:{hostname} :heartbeat"
        r.set(heartbeat_key, json.dumps(payload), ex=35)

        registry_entry = json.dumps({
            "hostname": hostname,
            "last_seen": payload["timestamp"],
        })
        r.hset(WORKERS_REGISTRY_KEY, hostname, registry_entry)

        _broadcast_worker_state(payload)
0 commit comments