Skip to content

Commit cf4de1f

Browse files
authored
Merge pull request #225 from ezmsg-org/dev
Prepare for release 3.8.0
2 parents 95418ad + 5998589 commit cf4de1f

6 files changed

Lines changed: 231 additions & 27 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "ezmsg"
3-
version = "3.7.3"
3+
version = "3.8.0"
44
description = "A simple DAG-based computation model"
55
authors = [
66
{ name = "Griffin Milsap", email = "griffin.milsap@gmail.com" },

src/ezmsg/core/backend.py

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import enum
55
import logging
66
import os
7+
import signal
78
from threading import BrokenBarrierError
89
from multiprocessing import Event, Barrier
910
from multiprocessing.synchronize import Event as EventType
@@ -23,6 +24,7 @@
2324
from .backendprocess import (
2425
BackendProcess,
2526
DefaultBackendProcess,
27+
ShutdownSummary,
2628
new_threaded_event_loop,
2729
)
2830

@@ -170,6 +172,7 @@ class GraphRunner:
170172
_graph_context: GraphContext | None
171173
_loop: asyncio.AbstractEventLoop | None
172174
_loop_cm: object | None
175+
_loop_shutdown_summary: ShutdownSummary | None
173176
_main_process: BackendProcess | None
174177
_spawned_processes: list[BackendProcess]
175178
_start_participant: bool
@@ -208,6 +211,7 @@ def __init__(
208211
self._graph_context = None
209212
self._loop = None
210213
self._loop_cm = None
214+
self._loop_shutdown_summary = None
211215
self._main_process = None
212216
self._spawned_processes = []
213217
self._start_participant = False
@@ -320,7 +324,10 @@ def _initialize(self, force_single_process: bool, wait_for_ready: bool) -> bool:
320324
if self._execution_context is None:
321325
return False
322326

323-
self._loop_cm = new_threaded_event_loop()
327+
self._loop_shutdown_summary = ShutdownSummary()
328+
self._loop_cm = new_threaded_event_loop(
329+
shutdown_summary=self._loop_shutdown_summary
330+
)
324331
self._loop = self._loop_cm.__enter__()
325332

326333
try:
@@ -389,12 +396,15 @@ def _run_main_process(self) -> None:
389396
self._main_process = self._execution_context.processes[0]
390397
self._start_processes(self._execution_context.processes[1:])
391398

399+
interrupts = 0
400+
forced_sigint = False
392401
try:
393402
self._main_process.process(self._loop)
394403
self._join_spawned_processes()
395404
logger.info("All processes exited normally")
396405

397406
except KeyboardInterrupt:
407+
interrupts += 1
398408
logger.info(
399409
"Attempting graceful shutdown, interrupt again to force quit..."
400410
)
@@ -404,17 +414,81 @@ def _run_main_process(self) -> None:
404414
self._join_spawned_processes()
405415

406416
except KeyboardInterrupt:
417+
interrupts += 1
418+
forced_sigint = True
407419
logger.warning("Interrupt intercepted, force quitting")
408420
self._execution_context.start_barrier.abort()
409421
self._execution_context.stop_barrier.abort()
410422
for proc in self._spawned_processes:
411423
proc.terminate()
412424

413425
finally:
414-
self._join_spawned_processes()
415-
self._cleanup()
426+
while True:
427+
try:
428+
self._join_spawned_processes()
429+
self._cleanup()
430+
break
431+
except KeyboardInterrupt:
432+
interrupts += 1
433+
if interrupts >= 2:
434+
forced_sigint = True
435+
logger.warning("Interrupt intercepted, force quitting")
436+
if self._execution_context is not None:
437+
self._execution_context.start_barrier.abort()
438+
self._execution_context.stop_barrier.abort()
439+
for proc in self._spawned_processes:
440+
proc.terminate()
441+
self._cleanup()
442+
break
443+
logger.info(
444+
"Interrupt received during cleanup; attempting graceful shutdown..."
445+
)
446+
if self._execution_context is not None:
447+
self._execution_context.term_ev.set()
416448
self._started = False
417449
self._stopped = True
450+
if interrupts and not forced_sigint and self._shutdown_was_unclean():
451+
forced_sigint = True
452+
if forced_sigint:
453+
self._exit_with_sigint()
454+
455+
def _shutdown_was_unclean(self) -> bool:
456+
main_shutdown_errors = bool(
457+
self._main_process is not None
458+
and getattr(self._main_process, "_shutdown_errors", False)
459+
)
460+
summary = self._loop_shutdown_summary
461+
loop_unclean = bool(summary is not None and summary.unclean)
462+
return main_shutdown_errors or loop_unclean
463+
464+
def _exit_with_sigint(self) -> None:
    """
    Terminate the current process so it appears to have died from Ctrl+C.

    On Windows (``os.name == "nt"``) the process is ended through
    ``kernel32.ExitProcess`` with exit code ``0xC000013A``; if ``ctypes``
    cannot be imported the process hard-exits with status 1, and if the
    ``ExitProcess`` call itself fails it hard-exits via ``os._exit`` with
    the same code reinterpreted as a signed 32-bit integer.

    Elsewhere the SIGINT disposition is reset to ``SIG_DFL`` and SIGINT is
    raised against the current process. Should that fail, or should the
    signal somehow not end the process, ``SystemExit(130)`` is raised
    instead. The previously installed handler is restored on the way out
    on a best-effort basis.

    :raises SystemExit: with code 130 on non-Windows platforms when the
        signal-based exit does not terminate the process.
    """
    if os.name == "nt":
        # NOTE(review): 0xC000013A looks like the NTSTATUS value Windows
        # reports for Ctrl+C exits - confirm against the NTSTATUS table.
        code = 0xC000013A
        try:
            import ctypes
        except Exception:
            os._exit(1)
        try:
            ctypes.windll.kernel32.ExitProcess(ctypes.c_uint(code).value)
        except Exception:
            # Same bit pattern, expressed as a signed 32-bit value.
            os._exit(ctypes.c_int32(code).value)
        return

    # NOTE(review): 130 presumably follows the shell convention of
    # 128 + signal number (SIGINT == 2) - confirm.
    code = 130

    saved_handler = None
    try:
        saved_handler = signal.getsignal(signal.SIGINT)
        # With the default disposition in place, raising SIGINT is expected
        # to end the process right here.
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.raise_signal(signal.SIGINT)
    except Exception:
        raise SystemExit(code)
    finally:
        # Only reached if the process survived; put the old handler back.
        # getsignal() may have returned None, which cannot be re-installed.
        if saved_handler is not None:
            try:
                signal.signal(signal.SIGINT, saved_handler)
            except Exception:
                pass

    raise SystemExit(code)
418492

419493
def _cleanup(self) -> None:
420494
if self._cleanup_done:

src/ezmsg/core/backendprocess.py

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import weakref
1010

1111
from abc import abstractmethod
12+
from dataclasses import dataclass
1213
from collections import defaultdict
1314
from collections.abc import Callable, Coroutine, Generator, Sequence
1415
from functools import wraps, partial
@@ -39,7 +40,30 @@ def _strict_shutdown_enabled() -> bool:
3940
return value.lower() in ("1", "true", "yes", "on")
4041

4142

43+
@dataclass
class ShutdownSummary:
    """
    Mutable record of what happened while a threaded event loop shut down.

    An instance can be handed to ``new_threaded_event_loop`` via its
    ``shutdown_summary`` argument; the fields are filled in just before the
    loop is stopped.
    """

    # Number of tasks that were cancelled during shutdown.
    cancelled_tasks: int = 0
    # Value of the default executor's active_count() at shutdown
    # (0 when no custom executor was installed).
    executor_active: int = 0
    # Number of cancelled tasks that had still not finished when the
    # shutdown wait timed out.
    pending_tasks: int = 0
    # Number of errors suppressed while shutting down.
    suppressed_errors: int = 0
    # True when a KeyboardInterrupt cut the shutdown wait short.
    forced_interrupt: bool = False

    @property
    def unclean(self) -> bool:
        """
        Whether the shutdown left work or errors behind.

        ``cancelled_tasks`` is deliberately not part of this check; only
        still-active executor work, still-pending tasks, suppressed errors
        or a forced interrupt count as unclean.

        :return: ``True`` if any of those indicators is set.
        :rtype: bool
        """
        indicators = (
            self.executor_active,
            self.pending_tasks,
            self.suppressed_errors,
            self.forced_interrupt,
        )
        return any(indicators)
59+
60+
4261
class _DaemonThreadPoolExecutor(ThreadPoolExecutor):
62+
def __init__(self, *args, **kwargs) -> None:
    """
    Build the executor and set up bookkeeping for outstanding work.

    All arguments are forwarded unchanged to the parent executor.
    """
    super().__init__(*args, **kwargs)
    # Guards _active_count, which is modified from the submitting thread
    # and from future done-callbacks.
    self._active_lock = threading.Lock()
    # Submitted futures that have not completed yet.
    self._active_count = 0
66+
4367
def _adjust_thread_count(self) -> None:
4468
if self._broken:
4569
return
@@ -61,6 +85,22 @@ def _adjust_thread_count(self) -> None:
6185
thread.start()
6286
self._threads.add(thread)
6387

88+
def submit(self, fn, /, *args, **kwargs):
    """
    Submit ``fn`` to the pool and track it as outstanding until it is done.

    The outstanding counter is incremented once the parent executor has
    accepted the work and decremented by a done-callback on the returned
    future, so queued as well as running work is counted.

    :param fn: Callable to execute.
    :param args: Positional arguments passed to ``fn``.
    :param kwargs: Keyword arguments passed to ``fn``.
    :return: The future produced by the parent executor's ``submit``.
    """

    def _on_done(_future):
        with self._active_lock:
            self._active_count -= 1

    future = super().submit(fn, *args, **kwargs)

    with self._active_lock:
        self._active_count += 1

    # Registered only after the increment, so the decrement can never run
    # first - even when the future has already completed by now.
    future.add_done_callback(_on_done)
    return future
99+
100+
def active_count(self) -> int:
    """
    Return how many submitted futures have not completed yet.

    The counter is read while holding the bookkeeping lock.

    :return: Current number of outstanding futures.
    :rtype: int
    """
    with self._active_lock:
        outstanding = self._active_count
    return outstanding
103+
64104

65105
class Complete(Exception):
66106
"""
@@ -178,11 +218,13 @@ class DefaultBackendProcess(BackendProcess):
178218
"""
179219

180220
pubs: dict[str, Publisher]
221+
_shutdown_errors: bool
181222

182223
def process(self, loop: asyncio.AbstractEventLoop) -> None:
183224
main_func = None
184225
context = GraphContext(self.graph_address)
185226
coro_callables: dict[str, Callable[[], Coroutine[Any, Any, None]]] = dict()
227+
self._shutdown_errors = False
186228

187229
try:
188230
self.pubs = dict()
@@ -445,6 +487,8 @@ async def wrapped_task(msg: Any = None) -> None:
445487
except Exception:
446488
logger.error(f"Exception in Task: {task_address}")
447489
logger.error(traceback.format_exc())
490+
if self.term_ev.is_set():
491+
self._shutdown_errors = True
448492
if strict_shutdown:
449493
raise
450494

@@ -522,6 +566,7 @@ def run_loop(loop: asyncio.AbstractEventLoop):
522566
@contextmanager
523567
def new_threaded_event_loop(
524568
ev: threading.Event | None = None,
569+
shutdown_summary: ShutdownSummary | None = None,
525570
) -> Generator[asyncio.AbstractEventLoop, None, None]:
526571
"""
527572
Create a new asyncio event loop running in a separate thread.
@@ -531,6 +576,8 @@ def new_threaded_event_loop(
531576
532577
:param ev: Optional event to signal when the loop is ready.
533578
:type ev: threading.Event | None
579+
:param shutdown_summary: Optional shutdown summary object to populate.
580+
:type shutdown_summary: ShutdownSummary | None
534581
:return: Context manager yielding the event loop.
535582
:rtype: Generator[asyncio.AbstractEventLoop, None, None]
536583
"""
@@ -539,10 +586,10 @@ def new_threaded_event_loop(
539586
shutdown_suppress = threading.Event()
540587
suppressed_shutdown_errors = {"count": 0}
541588
suppressed_lock = threading.Lock()
589+
executor = None
542590
if not strict_shutdown:
543-
loop.set_default_executor(
544-
_DaemonThreadPoolExecutor(thread_name_prefix="EZMSG")
545-
)
591+
executor = _DaemonThreadPoolExecutor(thread_name_prefix="EZMSG")
592+
loop.set_default_executor(executor)
546593
def _loop_exception_handler(
547594
loop_obj: asyncio.AbstractEventLoop, context: dict
548595
) -> None:
@@ -568,36 +615,45 @@ def _loop_exception_handler(
568615
if not strict_shutdown:
569616
shutdown_suppress.set()
570617
# Cancel and await remaining tasks before stopping the loop.
571-
async def _cancel_remaining() -> int:
618+
async def _cancel_remaining(timeout: float = 1.0) -> tuple[int, int]:
572619
tasks = [
573620
t
574621
for t in asyncio.all_tasks()
575622
if t is not asyncio.current_task() and not t.done()
576623
]
577624
for t in tasks:
578625
t.cancel()
579-
if tasks:
580-
await asyncio.wait(tasks)
581-
return len(tasks)
626+
if not tasks:
627+
return 0, 0
628+
_, pending = await asyncio.wait(tasks, timeout=timeout)
629+
return len(tasks), len(pending)
582630

583631
cancelled_count = 0
632+
pending_count = 0
584633
forced_interrupt = False
585634
fut = asyncio.run_coroutine_threadsafe(_cancel_remaining(), loop)
586635
try:
587-
cancelled_count = fut.result()
636+
cancelled_count, pending_count = fut.result()
588637
except KeyboardInterrupt:
589638
forced_interrupt = True
590639
fut.cancel()
591640
except Exception:
592641
cancelled_count = 0
642+
pending_count = 0
593643

594644
suppressed_count = suppressed_shutdown_errors["count"]
595-
if cancelled_count or suppressed_count or forced_interrupt:
645+
if cancelled_count or suppressed_count or forced_interrupt or pending_count:
596646
if forced_interrupt and not cancelled_count and not suppressed_count:
597647
logger.warning(
598648
"Shutdown interrupted; tasks may still be running. "
599649
"Re-run with EZMSG_STRICT_SHUTDOWN=1 to debug tasks with poor shutdown behavior."
600650
)
651+
elif pending_count:
652+
logger.warning(
653+
"Shutdown timed out waiting for %d task(s). "
654+
"Re-run with EZMSG_STRICT_SHUTDOWN=1 to debug tasks with poor shutdown behavior.",
655+
pending_count,
656+
)
601657
else:
602658
logger.warning(
603659
"Shutdown suppressed %d error(s) and cancelled %d task(s). "
@@ -607,6 +663,15 @@ async def _cancel_remaining() -> int:
607663
cancelled_count,
608664
)
609665

666+
if shutdown_summary is not None:
667+
shutdown_summary.cancelled_tasks = cancelled_count
668+
shutdown_summary.pending_tasks = pending_count
669+
shutdown_summary.executor_active = (
670+
executor.active_count() if executor is not None else 0
671+
)
672+
shutdown_summary.suppressed_errors = suppressed_count
673+
shutdown_summary.forced_interrupt = forced_interrupt
674+
610675
loop.call_soon_threadsafe(loop.stop)
611676
thread.join()
612677
loop.close()

tests/clean_shutdown_examples_runner.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,29 @@ def _listen() -> None:
142142
+ ", ".join(sorted(SYSTEMS))
143143
)
144144
system = SYSTEMS[case]()
145-
print("READY", flush=True)
146-
ez.run(SYSTEM=system)
145+
runner = ez.GraphRunner(SYSTEM=system)
146+
ready_emitted = threading.Event()
147+
done = threading.Event()
148+
149+
def _emit_ready() -> None:
    """Print the ``READY`` marker unless it has been printed already."""
    # NOTE(review): the check and the set below are not atomic; if two
    # threads call this at the same moment both may print READY.
    if ready_emitted.is_set():
        return
    print("READY", flush=True)
    ready_emitted.set()
153+
154+
def _watch_ready() -> None:
    """
    Poll until the runner reports it is running, then emit ``READY``.

    Polling stops early when ``done`` is set; ``READY`` is emitted in that
    case too, so the marker is always produced.
    """
    # done is checked first; runner.running is only consulted while the
    # run has not finished.
    while not (done.is_set() or runner.running):
        time.sleep(0.01)
    _emit_ready()
161+
162+
threading.Thread(target=_watch_ready, daemon=True).start()
163+
try:
164+
runner.run_blocking()
165+
finally:
166+
done.set()
167+
_emit_ready()
147168

148169

149170
if __name__ == "__main__":

0 commit comments

Comments
 (0)