Skip to content
Prev Previous commit
Next Next commit
Restore timeout and poll
  • Loading branch information
pablogsal committed Feb 12, 2019
commit 9224c332096c95601ad44298245bc6a6a0d87f25
27 changes: 16 additions & 11 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import time
import traceback
import warnings
from queue import Empty

# If threading is available then ThreadPool should be provided. Therefore
# we avoid top-level imports which are liable to fail on some systems.
Expand Down Expand Up @@ -146,7 +147,13 @@ def _helper_reraises_exception(ex):
# Class representing a process pool
#

class PoolCache(dict):
class _PoolCache(dict):
"""
Class that implements a cache for the Pool class that will notify
the pool management threads every time the cache is emptied. The
notification is done by the use of a queue that is provided when
instantiating the cache.
"""
def __init__(self, *args, notifier=None, **kwds):
self.notifier = notifier
super().__init__(*args, **kwds)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP 8, please add an empty line between methods.

Expand Down Expand Up @@ -179,7 +186,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
# when the cache (self._cache) is empty or when there is a change in

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"when there is"

# the _state variable of the thread that runs _handle_workers.
self._change_notifier = self._ctx.SimpleQueue()
Comment thread
vstinner marked this conversation as resolved.
self._cache = PoolCache(notifier=self._change_notifier)
self._cache = _PoolCache(notifier=self._change_notifier)
self._maxtasksperchild = maxtasksperchild
self._initializer = initializer
self._initargs = initargs
Expand Down Expand Up @@ -465,18 +472,18 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
)
return result

def _wait_for_updates(self, timeout):
def _wait_for_updates(self, timeout=0.2):
task_queue_sentinels = [self._outqueue._reader.fileno(),
self._outqueue._writer.fileno()]
self_notifier_sentinels = [self._change_notifier._reader.fileno()]
worker_sentinels = [worker.sentinel for worker in self._pool]
sentinels = [*task_queue_sentinels,
*worker_sentinels,
*self_notifier_sentinels]
# The timeout in wait() exists to make sure that we do not hang/deadlock
# if there is some edge case/race condition in the self-pipe based solution,
# so we fall back to active polling to maintain backwards compatibility.
wait(sentinels, timeout=timeout)
Comment thread
pablogsal marked this conversation as resolved.
Comment thread
vstinner marked this conversation as resolved.
Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a comment on wait() to explain that it completes when at least one sentinel is set, and that it's important not to wait until all sentinels have completed but to exit frequently to refresh the pool?

This point is non-trivial and it surprised me when I wrote PR #11136, my comment #11136 (comment):

My change doesn't work: self._worker_state_event isn't set when a worker completes, whereas _maintain_pool() should be called frequently to check when a worker completed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not asking to document the behavior of wait, but to make it more explicit that we stop as soon as the first event completes, on purpose.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks obvious to me, especially as the function is named wait_for_updates, but I guess it doesn't hurt to add a comment.

while not self._change_notifier.empty():
self._change_notifier.get()


@staticmethod
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
Expand All @@ -486,13 +493,11 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,

# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while True:
if thread._state != RUN and (not pool._cache or thread._state == TERMINATE):
break
while thread._state == RUN or (cache and thread._state != TERMINATE):
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception)
pool._wait_for_updates(timeout=0.2)
pool._wait_for_updates()
# send sentinel to stop workers
taskqueue.put(None)
util.debug('worker handler exiting')
Expand Down Expand Up @@ -914,5 +919,5 @@ def _help_stuff_finish(inqueue, task_handler, size):
for i in range(size):
inqueue.put(None)

def _wait_for_updates(self, timeout):
def _wait_for_updates(self, timeout=0.2):
time.sleep(timeout)
Comment thread
pablogsal marked this conversation as resolved.