-
-
Notifications
You must be signed in to change notification settings - Fork 34.8k
bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool #11488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
b5c7d83
839f2e2
46d9625
9224c33
e1ee023
8d72dc8
ab44556
41cf470
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import time | ||
| import traceback | ||
| import warnings | ||
| from queue import Empty | ||
|
|
||
| # If threading is available then ThreadPool should be provided. Therefore | ||
| # we avoid top-level imports which are liable to fail on some systems. | ||
|
|
@@ -146,7 +147,13 @@ def _helper_reraises_exception(ex): | |
| # Class representing a process pool | ||
| # | ||
|
|
||
| class PoolCache(dict): | ||
| class _PoolCache(dict): | ||
| """ | ||
| Class that implements a cache for the Pool class that will notify | ||
| the pool management threads every time the cache is emptied. The | ||
| notification is done by the use of a queue that is provided when | ||
| instantiating the cache. | ||
| """ | ||
| def __init__(self, *args, notifier=None, **kwds): | ||
| self.notifier = notifier | ||
| super().__init__(*args, **kwds) | ||
|
|
@@ -179,7 +186,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), | |
| # when the cache (self._cache) is empty or when there is a change in | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "when there is" |
||
| # the _state variable of the thread that runs _handle_workers. | ||
| self._change_notifier = self._ctx.SimpleQueue() | ||
|
vstinner marked this conversation as resolved.
|
||
| self._cache = PoolCache(notifier=self._change_notifier) | ||
| self._cache = _PoolCache(notifier=self._change_notifier) | ||
| self._maxtasksperchild = maxtasksperchild | ||
| self._initializer = initializer | ||
| self._initargs = initargs | ||
|
|
@@ -465,18 +472,18 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, | |
| ) | ||
| return result | ||
|
|
||
| def _wait_for_updates(self, timeout): | ||
| def _wait_for_updates(self, timeout=0.2): | ||
| task_queue_sentinels = [self._outqueue._reader.fileno(), | ||
| self._outqueue._writer.fileno()] | ||
| self_notifier_sentinels = [self._change_notifier._reader.fileno()] | ||
| worker_sentinels = [worker.sentinel for worker in self._pool] | ||
| sentinels = [*task_queue_sentinels, | ||
| *worker_sentinels, | ||
| *self_notifier_sentinels] | ||
| # The timeout in wait() exists to make sure that we do not hang/deadlock | ||
| # if there is some edge case/race condition in the self-pipe based solution, | ||
| # so we fall back to active polling to maintain backwards compatibility. | ||
| wait(sentinels, timeout=timeout) | ||
|
pablogsal marked this conversation as resolved.
vstinner marked this conversation as resolved.
Outdated
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you please add a comment on wait() to explain that it completes when at least one sentinel is set and that it's important to not wait until all sentinels have completed, but exit frequently to refresh the pool. This point is non-trivial and it surprised me when I wrote PR #11136, my comment #11136 (comment):
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not asking to document the behavior of wait(), but to make it more explicit that we stop as soon as the first event completes, on purpose.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It looks obvious to me, especially as the function is named |
||
| while not self._change_notifier.empty(): | ||
| self._change_notifier.get() | ||
|
|
||
|
|
||
| @staticmethod | ||
| def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, | ||
|
|
@@ -486,13 +493,11 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, | |
|
|
||
| # Keep maintaining workers until the cache gets drained, unless the pool | ||
| # is terminated. | ||
| while True: | ||
| if thread._state != RUN and (not pool._cache or thread._state == TERMINATE): | ||
| break | ||
| while thread._state == RUN or (cache and thread._state != TERMINATE): | ||
| Pool._maintain_pool(ctx, Process, processes, pool, inqueue, | ||
| outqueue, initializer, initargs, | ||
| maxtasksperchild, wrap_exception) | ||
| pool._wait_for_updates(timeout=0.2) | ||
| pool._wait_for_updates() | ||
| # send sentinel to stop workers | ||
| taskqueue.put(None) | ||
| util.debug('worker handler exiting') | ||
|
|
@@ -914,5 +919,5 @@ def _help_stuff_finish(inqueue, task_handler, size): | |
| for i in range(size): | ||
| inqueue.put(None) | ||
|
|
||
| def _wait_for_updates(self, timeout): | ||
| def _wait_for_updates(self, timeout=0.2): | ||
| time.sleep(timeout) | ||
|
pablogsal marked this conversation as resolved.
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PEP 8, please add an empty line between methods.