Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,17 @@ And::
executor.submit(wait_on_future)


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

An :class:`Executor` subclass that uses a pool of at most *max_workers*
threads to execute calls asynchronously.

*initializer* is an optional callable that is called at the start of
each worker thread; *initargs* is a tuple of arguments passed to the
initializer. Should *initializer* raise an exception, all currently
pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
as well any attempt to submit more jobs to the pool.

.. versionchanged:: 3.5
If *max_workers* is ``None`` or
not given, it will default to the number of processors on the machine,
Expand All @@ -142,6 +148,10 @@ And::
control the threading.Thread names for worker threads created by
the pool for easier debugging.

.. versionchanged:: 3.7
Added the *initializer* and *initargs* arguments.


.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
Expand Down Expand Up @@ -191,7 +201,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None)
.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

An :class:`Executor` subclass that executes calls asynchronously using a pool
of at most *max_workers* processes. If *max_workers* is ``None`` or not
Expand All @@ -202,6 +212,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
launch the workers. If *mp_context* is ``None`` or not given, the default
multiprocessing context is used.

*initializer* is an optional callable that is called at the start of
each worker process; *initargs* is a tuple of arguments passed to the
initializer. Should *initializer* raise an exception, all currently
pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
as well any attempt to submit more jobs to the pool.

.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
Expand All @@ -212,6 +228,8 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
The *mp_context* argument was added to allow users to control the
start_method for worker processes created by the pool.

Added the *initializer* and *initargs* arguments.


.. _processpoolexecutor-example:

Expand Down Expand Up @@ -432,13 +450,31 @@ Exception classes

Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

Derived from :exc:`RuntimeError`, this exception class is raised
when an executor is broken for some reason, and cannot be used
to submit or execute new tasks.

.. versionadded:: 3.7

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
class is raised when one of the workers of a :class:`ThreadPoolExecutor`
has failed initializing.

.. versionadded:: 3.7

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

Derived from :exc:`RuntimeError`, this exception class is raised when
one of the workers of a :class:`ProcessPoolExecutor` has terminated
in a non-clean fashion (for example, if it was killed from the outside).
Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
:exc:`RuntimeError`), this exception class is raised when one of the
workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean
fashion (for example, if it was killed from the outside).

.. versionadded:: 3.3

1 change: 1 addition & 0 deletions Lib/concurrent/futures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ALL_COMPLETED,
CancelledError,
TimeoutError,
BrokenExecutor,
Future,
Executor,
wait,
Expand Down
6 changes: 6 additions & 0 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,9 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False


class BrokenExecutor(RuntimeError):
"""
Raised when a executor has become non-functional after a severe failure.
"""
36 changes: 29 additions & 7 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def __init__(self, work_id, fn, args, kwargs):
self.args = args
self.kwargs = kwargs


def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
Expand All @@ -151,7 +152,7 @@ def _process_chunk(fn, chunk):
"""
return [fn(*args) for args in chunk]

def _process_worker(call_queue, result_queue):
def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.

This worker is run in a separate process.
Expand All @@ -161,7 +162,17 @@ def _process_worker(call_queue, result_queue):
evaluated by the worker.
result_queue: A ctx.Queue of _ResultItems that will written
to by the worker.
initializer: A callable initializer, or None
initargs: A tuple of args for the initializer
"""
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
# The parent will notice that the process stopped and
# mark the pool broken
return
while True:
call_item = call_queue.get(block=True)
if call_item is None:
Expand Down Expand Up @@ -277,7 +288,9 @@ def shutdown_worker():
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._broken = ('A child process terminated '
'abruptly, the process pool is not '
'usable anymore')
executor._shutdown_thread = True
executor = None
# All futures in flight must be marked failed
Expand Down Expand Up @@ -372,15 +385,16 @@ def _chain_from_iterable_of_lists(iterable):
yield element.pop()


class BrokenProcessPool(RuntimeError):
class BrokenProcessPool(_base.BrokenExecutor):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
while a future was in the running state.
"""


class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None):
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=()):
"""Initializes a new ProcessPoolExecutor instance.

Args:
Expand All @@ -389,6 +403,8 @@ def __init__(self, max_workers=None, mp_context=None):
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
initializer: An callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
"""
_check_system_limits()

Expand All @@ -403,6 +419,11 @@ def __init__(self, max_workers=None, mp_context=None):
mp_context = mp.get_context()
self._mp_context = mp_context

if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._initializer = initializer
self._initargs = initargs

# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
Expand Down Expand Up @@ -450,15 +471,16 @@ def _adjust_process_count(self):
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue))
self._result_queue,
self._initializer,
self._initargs))
p.start()
self._processes[p.pid] = p

def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool('A child process terminated '
'abruptly, the process pool is not usable anymore')
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')

Expand Down
51 changes: 48 additions & 3 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def _python_exit():

atexit.register(_python_exit)


class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
Expand All @@ -61,7 +62,17 @@ def run(self):
else:
self.future.set_result(result)

def _worker(executor_reference, work_queue):

def _worker(executor_reference, work_queue, initializer, initargs):
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
executor = executor_reference()
if executor is not None:
executor._initializer_failed()
return
try:
while True:
work_item = work_queue.get(block=True)
Expand All @@ -83,18 +94,28 @@ def _worker(executor_reference, work_queue):
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
"""
Raised when a worker thread in a ThreadPoolExecutor failed initializing.
"""


class ThreadPoolExecutor(_base.Executor):

# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__

def __init__(self, max_workers=None, thread_name_prefix=''):
def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()):
"""Initializes a new ThreadPoolExecutor instance.

Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: An callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
Expand All @@ -103,16 +124,25 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")

if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")

self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
self._broken = False
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))
self._initializer = initializer
self._initargs = initargs

def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)

if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')

Expand All @@ -137,12 +167,27 @@ def weakref_cb(_, q=self._work_queue):
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue

def _initializer_failed(self):
with self._shutdown_lock:
self._broken = ('A thread initializer failed, the thread pool '
'is not usable anymore')
# Drain work queue and mark pending futures failed
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))

def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
Expand Down
Loading