Skip to content

Commit 1621b25

Browse files
Copilot and IntegerAlex committed
fix: FlushSignal barrier ordering, task_done safety, shutdown race; fix CHANGELOG
Co-authored-by: IntegerAlex <84370725+IntegerAlex@users.noreply.github.com> Agent-Logs-Url: https://github.com/IntegerAlex/kakashi/sessions/9e3024fc-f9e8-4dd5-8869-74ebc0dcfbb8
1 parent b06b97c commit 1621b25

2 files changed

Lines changed: 67 additions & 59 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10-
## [0.2.2] - 2026-03-
10+
## [0.2.2] - 2026-03-17
1111

1212
### Added
1313
- `LOG_LEVEL_DEBUG`, `LOG_LEVEL_INFO`, `LOG_LEVEL_WARNING`, `LOG_LEVEL_ERROR`, and `LOG_LEVEL_CRITICAL` named constants exported from the top-level package, replacing bare integer literals throughout the API.
@@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1717
### Changed
1818
- `shutdown_async_logging` is now registered with `atexit` so buffered async messages are flushed when the interpreter exits rather than being silently dropped.
1919
- `shutdown_async_logging()` now drains the queue via a `None` sentinel and waits for the background worker to finish before returning, ensuring pending messages are processed.
20-
- `AsyncLogger.flush()` and `AsyncLogger.close()` docstrings clarified to accurately describe the best-effort, timing-based nature of the operation.
20+
- `AsyncLogger.flush()` and `AsyncLogger.close()` now use a deterministic synchronization barrier (`_FlushSignal`) so callers block until all preceding messages are fully written, replacing the previous timing-based best-effort approach.
2121
- Removed legacy root-level `__init__.py` that shadowed the `kakashi/` package in editable installs.
2222
- Removed `setup.py`; `pyproject.toml` is now the single canonical build configuration.
2323

kakashi/core/logger.py

Lines changed: 65 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
_async_worker = None
4444
_async_shutdown = threading.Event()
4545

46+
# Set to True before draining on shutdown so _log_async drops new items
47+
_async_shutting_down = False
48+
4649

4750
class _FlushSignal:
4851
"""Queue item used to signal a synchronous flush barrier."""
@@ -62,58 +65,59 @@ def _async_worker_thread():
6265
batch_size = 50 # Optimal batch size for throughput/latency balance
6366

6467
while True:
65-
try:
66-
# Collect batch
67-
batch.clear()
68-
timeout = 0.1 # 100ms batch timeout
69-
shutdown_requested = False
70-
pending_count = 0
68+
batch.clear()
7169

70+
try:
71+
item = _async_queue.get(timeout=0.1)
72+
except queue.Empty:
73+
continue
74+
75+
# Handle the first item from the blocking get
76+
if item is None:
77+
_async_queue.task_done()
78+
break
79+
80+
if isinstance(item, _FlushSignal):
81+
# No prior items in this batch; signal the barrier immediately
82+
item.event.set()
83+
_async_queue.task_done()
84+
continue
85+
86+
batch.append(item)
87+
88+
# Collect additional items (non-blocking), stopping at any sentinel
89+
flush_signal = None
90+
shutdown_requested = False
91+
for _ in range(batch_size - 1):
7292
try:
73-
# Get first item (blocking)
74-
item = _async_queue.get(timeout=timeout)
75-
if item is None:
76-
_async_queue.task_done()
77-
break
78-
if isinstance(item, _FlushSignal):
79-
item.event.set()
80-
_async_queue.task_done()
81-
continue
82-
83-
batch.append(item)
84-
pending_count = 1
85-
86-
# Collect additional items (non-blocking)
87-
for _ in range(batch_size - 1):
88-
try:
89-
item = _async_queue.get_nowait()
90-
if item is None:
91-
_async_queue.task_done()
92-
shutdown_requested = True
93-
break
94-
if isinstance(item, _FlushSignal):
95-
item.event.set()
96-
_async_queue.task_done()
97-
continue
98-
batch.append(item)
99-
pending_count += 1
100-
except queue.Empty:
101-
break
102-
93+
extra = _async_queue.get_nowait()
10394
except queue.Empty:
104-
continue
105-
106-
# Process batch
107-
if batch:
108-
_process_async_batch(batch)
109-
for _ in range(pending_count):
110-
_async_queue.task_done()
95+
break
11196

112-
if shutdown_requested:
97+
if extra is None:
98+
shutdown_requested = True
99+
break
100+
if isinstance(extra, _FlushSignal):
101+
# Stop collecting; process prior items before signalling
102+
flush_signal = extra
113103
break
104+
batch.append(extra)
114105

115-
except Exception:
116-
pass # Ignore errors in background thread
106+
# Process all collected log items before acknowledging any barrier
107+
try:
108+
_process_async_batch(batch)
109+
finally:
110+
for _ in range(len(batch)):
111+
_async_queue.task_done()
112+
113+
# Signal the flush barrier AFTER all preceding messages are written
114+
if flush_signal is not None:
115+
flush_signal.event.set()
116+
_async_queue.task_done()
117+
118+
if shutdown_requested:
119+
_async_queue.task_done() # for the None sentinel
120+
break
117121

118122

119123
def _process_async_batch(batch):
@@ -141,8 +145,9 @@ def _process_async_batch(batch):
141145

142146
def _ensure_async_worker():
143147
"""Ensure async worker thread is running."""
144-
global _async_worker
148+
global _async_worker, _async_shutting_down
145149
if _async_worker is None or not _async_worker.is_alive():
150+
_async_shutting_down = False
146151
_async_worker = threading.Thread(target=_async_worker_thread, daemon=True)
147152
_async_worker.start()
148153

@@ -290,7 +295,11 @@ def _log_async(self, level: int, message: str, fields: Optional[Dict[str, Any]]
290295
# Fast level check
291296
if level < self.min_level:
292297
return
293-
298+
299+
# Drop messages once shutdown has started to prevent shutdown deadlock
300+
if _async_shutting_down:
301+
return
302+
294303
# Non-blocking enqueue to background worker
295304
try:
296305
timestamp = time.time()
@@ -396,18 +405,17 @@ def clear_logger_cache() -> None:
396405

397406

398407
def shutdown_async_logging() -> None:
399-
"""Shutdown async logging gracefully."""
400-
global _async_worker
408+
"""Shutdown async logging gracefully, draining all pending messages first."""
409+
global _async_worker, _async_shutting_down
401410
if _async_worker and _async_worker.is_alive():
402-
# Ensure all enqueued items are processed before stopping the worker.
411+
# Prevent new messages from being enqueued so join() cannot hang
412+
_async_shutting_down = True
413+
414+
# Drain all pre-shutdown items before sending the stop sentinel
403415
_async_queue.join()
404416

405-
# Send shutdown sentinel so the worker can finish processing
406-
try:
407-
_async_queue.put_nowait(None) # Shutdown signal
408-
except queue.Full:
409-
# Fall back to blocking put to ensure the sentinel is enqueued
410-
_async_queue.put(None)
417+
# Send shutdown sentinel so the worker exits its loop
418+
_async_queue.put(None)
411419

412420
# Wait for worker to finish (with timeout)
413421
_async_worker.join(timeout=1.0)

0 commit comments

Comments (0)