Commit 4a8a452

Fix parallel-coordinator hang when summary pipe saturates
The coordinator drained the 'summary' queue only after joining all worker processes. With enough queued data (or a single large testsFailed dict), the summary-pipe buffer (~64 KiB on Linux) saturates and worker feeder threads block in pipe_write, both inside on_timeout's join_thread() and during Python's end-of-process queue finalization. This in turn hangs the coordinator's p.join() indefinitely.

Introduce a module-level helper, _join_workers_with_summary_drain, that joins workers while continuously draining 'summary' from a background thread, and use it in execute(). Also correct the stale comment in the on_timeout closure to describe the actual watcher-thread os._exit(1) flow.
1 parent 96a65de commit 4a8a452
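
For context, the failure mode is the classic multiprocessing pitfall of joining a process that still has undelivered queue data (documented in the Python manual under "Joining processes that use queues"). A minimal standalone sketch of the deadlock, written here for illustration and not part of this commit:

import multiprocessing as mp

def child(q):
    # ~1 MiB payload, far beyond the ~64 KiB pipe buffer: the child's
    # queue feeder thread blocks in pipe_write until someone reads.
    q.put('x' * (1 << 20))

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=child, args=(q,))
    p.start()
    p.join(timeout=5)      # the pre-fix pattern: join before draining
    print(p.is_alive())    # True: the join timed out on the stuck feeder
    data = q.get()         # draining the queue unblocks the feeder thread...
    p.join()               # ...and now the child exits normally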

2 files changed: 133 additions & 10 deletions

File tree:

RLTest/__main__.py
tests/unit/test_parallel_drain.py

RLTest/__main__.py

Lines changed: 47 additions & 10 deletions

@@ -357,6 +357,44 @@ def __enter__(self):
     def __exit__(self, type, value, traceback):
         self.runner.takeEnvDown()
 
+def _join_workers_with_summary_drain(processes, summary, timeout=None):
+    """Wait for all worker processes to exit while continuously draining the
+    ``summary`` queue, and return the list of collected summary entries.
+
+    A background thread drains ``summary`` so worker feeder threads never
+    block writing to a full summary-pipe buffer. Without this,
+    ``on_timeout``'s ``summary.join_thread()`` and Python's end-of-process
+    queue finalization can both block in ``pipe_write``, in turn hanging
+    ``p.join()`` here indefinitely.
+    """
+    stop = threading.Event()
+    collected = []
+
+    def _drain():
+        while not stop.is_set():
+            try:
+                collected.append(summary.get(timeout=0.1))
+            except Exception:
+                pass
+
+    drainer = threading.Thread(target=_drain)
+    drainer.start()
+    try:
+        deadline = None if timeout is None else time.time() + timeout
+        for p in processes:
+            remaining = None if deadline is None else max(0.0, deadline - time.time())
+            p.join(timeout=remaining)
+    finally:
+        stop.set()
+        drainer.join()
+    while True:
+        try:
+            collected.append(summary.get_nowait())
+        except Exception:
+            break
+    return collected
+
+
 class TestTimeLimit(object):
     """
     A test timeout watcher. The watcher opens thread that sleep for the

@@ -963,7 +1001,12 @@ def on_timeout():
         finally:
             results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
             summary.put({'done': done, 'failures': self.testsFailed}, block=False)
-            # After we return the processes will be killed, so we must make sure the queues are drained properly.
+            # The watcher thread calls os._exit(1) immediately after this
+            # closure returns, bypassing Python finalization. Close the
+            # queues and join their feeder threads here so pending put()s
+            # are flushed to the pipes first. (The coordinator drains the
+            # summary queue concurrently, which prevents join_thread() from
+            # blocking on a full pipe.)
             results.close()
             summary.close()
             summary.join_thread()

@@ -1008,15 +1051,9 @@ def on_timeout():
             output = res['output']
             print('%s' % output, end="")
 
-        for p in processes:
-            p.join()
-
-        # join results
-        while True:
-            try:
-                res = summary.get(timeout=1)
-            except Exception as e:
-                break
+        # Join worker processes while concurrently draining `summary`,
+        # so their feeder threads do not block on a full pipe buffer.
+        for res in _join_workers_with_summary_drain(processes, summary):
             done += res['done']
             self.testsFailed.update(res['failures'])
 
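
A usage sketch of the new helper from a caller's perspective (hypothetical driver code, not part of this commit; it assumes RLTest is importable so the helper can be pulled from RLTest.__main__):

import multiprocessing as mp
from RLTest.__main__ import _join_workers_with_summary_drain

def worker(summary):
    # Each worker reports one entry; even large payloads are safe because
    # the coordinator drains the queue while the joins are in flight.
    summary.put({'done': 1, 'failures': {}})

if __name__ == '__main__':
    summary = mp.Queue()
    procs = [mp.Process(target=worker, args=(summary,)) for _ in range(4)]
    for p in procs:
        p.start()
    entries = _join_workers_with_summary_drain(procs, summary)
    assert sum(e['done'] for e in entries) == 4

Note the helper's two-phase drain: the background thread polls with get(timeout=0.1) while the joins run, and the final get_nowait() loop sweeps up anything that arrived between the last poll and drainer shutdown.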
10221059

tests/unit/test_parallel_drain.py

Lines changed: 86 additions & 0 deletions

@@ -0,0 +1,86 @@
+"""Regression test for a hang in the parallel test coordinator.
+
+Prior to the fix, the coordinator joined all worker processes before draining
+the ``summary`` queue. With enough data queued (or a single large
+``self.testsFailed`` dict), the summary pipe buffer (~64 KiB on Linux)
+saturates and worker feeder threads block in ``pipe_write`` during Python's
+end-of-process queue finalization (and similarly inside ``on_timeout``'s
+``summary.join_thread()``). That causes the coordinator's ``p.join()`` to
+hang indefinitely.
+
+The fix drains ``summary`` from a background thread while workers are being
+joined. This test reproduces the saturation scenario and asserts the helper
+completes within a bounded time with every worker cleanly exited.
+"""
+
+import multiprocessing as mp
+import sys
+import time
+from unittest import TestCase
+
+from RLTest.__main__ import _join_workers_with_summary_drain
+
+
+# ~32 KiB per message × 8 workers = 256 KiB total, comfortably exceeding the
+# typical 64 KiB pipe buffer on Linux, so at least some feeder threads will
+# block on ``pipe_write`` unless the parent is actively reading.
+_PAYLOAD_BYTES = 32 * 1024
+_NUM_WORKERS = 8
+_JOIN_TIMEOUT_SECS = 30.0
+
+
+def _worker_puts_large_summary(summary):
+    summary.put({
+        'done': 1,
+        'failures': {},
+        'payload': 'x' * _PAYLOAD_BYTES,
+    })
+    # Return normally; Python finalization will join the feeder thread,
+    # which is where a non-draining parent would cause the hang.
+
+
+class TestJoinWorkersWithSummaryDrain(TestCase):
+
+    def setUp(self):
+        if sys.platform == 'win32':
+            self.skipTest('fork start method is unavailable on Windows')
+        self._ctx = mp.get_context('fork')
+        self._procs = []
+        self._summary = None
+
+    def tearDown(self):
+        # Safety net: if the helper ever hangs despite the fix, make sure the
+        # pytest session can still exit cleanly.
+        for p in self._procs:
+            if p.is_alive():
+                p.kill()
+            p.join(timeout=5)
+
+    def test_large_summary_does_not_hang(self):
+        self._summary = self._ctx.Queue()
+        self._procs = [
+            self._ctx.Process(
+                target=_worker_puts_large_summary,
+                args=(self._summary,),
+            )
+            for _ in range(_NUM_WORKERS)
+        ]
+        for p in self._procs:
+            p.start()
+
+        start = time.time()
+        collected = _join_workers_with_summary_drain(
+            self._procs, self._summary, timeout=_JOIN_TIMEOUT_SECS,
+        )
+        elapsed = time.time() - start
+
+        for p in self._procs:
+            self.assertFalse(
+                p.is_alive(),
+                'worker still alive after drain-join; summary pipe likely saturated',
+            )
+            self.assertEqual(p.exitcode, 0)
+        self.assertEqual(len(collected), _NUM_WORKERS)
+        # The helper should return well under its own timeout; we only assert a
+        # loose upper bound to avoid flakiness on slow machines.
+        self.assertLess(elapsed, _JOIN_TIMEOUT_SECS)
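
Assuming the repository's usual pytest setup, the test can be run in isolation with something like python -m pytest tests/unit/test_parallel_drain.py. If the drain behavior ever regresses, the helper's 30-second timeout and the tearDown kill are what keep the suite itself from hanging.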
