[Python-checkins] bpo-33532: Fix test_multiprocessing_forkserver.test_ignore() (GH-7322)

Victor Stinner webhook-mailer at python.org
Fri Jun 1 13:39:58 EDT 2018


https://github.com/python/cpython/commit/64e538bc703e423a04ab435c4eab6b950b8aef7e
commit: 64e538bc703e423a04ab435c4eab6b950b8aef7e
branch: 3.6
author: Victor Stinner <vstinner at redhat.com>
committer: GitHub <noreply at github.com>
date: 2018-06-01T19:39:56+02:00
summary:

bpo-33532: Fix test_multiprocessing_forkserver.test_ignore() (GH-7322)

Use also support.SOCK_MAX_SIZE, not only support.PIPE_MAX_SIZE, to
get the size for a blocking send into a multiprocessing pipe.

Replace also test.support with support.

files:
M Lib/test/_test_multiprocessing.py

diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index d7eb69bb8b8f..0a82026f4d21 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -20,14 +20,14 @@
 import struct
 import operator
 import weakref
-import test.support
+from test import support
 import test.support.script_helper
 
 
 # Skip tests if _multiprocessing wasn't built.
-_multiprocessing = test.support.import_module('_multiprocessing')
+_multiprocessing = support.import_module('_multiprocessing')
 # Skip tests if sem_open implementation is broken.
-test.support.import_module('multiprocessing.synchronize')
+support.import_module('multiprocessing.synchronize')
 # import threading after _multiprocessing to raise a more relevant error
 # message: "No module named _multiprocessing". _multiprocessing is not compiled
 # without thread support.
@@ -567,8 +567,8 @@ def test_stderr_flush(self):
         if self.TYPE == "threads":
             self.skipTest('test not appropriate for {}'.format(self.TYPE))
 
-        testfn = test.support.TESTFN
-        self.addCleanup(test.support.unlink, testfn)
+        testfn = support.TESTFN
+        self.addCleanup(support.unlink, testfn)
         proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
         proc.start()
         proc.join()
@@ -597,8 +597,8 @@ def test_sys_exit(self):
         if self.TYPE == 'threads':
             self.skipTest('test not appropriate for {}'.format(self.TYPE))
 
