Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2067,7 +2067,7 @@ Process Pools
One can create a pool of processes which will carry out tasks submitted to it
with the :class:`Pool` class.

.. class:: Pool([processes[, initializer[, initargs[, maxtasksperchild [, context]]]]])
.. class:: Pool([processes[, initializer[, initargs[, expect_initret[, maxtasksperchild[, context]]]]])

A process pool object which controls a pool of worker processes to which jobs
can be submitted. It supports asynchronous results with timeouts and
Expand All @@ -2079,6 +2079,11 @@ with the :class:`Pool` class.
If *initializer* is not ``None`` then each worker process will call
``initializer(*initargs)`` when it starts.

If *expect_initret* is ``True``, the return value of *initializer* will be
passed to each process' applied fuction as a kwarg named `initret`. This
can be used to pass data your Pool workers, as an alternative to declaring
globals.

*maxtasksperchild* is the number of tasks a worker process can complete
before it will exit and be replaced with a fresh worker process, to enable
unused resources to be freed. The default *maxtasksperchild* is ``None``, which
Expand All @@ -2099,6 +2104,9 @@ with the :class:`Pool` class.
.. versionadded:: 3.4
*context*

.. versionadded:: 3.8
*expect_initret*

.. note::

Worker processes within a :class:`Pool` typically live for the complete
Expand Down
8 changes: 4 additions & 4 deletions Lib/multiprocessing/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ def SimpleQueue(self):
from .queues import SimpleQueue
return SimpleQueue(ctx=self.get_context())

def Pool(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
def Pool(self, processes=None, initializer=None,
initargs=(), expect_initret=False, maxtasksperchild=None):
'''Returns a process pool object'''
from .pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild,
context=self.get_context())
return Pool(processes, initializer, initargs, expect_initret,
maxtasksperchild, context=self.get_context())

def RawValue(self, typecode_or_type, *args):
'''Returns a shared object'''
Expand Down
5 changes: 3 additions & 2 deletions Lib/multiprocessing/dummy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ def Manager():
def shutdown():
pass

def Pool(processes=None, initializer=None, initargs=()):
def Pool(processes=None, initializer=None, initargs=(),
expect_initret=False):
from ..pool import ThreadPool
return ThreadPool(processes, initializer, initargs)
return ThreadPool(processes, initializer, initargs, expect_initret)

JoinableQueue = Queue
52 changes: 37 additions & 15 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import threading
import queue
import itertools
import functools
import collections
import os
import time
Expand All @@ -40,11 +41,14 @@

job_counter = itertools.count()

def mapstar(args):
return list(map(*args))
def mapstar(args, **kwargs):
return list(
map(functools.partial(args[0], **kwargs),
*args[1:]))

def starmapstar(args):
return list(itertools.starmap(args[0], args[1]))
def starmapstar(args, **kwargs):
return list(
itertools.starmap(functools.partial(args[0], **kwargs), args[1]))

#
# Hack to embed stringification of remote traceback in local traceback
Expand Down Expand Up @@ -90,19 +94,21 @@ def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self)


def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
wrap_exception=False):
def worker(inqueue, outqueue, initializer=None, initargs=(),
expect_initret=False, maxtasks=None, wrap_exception=False):
if (maxtasks is not None) and not (isinstance(maxtasks, int)
and maxtasks >= 1):
raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))

initret = None
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
inqueue._writer.close()
outqueue._reader.close()

if initializer is not None:
initializer(*initargs)
initret = initializer(*initargs)

completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
Expand All @@ -118,7 +124,11 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,

job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds))
if expect_initret and func is not _helper_reraises_exception:
result = (
True, func(*args, **kwds, initret=initret))
else:
result = (True, func(*args, **kwds))
except Exception as e:
if wrap_exception and func is not _helper_reraises_exception:
e = ExceptionWithTraceback(e, e.__traceback__)
Expand Down Expand Up @@ -154,14 +164,15 @@ def Process(ctx, *args, **kwds):
return ctx.Process(*args, **kwds)

def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
expect_initret=False, maxtasksperchild=None, context=None):
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
self._cache = {}
self._state = RUN
self._maxtasksperchild = maxtasksperchild
self._initializer = initializer
self._expect_initret = expect_initret
self._initargs = initargs

if processes is None:
Expand All @@ -171,6 +182,9 @@ def __init__(self, processes=None, initializer=None, initargs=(),

if initializer is not None and not callable(initializer):
raise TypeError('initializer must be a callable')
if initializer is None and self._expect_initret:
raise ValueError(
"initializer can't be None if expect_initret is True")

self._processes = processes
self._pool = []
Expand Down Expand Up @@ -232,8 +246,10 @@ def _repopulate_pool(self):
return self._repopulate_pool_static(self._ctx, self.Process,
self._processes,
self._pool, self._inqueue,
self._outqueue, self._initializer,
self._outqueue,
self._initializer,
self._initargs,
self._expect_initret,
self._maxtasksperchild,
self._wrap_exception)

Expand All @@ -244,11 +260,13 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""

for i in range(processes - len(pool)):
w = Process(ctx, target=worker,
args=(inqueue, outqueue,
initializer,
initargs, maxtasksperchild,
initializer, initargs,
expect_initret,
maxtasksperchild,
wrap_exception)
)
pool.append(w)
Expand All @@ -264,7 +282,10 @@ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
"""Clean up any exited workers and start replacements for them.
"""
if Pool._join_exited_workers(pool):
Pool._repopulate_pool_static(ctx, Process, processes, pool,
Pool.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. There seems to white spaces in this line and causes make patchcheck to fail in CI. Please remove them or run make patchcheck locally from the source directory that will fix this for you automatically.

Relevant make patchcheck output :

Getting base branch for PR ... upstream/master
Getting the list of files that have been added/changed ... 5 files
Fixing Python file whitespace ... 1 file:
  Lib/multiprocessing/pool.py
Fixing C file whitespace ... 0 files
Fixing docs whitespace ... 0 files
Docs modified ... yes
Misc/ACKS updated ... NO
Misc/NEWS.d updated with `blurb` ... NO
configure regenerated ... not needed
pyconfig.h.in regenerated ... not needed


(ctx, Process, processes, pool,
inqueue, outqueue, initializer,
initargs, maxtasksperchild,
wrap_exception)
Expand Down Expand Up @@ -824,8 +845,9 @@ def Process(ctx, *args, **kwds):
from .dummy import Process
return Process(*args, **kwds)

def __init__(self, processes=None, initializer=None, initargs=()):
Pool.__init__(self, processes, initializer, initargs)
def __init__(self, processes=None, initializer=None,
initargs=(), expect_initret=False):
Pool.__init__(self, processes, initializer, initargs, expect_initret)

def _setup_queues(self):
self._inqueue = queue.SimpleQueue()
Expand Down
Loading