Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 33 additions & 31 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import struct
import operator
import weakref
import test.support
from test import support
import test.support.script_helper


# Skip tests if _multiprocessing wasn't built.
_multiprocessing = test.support.import_module('_multiprocessing')
_multiprocessing = support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
Expand Down Expand Up @@ -567,8 +567,8 @@ def test_stderr_flush(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))

testfn = test.support.TESTFN
self.addCleanup(test.support.unlink, testfn)
testfn = support.TESTFN
self.addCleanup(support.unlink, testfn)
proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
proc.start()
proc.join()
Expand Down Expand Up @@ -597,8 +597,8 @@ def test_sys_exit(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))

testfn = test.support.TESTFN
self.addCleanup(test.support.unlink, testfn)
testfn = support.TESTFN
self.addCleanup(support.unlink, testfn)

for reason in (
[1, 2, 3],
Expand Down Expand Up @@ -853,7 +853,7 @@ def test_task_done(self):
close_queue(queue)

def test_no_import_lock_contention(self):
with test.support.temp_cwd():
with support.temp_cwd():
module_name = 'imported_by_an_imported_module'
with open(module_name + '.py', 'w') as f:
f.write("""if 1:
Expand All @@ -866,7 +866,7 @@ def test_no_import_lock_contention(self):
del q
""")

with test.support.DirsOnSysPath(os.getcwd()):
with support.DirsOnSysPath(os.getcwd()):
try:
__import__(module_name)
except pyqueue.Empty:
Expand All @@ -891,7 +891,7 @@ def test_queue_feeder_donot_stop_onexc(self):
class NotSerializable(object):
def __reduce__(self):
raise AttributeError
with test.support.captured_stderr():
with support.captured_stderr():
q = self.Queue()
q.put(NotSerializable())
q.put(True)
Expand Down Expand Up @@ -2194,7 +2194,7 @@ def test_traceback(self):
self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

with test.support.captured_stderr() as f1:
with support.captured_stderr() as f1:
try:
raise exc
except RuntimeError:
Expand Down Expand Up @@ -2476,7 +2476,7 @@ def test_remote(self):
authkey = os.urandom(32)

manager = QueueManager(
address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER
)
manager.start()

Expand Down Expand Up @@ -2513,7 +2513,7 @@ def _putter(cls, address, authkey):
def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(
address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
srvr = manager.get_server()
addr = srvr.address
# Close the connection.Listener socket which gets opened as a part
Expand Down Expand Up @@ -2736,14 +2736,14 @@ def test_fd_transfer(self):
p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
p.daemon = True
p.start()
self.addCleanup(test.support.unlink, test.support.TESTFN)
with open(test.support.TESTFN, "wb") as f:
self.addCleanup(support.unlink, support.TESTFN)
with open(support.TESTFN, "wb") as f:
fd = f.fileno()
if msvcrt:
fd = msvcrt.get_osfhandle(fd)
reduction.send_handle(conn, fd, p.pid)
p.join()
with open(test.support.TESTFN, "rb") as f:
with open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"foo")

@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Expand All @@ -2762,8 +2762,8 @@ def test_large_fd_transfer(self):
p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
p.daemon = True
p.start()
self.addCleanup(test.support.unlink, test.support.TESTFN)
with open(test.support.TESTFN, "wb") as f:
self.addCleanup(support.unlink, support.TESTFN)
with open(support.TESTFN, "wb") as f:
fd = f.fileno()
for newfd in range(256, MAXFD):
if not self._is_fd_assigned(newfd):
Expand All @@ -2776,7 +2776,7 @@ def test_large_fd_transfer(self):
finally:
os.close(newfd)
p.join()
with open(test.support.TESTFN, "rb") as f:
with open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"bar")

@classmethod
Expand Down Expand Up @@ -2986,7 +2986,7 @@ def _listener(cls, conn, families):
l.close()

l = socket.socket()
l.bind((test.support.HOST, 0))
l.bind((support.HOST, 0))
l.listen()
conn.send(l.getsockname())
new_conn, addr = l.accept()
Expand Down Expand Up @@ -3336,7 +3336,7 @@ def make_finalizers():
gc.set_threshold(5, 5, 5)
threads = [threading.Thread(target=run_finalizers),
threading.Thread(target=make_finalizers)]
with test.support.start_threads(threads):
with support.start_threads(threads):
time.sleep(4.0) # Wait a bit to trigger race condition
finish = True
if exc is not None:
Expand Down Expand Up @@ -3697,7 +3697,7 @@ def _child_test_wait_socket(cls, address, slow):
def test_wait_socket(self, slow=False):
from multiprocessing.connection import wait
l = socket.socket()
l.bind((test.support.HOST, 0))
l.bind((support.HOST, 0))
l.listen()
addr = l.getsockname()
readers = []
Expand Down Expand Up @@ -3910,11 +3910,11 @@ def test_noforkbomb(self):
sm = multiprocessing.get_start_method()
name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
if sm != 'fork':
rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
rc, out, err = support.script_helper.assert_python_failure(name, sm)
self.assertEqual(out, b'')
self.assertIn(b'RuntimeError', err)
else:
rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
rc, out, err = support.script_helper.assert_python_ok(name, sm)
self.assertEqual(out.rstrip(), b'123')
self.assertEqual(err, b'')

Expand Down Expand Up @@ -4021,6 +4021,9 @@ def test_closefd(self):

class TestIgnoreEINTR(unittest.TestCase):

# Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)

@classmethod
def _test_ignore(cls, conn):
def handler(signum, frame):
Expand All @@ -4029,7 +4032,7 @@ def handler(signum, frame):
conn.send('ready')
x = conn.recv()
conn.send(x)
conn.send_bytes(b'x' * test.support.PIPE_MAX_SIZE)
conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)

@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_ignore(self):
Expand All @@ -4048,8 +4051,7 @@ def test_ignore(self):
self.assertEqual(conn.recv(), 1234)
time.sleep(0.1)
os.kill(p.pid, signal.SIGUSR1)
self.assertEqual(conn.recv_bytes(),
b'x' * test.support.PIPE_MAX_SIZE)
self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
time.sleep(0.1)
p.join()
finally:
Expand Down Expand Up @@ -4145,7 +4147,7 @@ def test_preload_resources(self):
if multiprocessing.get_start_method() != 'forkserver':
self.skipTest("test only relevant for 'forkserver' method")
name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
rc, out, err = test.support.script_helper.assert_python_ok(name)
rc, out, err = support.script_helper.assert_python_ok(name)
out = out.decode()
err = err.decode()
if out.rstrip() != 'ok' or err != '':
Expand Down Expand Up @@ -4279,7 +4281,7 @@ def setUpClass(cls):
def tearDownClass(cls):
# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()
support.gc_collect()

processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
if processes:
Expand Down Expand Up @@ -4458,7 +4460,7 @@ def tearDownModule():

# bpo-26762: Some multiprocessing objects like Pool create reference
# cycles. Trigger a garbage collection to break these cycles.
test.support.gc_collect()
support.gc_collect()

multiprocessing.set_start_method(old_start_method[0], force=True)
# pause a bit so we don't get warning about dangling threads/processes
Expand All @@ -4480,7 +4482,7 @@ def tearDownModule():
if need_sleep:
time.sleep(0.5)
multiprocessing.process._cleanup()
test.support.gc_collect()
support.gc_collect()

remote_globs['setUpModule'] = setUpModule
remote_globs['tearDownModule'] = tearDownModule