-        testfn = test.support.TESTFN
-        self.addCleanup(test.support.unlink, testfn)
+        testfn = support.TESTFN
+        self.addCleanup(support.unlink, testfn)
 
         for reason in (
             [1, 2, 3],
@@ -853,7 +853,7 @@ def test_task_done(self):
         close_queue(queue)
 
     def test_no_import_lock_contention(self):
-        with test.support.temp_cwd():
+        with support.temp_cwd():
             module_name = 'imported_by_an_imported_module'
             with open(module_name + '.py', 'w') as f:
                 f.write("""if 1:
@@ -866,7 +866,7 @@ def test_no_import_lock_contention(self):
                     del q
                 """)
 
-            with test.support.DirsOnSysPath(os.getcwd()):
+            with support.DirsOnSysPath(os.getcwd()):
                 try:
                     __import__(module_name)
                 except pyqueue.Empty:
@@ -891,7 +891,7 @@ def test_queue_feeder_donot_stop_onexc(self):
         class NotSerializable(object):
             def __reduce__(self):
                 raise AttributeError
-        with test.support.captured_stderr():
+        with support.captured_stderr():
             q = self.Queue()
             q.put(NotSerializable())
             q.put(True)
@@ -2194,7 +2194,7 @@ def test_traceback(self):
             self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
             self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
 
-            with test.support.captured_stderr() as f1:
+            with support.captured_stderr() as f1:
                 try:
                     raise exc
                 except RuntimeError:
@@ -2476,7 +2476,7 @@ def test_remote(self):
         authkey = os.urandom(32)
 
         manager = QueueManager(
-            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
+            address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER
             )
         manager.start()
 
@@ -2513,7 +2513,7 @@ def _putter(cls, address, authkey):
     def test_rapid_restart(self):
         authkey = os.urandom(32)
         manager = QueueManager(
-            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
+            address=(support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
         srvr = manager.get_server()
         addr = srvr.address
         # Close the connection.Listener socket which gets opened as a part
@@ -2736,14 +2736,14 @@ def test_fd_transfer(self):
         p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
         p.daemon = True
         p.start()
-        self.addCleanup(test.support.unlink, test.support.TESTFN)
-        with open(test.support.TESTFN, "wb") as f:
+        self.addCleanup(support.unlink, support.TESTFN)
+        with open(support.TESTFN, "wb") as f:
             fd = f.fileno()
             if msvcrt:
                 fd = msvcrt.get_osfhandle(fd)
             reduction.send_handle(conn, fd, p.pid)
         p.join()
-        with open(test.support.TESTFN, "rb") as f:
+        with open(support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"foo")
 
     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@@ -2762,8 +2762,8 @@ def test_large_fd_transfer(self):
         p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
         p.daemon = True
         p.start()
-        self.addCleanup(test.support.unlink, test.support.TESTFN)
-        with open(test.support.TESTFN, "wb") as f:
+        self.addCleanup(support.unlink, support.TESTFN)
+        with open(support.TESTFN, "wb") as f:
             fd = f.fileno()
             for newfd in range(256, MAXFD):
                 if not self._is_fd_assigned(newfd):
@@ -2776,7 +2776,7 @@ def test_large_fd_transfer(self):
             finally:
                 os.close(newfd)
         p.join()
-        with open(test.support.TESTFN, "rb") as f:
+        with open(support.TESTFN, "rb") as f:
             self.assertEqual(f.read(), b"bar")
 
     @classmethod
@@ -2986,7 +2986,7 @@ def _listener(cls, conn, families):
             l.close()
 
         l = socket.socket()
-        l.bind((test.support.HOST, 0))
+        l.bind((support.HOST, 0))
         l.listen()
         conn.send(l.getsockname())
         new_conn, addr = l.accept()
@@ -3336,7 +3336,7 @@ def make_finalizers():
             gc.set_threshold(5, 5, 5)
             threads = [threading.Thread(target=run_finalizers),
                        threading.Thread(target=make_finalizers)]
-            with test.support.start_threads(threads):
+            with support.start_threads(threads):
                 time.sleep(4.0)  # Wait a bit to trigger race condition
                 finish = True
             if exc is not None:
@@ -3697,7 +3697,7 @@ def _child_test_wait_socket(cls, address, slow):
     def test_wait_socket(self, slow=False):
         from multiprocessing.connection import wait
         l = socket.socket()
-        l.bind((test.support.HOST, 0))
+        l.bind((support.HOST, 0))
         l.listen()
         addr = l.getsockname()
         readers = []
@@ -3910,11 +3910,11 @@ def test_noforkbomb(self):
         sm = multiprocessing.get_start_method()
         name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
         if sm != 'fork':
-            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
+            rc, out, err = support.script_helper.assert_python_failure(name, sm)
             self.assertEqual(out, b'')
             self.assertIn(b'RuntimeError', err)
         else:
-            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
+            rc, out, err = support.script_helper.assert_python_ok(name, sm)
             self.assertEqual(out.rstrip(), b'123')
             self.assertEqual(err, b'')
 
@@ -4021,6 +4021,9 @@ def test_closefd(self):
 
 class TestIgnoreEINTR(unittest.TestCase):
 
+    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
+    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
+
     @classmethod
     def _test_ignore(cls, conn):
         def handler(signum, frame):
@@ -4029,7 +4032,7 @@ def handler(signum, frame):
         conn.send('ready')
         x = conn.recv()
         conn.send(x)
-        conn.send_bytes(b'x' * test.support.PIPE_MAX_SIZE)
+        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
 
     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     def test_ignore(self):
@@ -4048,8 +4051,7 @@ def test_ignore(self):
             self.assertEqual(conn.recv(), 1234)
             time.sleep(0.1)
             os.kill(p.pid, signal.SIGUSR1)
-            self.assertEqual(conn.recv_bytes(),
-                             b'x' * test.support.PIPE_MAX_SIZE)
+            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
             time.sleep(0.1)
             p.join()
         finally:
@@ -4145,7 +4147,7 @@ def test_preload_resources(self):
         if multiprocessing.get_start_method() != 'forkserver':
             self.skipTest("test only relevant for 'forkserver' method")
         name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
-        rc, out, err = test.support.script_helper.assert_python_ok(name)
+        rc, out, err = support.script_helper.assert_python_ok(name)
         out = out.decode()
         err = err.decode()
         if out.rstrip() != 'ok' or err != '':
@@ -4279,7 +4281,7 @@ def setUpClass(cls):
     def tearDownClass(cls):
         # bpo-26762: Some multiprocessing objects like Pool create reference
         # cycles. Trigger a garbage collection to break these cycles.
-        test.support.gc_collect()
+        support.gc_collect()
 
         processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
         if processes:
@@ -4458,7 +4460,7 @@ def tearDownModule():
 
         # bpo-26762: Some multiprocessing objects like Pool create reference
         # cycles. Trigger a garbage collection to break these cycles.
-        test.support.gc_collect()
+        support.gc_collect()
 
         multiprocessing.set_start_method(old_start_method[0], force=True)
         # pause a bit so we don't get warning about dangling threads/processes
@@ -4480,7 +4482,7 @@ def tearDownModule():
         if need_sleep:
             time.sleep(0.5)
         multiprocessing.process._cleanup()
-        test.support.gc_collect()
+        support.gc_collect()
 
     remote_globs['setUpModule'] = setUpModule
     remote_globs['tearDownModule'] = tearDownModule



More information about the Python-checkins mailing list