@@ -2434,26 +2434,40 @@ def _make_executor() -> ProcessPoolExecutor:
24342434 self ._iteration , len (profile_dirs ),
24352435 "y" if len (profile_dirs ) == 1 else "ies" ,
24362436 )
2437- for pdir in profile_dirs :
2438- self ._process_profile (pdir , run_dir )
2439-
2440- # Persist the exploration record only after confirming success
2441- # (profile processing complete). Placing record() before
2442- # _process_profile() — the previous order — would mark the task
2443- # as explored even when _process_profile raises (e.g. disk full
2444- # in _persist_node_xyz), making it non-retryable on resume.
2445- # Must mirror the parallel path in _process_single_result.
2446- self .explored_log .record (task .node_id , atom_i , atom_j , gamma_sign )
2447- # Call release() after record() so that is_submitted() returns
2448- # True throughout the entire [pop() → record() → release()] window
2449- self .queue .release ((task .node_id , tuple (task .afir_params )))
2450-
2437+ _process_status = "DONE"
2438+ try :
2439+ for pdir in profile_dirs :
2440+ self ._process_profile (pdir , run_dir )
2441+ # Persist the exploration record only after confirming success
2442+ # (profile processing complete). Placing record() before
2443+ # _process_profile() — the previous order — would mark the task
2444+ # as explored even when _process_profile raises (e.g. disk full
2445+ # in _persist_node_xyz), making it non-retryable on resume.
2446+ # Must mirror the parallel path in _process_single_result.
2447+ self .explored_log .record (task .node_id , atom_i , atom_j , gamma_sign )
2448+ except Exception as exc :
2449+ logger .error (
2450+ "_run_sequential: _process_profile failed for run %s: %s — "
2451+ "marking FAILED; task remains retryable on resume." ,
2452+ run_dir , exc ,
2453+ )
2454+ _process_status = "FAILED"
2455+ finally :
2456+ # Always release the in-flight lock regardless of _process_profile
2457+ # outcome. Without this guard a RuntimeError (e.g. disk full in
2458+ # _persist_node_xyz) leaves the key in _in_flight indefinitely,
2459+ # and _enqueue_perturbations sees is_submitted()==True forever —
2460+ # the pair can never be re-queued even on resume.
2461+ # On the success path, release() is still called after record() so
2462+ # that is_submitted() stays True throughout [pop()→record()→release()].
2463+ self .queue .release ((task .node_id , tuple (task .afir_params )))
2464+
24512465 # Notify queue of updated graph (required by RCMCQueue)
24522466 if hasattr (self .queue , "set_graph" ):
24532467 self .queue .set_graph (self .graph )
2454-
2455- self ._append_history (history_log , self ._iteration , task , "DONE" )
2456- self ._finalize_iteration (run_dir , task , "DONE" , profile_dirs , priority_log )
2468+
2469+ self ._append_history (history_log , self ._iteration , task , _process_status )
2470+ self ._finalize_iteration (run_dir , task , _process_status , profile_dirs , priority_log )
24572471
24582472 finally :
24592473 executor .shutdown (wait = True )
@@ -2647,7 +2661,8 @@ def _try_submit() -> bool:
26472661 return False
26482662
26492663 futures_map [future ] = (
2650- task , run_dir , self ._iteration , gamma_sign , atom_i , atom_j
2664+ task , run_dir , self ._iteration , gamma_sign , atom_i , atom_j ,
2665+ time .monotonic (), # submit_time — used for per-future timeout
26512666 )
26522667 return True
26532668
@@ -2667,7 +2682,7 @@ def _handle_done(future) -> None:
26672682 main loop can trigger a rebuild) from ordinary worker failures.
26682683 """
26692684 nonlocal pool_broken
2670- task , run_dir , iteration , gamma_sign , atom_i , atom_j = (
2685+ task , run_dir , iteration , gamma_sign , atom_i , atom_j , _ = (
26712686 futures_map .pop (future )
26722687 )
26732688 try :
@@ -2704,7 +2719,7 @@ def _drain_broken_futures() -> None:
27042719 immediately, freeing futures_map for the rebuilt pool.
27052720 """
27062721 for f in list (futures_map ):
2707- task , run_dir , iteration , gamma_sign , atom_i , atom_j = (
2722+ task , run_dir , iteration , gamma_sign , atom_i , atom_j , _ = (
27082723 futures_map .pop (f )
27092724 )
27102725 logger .error (
@@ -2782,18 +2797,32 @@ def _rebuild_pool() -> None:
27822797
27832798 # ── Wait for the next completed future ────────────────────
27842799 if self .worker_timeout_s is not None :
2800+ # Compute how long until the earliest-submitted worker hits
2801+ # its individual deadline, then wait at most that long.
2802+ now = time .monotonic ()
2803+ min_remaining = min (
2804+ self .worker_timeout_s - (now - futures_map [f ][6 ])
2805+ for f in futures_map
2806+ )
27852807 done , _ = _fut_wait (
27862808 list (futures_map ),
2787- timeout = self . worker_timeout_s ,
2809+ timeout = max ( 0.05 , min_remaining ) ,
27882810 return_when = FIRST_COMPLETED ,
27892811 )
2790- if not done :
2791- # Per-worker timeout: at least one worker is stuck
2792- timed_out = True
2812+
2813+ # Identify futures that have individually exceeded the deadline,
2814+ # regardless of whether other workers completed in time.
2815+ now = time .monotonic ()
2816+ stalled = [
2817+ f for f in list (futures_map )
2818+ if not f .done ()
2819+ and (now - futures_map [f ][6 ]) >= self .worker_timeout_s
2820+ ]
2821+ if stalled :
27932822 logger .error (
2794- "_run_parallel_rolling: per-worker timeout (%ds ) exceeded "
2795- "— force-killing all %d remaining workers ." ,
2796- self . worker_timeout_s , len (futures_map ) ,
2823+ "_run_parallel_rolling: %d worker(s ) exceeded per-worker "
2824+ "timeout (%ds) — force-killing all worker processes ." ,
2825+ len (stalled ), self . worker_timeout_s ,
27972826 )
27982827 worker_procs = getattr (executor , "_processes" , {})
27992828 for pid , proc in list (worker_procs .items ()):
@@ -2802,9 +2831,8 @@ def _rebuild_pool() -> None:
28022831 "_run_parallel_rolling: force-killing pid=%d" , pid
28032832 )
28042833 proc .kill ()
2805- # Drain all remaining futures as TIMEOUT
2806- for future in list (futures_map ):
2807- task , run_dir , iteration , gamma_sign , atom_i , atom_j = (
2834+ for future in stalled :
2835+ task , run_dir , iteration , gamma_sign , atom_i , atom_j , _ = (
28082836 futures_map .pop (future )
28092837 )
28102838 logger .error (
@@ -2823,7 +2851,11 @@ def _rebuild_pool() -> None:
28232851 self .queue .release (
28242852 (task .node_id , tuple (task .afir_params ))
28252853 )
2826- break
2854+ # Killing workers breaks the pool. Fall through to the
2855+ # "for future in done:" block below to process any futures
2856+ # that completed normally in the same wait call, then let
2857+ # the next loop iteration trigger a pool rebuild.
2858+ pool_broken = True
28272859 else :
28282860 done , _ = _fut_wait (
28292861 list (futures_map ), return_when = FIRST_COMPLETED
0 commit comments