Skip to content

Commit c8df5bc

Browse files
committed
Use SimpleQueue for summary in parallel coordinator
SimpleQueue.put is synchronous (no feeder thread, no internal buffer), so a successful put() implies the bytes are already in the kernel pipe. This removes the need for the summary.close() + summary.join_thread() dance in on_timeout before the watcher's os._exit(1), and for the comment that explained it. The coordinator-side drain thread is updated to use a blocking get() driven by a shutdown sentinel, eliminating its busy-loop timeout as well. results stays a Queue because the progressbar liveness loop relies on get(timeout=...), which SimpleQueue does not expose publicly.
1 parent 4a8a452 commit c8df5bc

2 files changed

Lines changed: 35 additions & 40 deletions

File tree

RLTest/__main__.py

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import time
1313
import shlex
1414
import json
15-
from multiprocessing import Process, Queue, set_start_method
15+
from multiprocessing import Process, Queue, SimpleQueue, set_start_method
1616

1717
from RLTest.env import Env, TestAssertionFailure, Defaults
1818
from RLTest.utils import Colors, fix_modules, fix_modulesArgs, is_github_actions
@@ -357,25 +357,29 @@ def __enter__(self):
357357
def __exit__(self, type, value, traceback):
358358
self.runner.takeEnvDown()
359359

360+
# Sentinel used to signal the summary-drainer thread to stop. Must be
361+
# pickleable (SimpleQueue pickles everything) and distinguishable from any
362+
# payload a worker might put, which is always a dict.
363+
_SUMMARY_DRAIN_STOP = ('__rltest_summary_stop__',)
364+
365+
360366
def _join_workers_with_summary_drain(processes, summary, timeout=None):
361367
"""Wait for all worker processes to exit while continuously draining the
362368
``summary`` queue, and return the list of collected summary entries.
363369
364-
A background thread drains ``summary`` so worker feeder threads never
365-
block writing to a full summary-pipe buffer. Without this,
366-
``on_timeout``'s ``summary.join_thread()`` and Python's end-of-process
367-
queue finalization can both block in ``pipe_write``, in turn hanging
368-
``p.join()`` here indefinitely.
370+
A background thread drains ``summary`` so that worker ``put()`` calls
371+
never block on a full summary-pipe buffer. Without this, on_timeout and
372+
the final summary.put at worker-exit can block in ``pipe_write``, in
373+
turn hanging ``p.join()`` here indefinitely.
369374
"""
370-
stop = threading.Event()
371375
collected = []
372376

373377
def _drain():
374-
while not stop.is_set():
375-
try:
376-
collected.append(summary.get(timeout=0.1))
377-
except Exception:
378-
pass
378+
while True:
379+
item = summary.get()
380+
if item == _SUMMARY_DRAIN_STOP:
381+
break
382+
collected.append(item)
379383

380384
drainer = threading.Thread(target=_drain)
381385
drainer.start()
@@ -385,13 +389,8 @@ def _drain():
385389
remaining = None if deadline is None else max(0.0, deadline - time.time())
386390
p.join(timeout=remaining)
387391
finally:
388-
stop.set()
392+
summary.put(_SUMMARY_DRAIN_STOP)
389393
drainer.join()
390-
while True:
391-
try:
392-
collected.append(summary.get_nowait())
393-
except Exception:
394-
break
395394
return collected
396395

397396

