Skip to content

Commit b706767

Browse files
authored
gh-115634: Fix ProcessPoolExecutor deadlock with max_tasks_per_child (GH-140900)
The idle worker semaphore counts task completions, not idle workers, so it can hold a stale token released by a worker that later exited upon reaching its max_tasks_per_child limit. The worker replacement path consumed such tokens and skipped spawning a replacement, deadlocking the remaining queued tasks once no workers were left. Replace dead workers based on len(self._processes) without consulting the semaphore. The submit() path is unchanged, preserving on-demand spawning and idle worker reuse. Replace the documentation note added in GH-140897 with a versionchanged entry now that the bug is fixed. Based on a fix proposed by Tabrez Mohammed.
1 parent 0a13efc commit b706767

4 files changed

Lines changed: 68 additions & 12 deletions

File tree

Doc/library/concurrent.futures.rst

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -386,11 +386,6 @@ in a REPL or a lambda should not be expected to work.
386386
default in absence of a *mp_context* parameter. This feature is incompatible
387387
with the "fork" start method.
388388

389-
.. note::
390-
Bugs have been reported when using the *max_tasks_per_child* feature that
391-
can result in the :class:`ProcessPoolExecutor` hanging in some
392-
circumstances. Follow its eventual resolution in :gh:`115634`.
393-
394389
.. versionchanged:: 3.3
395390
When one of the worker processes terminates abruptly, a
396391
:exc:`~concurrent.futures.process.BrokenProcessPool` error is now raised.
@@ -426,6 +421,11 @@ in a REPL or a lambda should not be expected to work.
426421
require the *fork* start method for :class:`ProcessPoolExecutor` you must
427422
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
428423

424+
.. versionchanged:: next
425+
Fixed a deadlock (:gh:`115634`) where the executor could hang after
426+
a worker process exited upon reaching its *max_tasks_per_child*
427+
limit while tasks remained queued.
428+
429429
.. method:: terminate_workers()
430430

431431
Attempt to terminate all living worker processes immediately by calling

Lib/concurrent/futures/process.py

Lines changed: 31 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -360,7 +360,7 @@ def run(self):
360360
if executor := self.executor_reference():
361361
if process_exited:
362362
with self.shutdown_lock:
363-
executor._adjust_process_count()
363+
executor._replace_dead_worker()
364364
else:
365365
executor._idle_worker_semaphore.release()
366366
del executor
@@ -772,6 +772,30 @@ def _start_executor_manager_thread(self):
772772
_threads_wakeups[self._executor_manager_thread] = \
773773
self._executor_manager_thread_wakeup
774774

775+
def _replace_dead_worker(self):
776+
# gh-132969: avoid error when state is reset and executor is still running,
777+
# which will happen when shutdown(wait=False) is called.
778+
if self._processes is None:
779+
return
780+
781+
# A replacement is pointless when shutting down with nothing left
782+
# to run. Both attributes are read under _shutdown_lock, which
783+
# shutdown() holds while setting _shutdown_thread.
784+
assert self._shutdown_lock.locked()
785+
if self._shutdown_thread and not self._pending_work_items:
786+
return
787+
788+
# gh-115634: A worker exited after reaching max_tasks_per_child and
789+
# has been removed from self._processes. Do not consult
790+
# _idle_worker_semaphore here: it counts task completions, not idle
791+
# workers, so it can hold a stale token released by the now-dead
792+
# worker. Trusting such a token would leave the pool a worker short,
793+
# deadlocking once all workers reach their task limit. Spawning is
794+
# safe from this (manager) thread despite gh-90622 because
795+
# max_tasks_per_child is rejected for the "fork" start method.
796+
if len(self._processes) < self._max_workers:
797+
self._spawn_process()
798+
775799
def _adjust_process_count(self):
776800
# gh-132969: avoid error when state is reset and executor is still running,
777801
# which will happen when shutdown(wait=False) is called.
@@ -784,12 +808,12 @@ def _adjust_process_count(self):
784808

785809
process_count = len(self._processes)
786810
if process_count < self._max_workers:
787-
# Assertion disabled as this codepath is also used to replace a
788-
# worker that unexpectedly dies, even when using the 'fork' start
789-
# method. That means there is still a potential deadlock bug. If a
790-
# 'fork' mp_context worker dies, we'll be forking a new one when
791-
# we know a thread is running (self._executor_manager_thread).
792-
#assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
811+
# gh-90622: spawning a child via fork while another thread is
812+
# running can deadlock in the child. submit() only calls this
813+
# method when using a non-fork start method.
814+
assert (self._safe_to_dynamically_spawn_children
815+
or not self._executor_manager_thread), (
816+
'https://github.com/python/cpython/issues/90622')
793817
self._spawn_process()
794818

795819
def _launch_processes(self):

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -241,6 +241,33 @@ def test_max_tasks_per_child_defaults_to_spawn_context(self):
241241
executor = self.executor_type(1, max_tasks_per_child=3)
242242
self.assertEqual(executor._mp_context.get_start_method(), "spawn")
243243

244+
def test_max_tasks_per_child_pending_tasks_gh115634(self):
245+
# gh-115634: A worker exiting at its max_tasks_per_child limit left a
246+
# stale token in the idle worker semaphore, so no replacement worker
247+
# was spawned and the remaining queued tasks deadlocked. Submit more
248+
# tasks than the pool can run at once so a backlog is queued while
249+
# workers hit their task limit.
250+
context = self.get_context()
251+
if context.get_start_method(allow_none=False) == "fork":
252+
raise unittest.SkipTest("Incompatible with the fork start method.")
253+
254+
for max_workers, max_tasks, num_tasks in [(1, 2, 6), (2, 2, 8)]:
255+
with self.subTest(max_workers=max_workers, max_tasks=max_tasks):
256+
executor = self.executor_type(
257+
max_workers, mp_context=context,
258+
max_tasks_per_child=max_tasks)
259+
try:
260+
futures = [executor.submit(mul, i, 2)
261+
for i in range(num_tasks)]
262+
# If the deadlock regresses, the result() calls time out,
263+
# and the shutdown below hangs until the test timeout.
264+
results = [f.result(timeout=support.SHORT_TIMEOUT)
265+
for f in futures]
266+
self.assertEqual(results,
267+
[i * 2 for i in range(num_tasks)])
268+
finally:
269+
executor.shutdown(wait=True, cancel_futures=True)
270+
244271
def test_max_tasks_early_shutdown(self):
245272
context = self.get_context()
246273
if context.get_start_method(allow_none=False) == "fork":
Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
Fix a deadlock in :class:`concurrent.futures.ProcessPoolExecutor` when
2+
using ``max_tasks_per_child``, present since the feature was introduced in
3+
Python 3.11. The executor stopped scheduling queued tasks after a worker
4+
process exited upon reaching its task limit. Based on a fix proposed by
5+
Tabrez Mohammed.

0 commit comments

Comments
 (0)