Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,6 @@ in a REPL or a lambda should not be expected to work.
default in absence of a *mp_context* parameter. This feature is incompatible
with the "fork" start method.

.. note::
Bugs have been reported when using the *max_tasks_per_child* feature that
can result in the :class:`ProcessPoolExecutor` hanging in some
circumstances. Follow its eventual resolution in :gh:`115634`.

.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`~concurrent.futures.process.BrokenProcessPool` error is now raised.
Expand Down Expand Up @@ -426,6 +421,11 @@ in a REPL or a lambda should not be expected to work.
require the *fork* start method for :class:`ProcessPoolExecutor` you must
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.

.. versionchanged:: next
Fixed a deadlock (:gh:`115634`) where the executor could hang after
a worker process exited upon reaching its *max_tasks_per_child*
limit while tasks remained queued.

.. method:: terminate_workers()

Attempt to terminate all living worker processes immediately by calling
Expand Down
38 changes: 31 additions & 7 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def run(self):
if executor := self.executor_reference():
if process_exited:
with self.shutdown_lock:
executor._adjust_process_count()
executor._replace_dead_worker()
else:
executor._idle_worker_semaphore.release()
del executor
Expand Down Expand Up @@ -772,6 +772,30 @@ def _start_executor_manager_thread(self):
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup

def _replace_dead_worker(self):
# gh-132969: avoid error when state is reset and executor is still running,
# which will happen when shutdown(wait=False) is called.
if self._processes is None:
return

# A replacement is pointless when shutting down with nothing left
# to run. Both attributes are read under _shutdown_lock, which
# shutdown() holds while setting _shutdown_thread.
assert self._shutdown_lock.locked()
if self._shutdown_thread and not self._pending_work_items:
return

# gh-115634: A worker exited after reaching max_tasks_per_child and
# has been removed from self._processes. Do not consult
# _idle_worker_semaphore here: it counts task completions, not idle
# workers, so it can hold a stale token released by the now-dead
# worker. Trusting such a token would leave the pool a worker short,
# deadlocking once all workers reach their task limit. Spawning is
# safe from this (manager) thread despite gh-90622 because
# max_tasks_per_child is rejected for the "fork" start method.
if len(self._processes) < self._max_workers:
self._spawn_process()

def _adjust_process_count(self):
# gh-132969: avoid error when state is reset and executor is still running,
# which will happen when shutdown(wait=False) is called.
Expand All @@ -784,12 +808,12 @@ def _adjust_process_count(self):

process_count = len(self._processes)
if process_count < self._max_workers:
# Assertion disabled as this codepath is also used to replace a
# worker that unexpectedly dies, even when using the 'fork' start
# method. That means there is still a potential deadlock bug. If a
# 'fork' mp_context worker dies, we'll be forking a new one when
# we know a thread is running (self._executor_manager_thread).
#assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
# gh-90622: spawning a child via fork while another thread is
# running can deadlock in the child. submit() only calls this
# method when using a non-fork start method.
assert (self._safe_to_dynamically_spawn_children
or not self._executor_manager_thread), (
'https://github.com/python/cpython/issues/90622')
self._spawn_process()

def _launch_processes(self):
Expand Down
27 changes: 27 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,33 @@ def test_max_tasks_per_child_defaults_to_spawn_context(self):
executor = self.executor_type(1, max_tasks_per_child=3)
self.assertEqual(executor._mp_context.get_start_method(), "spawn")

def test_max_tasks_per_child_pending_tasks_gh115634(self):
# gh-115634: A worker exiting at its max_tasks_per_child limit left a
# stale token in the idle worker semaphore, so no replacement worker
# was spawned and the remaining queued tasks deadlocked. Submit more
# tasks than the pool can run at once so a backlog is queued while
# workers hit their task limit.
context = self.get_context()
if context.get_start_method(allow_none=False) == "fork":
raise unittest.SkipTest("Incompatible with the fork start method.")

for max_workers, max_tasks, num_tasks in [(1, 2, 6), (2, 2, 8)]:
with self.subTest(max_workers=max_workers, max_tasks=max_tasks):
executor = self.executor_type(
max_workers, mp_context=context,
max_tasks_per_child=max_tasks)
try:
futures = [executor.submit(mul, i, 2)
for i in range(num_tasks)]
# If the deadlock regresses, the result() calls time out,
# and the shutdown below hangs until the test timeout.
results = [f.result(timeout=support.SHORT_TIMEOUT)
for f in futures]
self.assertEqual(results,
[i * 2 for i in range(num_tasks)])
finally:
executor.shutdown(wait=True, cancel_futures=True)

def test_max_tasks_early_shutdown(self):
context = self.get_context()
if context.get_start_method(allow_none=False) == "fork":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Fix a deadlock in :class:`concurrent.futures.ProcessPoolExecutor` when
using ``max_tasks_per_child``, present since the feature was introduced in
Python 3.11. The executor stopped scheduling queued tasks after a worker
process exited upon reaching its task limit. Based on a fix proposed by
Tabrez Mohammed.
Loading