@@ -999,17 +998,14 @@ def on_timeout():
999998
except Exception as e:
1000999
self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e)))
10011000
finally:
1001+
# `summary` is a SimpleQueue, so its put() writes straight to
1002+
# the pipe and needs no explicit flush. `results` is a Queue
1003+
# with a feeder thread; close() + join_thread() ensures the
1004+
# put above is written to the pipe before the watcher thread
1005+
# calls os._exit(1), which bypasses Python finalization.
1006+
summary.put({'done': done, 'failures': self.testsFailed})
10021007
results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
1003-
summary.put({'done': done, 'failures': self.testsFailed}, block=False)
1004-
# The watcher thread calls os._exit(1) immediately after this
1005-
# closure returns, bypassing Python finalization. Close the
1006-
# queues and join their feeder threads here so pending put()s
1007-
# are flushed to the pipes first. (The coordinator drains the
1008-
# summary queue concurrently, which prevents join_thread() from
1009-
# blocking on a full pipe.)
10101008
results.close()
1011-
summary.close()
1012-
summary.join_thread()
10131009
results.join_thread()
10141010
done += self.run_single_test(test, on_timeout)
10151011

@@ -1018,10 +1014,10 @@ def on_timeout():
10181014
self.takeEnvDown(fullShutDown=True)
10191015

10201016
# serialized the results back
1021-
summary.put({'done': done, 'failures': self.testsFailed}, block=False)
1017+
summary.put({'done': done, 'failures': self.testsFailed})
10221018

10231019
results = Queue()
1024-
summary = Queue()
1020+
summary = SimpleQueue()
10251021
# Open group for all tests at the start (parallel execution)
10261022
self._openGitHubActionsTestsGroup()
10271023
if self.parallelism == 1:

tests/unit/test_parallel_drain.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
"""Regression test for a hang in the parallel test coordinator.
22
3-
Prior to the fix, the coordinator joined all worker processes before draining
3+
Prior to the fix, the coordinator joined all worker processes before reading
44
the ``summary`` queue. With enough data queued (or a single large
55
``self.testsFailed`` dict), the summary pipe buffer (~64 KiB on Linux)
6-
saturates and worker feeder threads block in ``pipe_write`` during Python's
7-
end-of-process queue finalization (and similarly inside ``on_timeout``'s
8-
``summary.join_thread()``). That causes the coordinator's ``p.join()`` to
9-
hang indefinitely.
6+
saturates and worker ``put()``s block in ``pipe_write``, which in turn hangs
7+
the coordinator's ``p.join()`` indefinitely.
108
119
The fix drains ``summary`` from a background thread while workers are being
1210
joined. This test reproduces the saturation scenario and asserts the helper
13-
completes within a bounded time with every worker cleanly exited.
11+
completes with every worker cleanly exited.
1412
"""
1513

1614
import multiprocessing as mp
@@ -22,21 +20,22 @@
2220

2321

2422
# ~32 KiB per message × 8 workers = 256 KiB total, comfortably exceeding the
25-
# typical 64 KiB pipe buffer on Linux, so at least some feeder threads will
26-
# block on ``pipe_write`` unless the parent is actively reading.
23+
# typical 64 KiB pipe buffer on Linux, so at least some writers will block
24+
# on ``pipe_write`` unless the parent is actively reading.
2725
_PAYLOAD_BYTES = 32 * 1024
2826
_NUM_WORKERS = 8
2927
_JOIN_TIMEOUT_SECS = 30.0
3028

3129

3230
def _worker_puts_large_summary(summary):
31+
# SimpleQueue.put writes synchronously to the pipe: without a parallel
32+
# drain on the parent side, this call itself blocks forever once the
33+
# pipe saturates.
3334
summary.put({
3435
'done': 1,
3536
'failures': {},
3637
'payload': 'x' * _PAYLOAD_BYTES,
3738
})
38-
# Return normally; Python finalization will join the feeder thread,
39-
# which is where a non-draining parent would cause the hang.
4039

4140

4241
class TestJoinWorkersWithSummaryDrain(TestCase):
@@ -57,7 +56,7 @@ def tearDown(self):
5756
p.join(timeout=5)
5857

5958
def test_large_summary_does_not_hang(self):
60-
self._summary = self._ctx.Queue()
59+
self._summary = self._ctx.SimpleQueue()
6160
self._procs = [
6261
self._ctx.Process(
6362
target=_worker_puts_large_summary,

0 commit comments

Comments
 (0)