Skip to content
7 changes: 7 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ def shutdown_worker():
# - The executor that owns this worker has been shutdown.
if shutting_down():
try:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
Expand Down Expand Up @@ -593,6 +597,9 @@ def submit(self, fn, *args, **kwargs):
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
if _global_shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
Expand Down
7 changes: 7 additions & 0 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def _worker(executor_reference, work_queue, initializer, initargs):
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown = True
# Notice other workers
work_queue.put(None)
return
Expand Down Expand Up @@ -145,6 +149,9 @@ def submit(self, fn, *args, **kwargs):

if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
if _shutdown:
raise RuntimeError('cannot schedule new futures after'
'interpreter shutdown')

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,34 @@ def test_interpreter_shutdown(self):
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")

def test_submit_after_interpreter_shutdown(self):
# Test the atexit hook for shutdown of worker threads and processes
rc, out, err = assert_python_ok('-c', """if 1:
import atexit
@atexit.register
def run_last():
try:
t.submit(id, None)
except RuntimeError:
print("runtime-error")
raise
from concurrent.futures import {executor_type}
if __name__ == "__main__":
context = '{context}'
if not context:
t = {executor_type}(5)
else:
from multiprocessing import get_context
context = get_context(context)
t = {executor_type}(5, mp_context=context)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the executor is not started before the call in atexit. This causes another set of problem as the sub{Process,Thread} are not started and cleaned properly. For your proposed test, I would add a t.submit(id, 42).result() to be sure that the executor has been started before the atexit calls.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added t.submit(id, 42).result() to the test in d5c28194a0b842833ac4729f5f333ce718120201

t.submit(id, 42).result()
""".format(executor_type=self.executor_type.__name__,
context=getattr(self, "ctx", "")))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
self.assertEqual(out.strip(), b"runtime-error")

def test_hang_issue12364(self):
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
self.executor.shutdown()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Raise RuntimeError when ``executor.submit`` is called during interpreter
shutdown.