[Python-checkins] [3.6] bpo-26762, bpo-31019: Backport multiprocessing fixes from master to 3.6 (#2879)

Victor Stinner webhook-mailer at python.org
Tue Jul 25 22:48:58 EDT 2017


https://github.com/python/cpython/commit/d0adfb25c5082046133a18fd185375508c1c334f
commit: d0adfb25c5082046133a18fd185375508c1c334f
branch: 3.6
author: Victor Stinner <victor.stinner at gmail.com>
committer: GitHub <noreply at github.com>
date: 2017-07-26T04:48:56+02:00
summary:

[3.6] bpo-26762, bpo-31019: Backport multiprocessing fixes from master to 3.6 (#2879)

* bpo-26762: Avoid daemon process in _test_multiprocessing (#2842)

test_level() of _test_multiprocessing._TestLogging now uses regular
processes rather than daemon processes to prevent zombie processes
(to not "leak" processes).
(cherry picked from commit 06634950c553f8df83330ed468c11483b857b7dc)

* test_multiprocessing: Fix dangling process/thread (#2850)

bpo-26762: Fix more dangling processes and threads in
test_multiprocessing:

* Queue: call close() followed by join_thread()
* Process: call join() or self.addCleanup(p.join)
(cherry picked from commit d7e64d9934d86aa6173229de5af5fe908662a33a)

* test_multiprocessing detects dangling per test case (#2841)

bpo-26762: test_multiprocessing now detects dangling processes and
threads per test case classes:

* setUpClass()/tearDownClass() of mixin classes now check if
  multiprocessing.process._dangling or threading._dangling was
  modified to detect "dangling" processes and threads.
* ManagerMixin.tearDownClass() now also emits a warning if it still
  has more than one active child process after 5 seconds.
* tearDownModule() now checks for dangling processes and threads
  before sleeping 500 ms. And it now only sleeps if there is at least one
  dangling process or thread.
(cherry picked from commit ffb49408f0780ae80a553208aa133bc5bb3ba129)

* bpo-26762: test_multiprocessing close more queues (#2855)

* Explicitly close queues to make sure that we don't leave dangling
  threads
* test_queue_in_process(): remove unused queue
* test_access() also joins the process to fix a random warning
(cherry picked from commit b4c52966c810b5c5e088fceff403247f610b7d13)

* bpo-31019: Fix multiprocessing.Process.is_alive() (#2875)

multiprocessing.Process.is_alive() now removes the process from the
_children set if the process completed.

The change prevents leaking "dangling" processes.
(cherry picked from commit 2db64823c20538a6cfc6033661fab5711d2d4585)

files:
M Lib/multiprocessing/process.py
M Lib/test/_test_multiprocessing.py

diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index f9c22703df2..1d26b5e521e 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -132,10 +132,16 @@ def is_alive(self):
         if self is _current_process:
             return True
         assert self._parent_pid == os.getpid(), 'can only test a child process'
+
         if self._popen is None:
             return False
-        self._popen.poll()
-        return self._popen.returncode is None
+
+        returncode = self._popen.poll()
+        if returncode is None:
+            return True
+        else:
+            _children.discard(self)
+            return False
 
     @property
     def name(self):
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 871a34e4b26..4d3c6558374 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -32,11 +32,12 @@
 # without thread support.
 import threading
 
-import multiprocessing.dummy
 import multiprocessing.connection
-import multiprocessing.managers
+import multiprocessing.dummy
 import multiprocessing.heap
+import multiprocessing.managers
 import multiprocessing.pool
+import multiprocessing.queues
 
 from multiprocessing import util
 
@@ -64,6 +65,13 @@
 def latin(s):
     return s.encode('latin')
 
+
+def close_queue(queue):
+    if isinstance(queue, multiprocessing.queues.Queue):
+        queue.close()
+        queue.join_thread()
+
+
 #
 # Constants
 #
@@ -275,6 +283,7 @@ def test_process(self):
         self.assertEqual(p.exitcode, 0)
         self.assertEqual(p.is_alive(), False)
         self.assertNotIn(p, self.active_children())
+        close_queue(q)
 
     @classmethod
     def _test_terminate(cls):
@@ -414,6 +423,7 @@ def test_lose_target_ref(self):
         p.join()
         self.assertIs(wr(), None)
         self.assertEqual(q.get(), 5)
+        close_queue(q)
 
 
 #
@@ -600,6 +610,7 @@ def test_put(self):
         self.assertEqual(queue_full(queue, MAXSIZE), False)
 
         proc.join()
+        close_queue(queue)
 
     @classmethod
     def _test_get(cls, queue, child_can_start, parent_can_continue):
@@ -662,6 +673,7 @@ def test_get(self):
         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
 
         proc.join()
+        close_queue(queue)
 
     @classmethod
     def _test_fork(cls, queue):
@@ -697,6 +709,7 @@ def test_fork(self):
         self.assertRaises(pyqueue.Empty, queue.get, False)
 
         p.join()
+        close_queue(queue)
 
     def test_qsize(self):
         q = self.Queue()
@@ -712,6 +725,7 @@ def test_qsize(self):
         self.assertEqual(q.qsize(), 1)
         q.get()
         self.assertEqual(q.qsize(), 0)
+        close_queue(q)
 
     @classmethod
     def _test_task_done(cls, q):
@@ -739,6 +753,7 @@ def test_task_done(self):
 
         for p in workers:
             p.join()
+        close_queue(queue)
 
     def test_no_import_lock_contention(self):
         with test.support.temp_cwd():
@@ -769,6 +784,7 @@ def test_timeout(self):
         # Tolerate a delta of 30 ms because of the bad clock resolution on
         # Windows (usually 15.6 ms)
         self.assertGreaterEqual(delta, 0.170)
+        close_queue(q)
 
     def test_queue_feeder_donot_stop_onexc(self):
         # bpo-30414: verify feeder handles exceptions correctly
@@ -782,7 +798,9 @@ def __reduce__(self):
             q = self.Queue()
             q.put(NotSerializable())
             q.put(True)
-            self.assertTrue(q.get(timeout=0.1))
+            # bpo-30595: use a timeout of 1 second for slow buildbots
+            self.assertTrue(q.get(timeout=1.0))
+            close_queue(q)
 
 #
 #
@@ -895,10 +913,12 @@ def test_notify(self):
         p = self.Process(target=self.f, args=(cond, sleeping, woken))
         p.daemon = True
         p.start()
+        self.addCleanup(p.join)
 
         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
         p.daemon = True
         p.start()
+        self.addCleanup(p.join)
 
         # wait for both children to start sleeping
         sleeping.acquire()
@@ -941,11 +961,13 @@ def test_notify_all(self):
                              args=(cond, sleeping, woken, TIMEOUT1))
             p.daemon = True
             p.start()
+            self.addCleanup(p.join)
 
             t = threading.Thread(target=self.f,
                                  args=(cond, sleeping, woken, TIMEOUT1))
             t.daemon = True
             t.start()
+            self.addCleanup(t.join)
 
         # wait for them all to sleep
         for i in range(6):
@@ -964,10 +986,12 @@ def test_notify_all(self):
             p = self.Process(target=self.f, args=(cond, sleeping, woken))
             p.daemon = True
             p.start()
+            self.addCleanup(p.join)
 
             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
             t.daemon = True
             t.start()
+            self.addCleanup(t.join)
 
         # wait for them to all sleep
         for i in range(6):
@@ -1143,6 +1167,7 @@ def test_event(self):
         p.daemon = True
         p.start()
         self.assertEqual(wait(), True)
+        p.join()
 
 #
 # Tests for Barrier - adapted from tests in test/lock_tests.py
@@ -1318,6 +1343,7 @@ def test_wait_return(self):
         self.run_threads(self._test_wait_return_f, (self.barrier, queue))
         results = [queue.get() for i in range(self.N)]
         self.assertEqual(results.count(0), 1)
+        close_queue(queue)
 
     @classmethod
     def _test_action_f(cls, barrier, results):
@@ -1488,6 +1514,7 @@ def test_thousand(self):
             p = self.Process(target=self._test_thousand_f,
                            args=(self.barrier, passes, child_conn, lock))
             p.start()
+            self.addCleanup(p.join)
 
         for i in range(passes):
             for j in range(self.N):
@@ -2971,6 +2998,8 @@ def test_access(self):
         w.close()
         self.assertEqual(conn.recv(), 'foobar'*2)
 
+        p.join()
+
 #
 #
 #
@@ -3296,16 +3325,16 @@ def test_level(self):
 
         logger.setLevel(LEVEL1)
         p = self.Process(target=self._test_level, args=(writer,))
-        p.daemon = True
         p.start()
         self.assertEqual(LEVEL1, reader.recv())
+        p.join()
 
         logger.setLevel(logging.NOTSET)
         root_logger.setLevel(LEVEL2)
         p = self.Process(target=self._test_level, args=(writer,))
-        p.daemon = True
         p.start()
         self.assertEqual(LEVEL2, reader.recv())
+        p.join()
 
         root_logger.setLevel(root_level)
         logger.setLevel(level=LOG_LEVEL)
@@ -3459,7 +3488,7 @@ def _this_sub_process(q):
     except pyqueue.Empty:
         pass
 
-def _test_process(q):
+def _test_process():
     queue = multiprocessing.Queue()
     subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
     subProc.daemon = True
@@ -3499,8 +3528,7 @@ def flush(self):
 class TestStdinBadfiledescriptor(unittest.TestCase):
 
     def test_queue_in_process(self):
-        queue = multiprocessing.Queue()
-        proc = multiprocessing.Process(target=_test_process, args=(queue,))
+        proc = multiprocessing.Process(target=_test_process)
         proc.start()
         proc.join()
 
@@ -4108,7 +4136,32 @@ def test_empty(self):
 # Mixins
 #
 
-class ProcessesMixin(object):
+class BaseMixin(object):
+    @classmethod
+    def setUpClass(cls):
+        cls.dangling = (multiprocessing.process._dangling.copy(),
+                        threading._dangling.copy())
+
+    @classmethod
+    def tearDownClass(cls):
+        # bpo-26762: Some multiprocessing objects like Pool create reference
+        # cycles. Trigger a garbage collection to break these cycles.
+        test.support.gc_collect()
+
+        processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
+        if processes:
+            print('Warning -- Dangling processes: %s' % processes,
+                  file=sys.stderr)
+        processes = None
+
+        threads = set(threading._dangling) - set(cls.dangling[1])
+        if threads:
+            print('Warning -- Dangling threads: %s' % threads,
+                  file=sys.stderr)
+        threads = None
+
+
+class ProcessesMixin(BaseMixin):
     TYPE = 'processes'
     Process = multiprocessing.Process
     connection = multiprocessing.connection
@@ -4131,7 +4184,7 @@ class ProcessesMixin(object):
     RawArray = staticmethod(multiprocessing.RawArray)
 
 
-class ManagerMixin(object):
+class ManagerMixin(BaseMixin):
     TYPE = 'manager'
     Process = multiprocessing.Process
     Queue = property(operator.attrgetter('manager.Queue'))
@@ -4155,6 +4208,7 @@ def Pool(cls, *args, **kwds):
 
     @classmethod
     def setUpClass(cls):
+        super().setUpClass()
         cls.manager = multiprocessing.Manager()
 
     @classmethod
@@ -4162,23 +4216,35 @@ def tearDownClass(cls):
         # only the manager process should be returned by active_children()
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395)
+        start_time = time.monotonic()
         t = 0.01
-        while len(multiprocessing.active_children()) > 1 and t < 5:
+        while len(multiprocessing.active_children()) > 1:
             time.sleep(t)
             t *= 2
+            dt = time.monotonic() - start_time
+            if dt >= 5.0:
+                print("Warning -- multiprocessing.Manager still has %s active "
+                      "children after %s seconds"
+                      % (multiprocessing.active_children(), dt),
+                      file=sys.stderr)
+                break
+
         gc.collect()                       # do garbage collection
         if cls.manager._number_of_objects() != 0:
             # This is not really an error since some tests do not
             # ensure that all processes which hold a reference to a
             # managed object have been joined.
-            print('Shared objects which still exist at manager shutdown:')
+            print('Warning -- Shared objects which still exist at manager '
+                  'shutdown:')
             print(cls.manager._debug_info())
         cls.manager.shutdown()
         cls.manager.join()
         cls.manager = None
 
+        super().tearDownClass()
+
 
-class ThreadsMixin(object):
+class ThreadsMixin(BaseMixin):
     TYPE = 'threads'
     Process = multiprocessing.dummy.Process
     connection = multiprocessing.dummy.connection
@@ -4255,18 +4321,33 @@ def setUpModule():
         multiprocessing.get_logger().setLevel(LOG_LEVEL)
 
     def tearDownModule():
+        need_sleep = False
+
+        # bpo-26762: Some multiprocessing objects like Pool create reference
+        # cycles. Trigger a garbage collection to break these cycles.
+        test.support.gc_collect()
+
         multiprocessing.set_start_method(old_start_method[0], force=True)
         # pause a bit so we don't get warning about dangling threads/processes
-        time.sleep(0.5)
+        processes = set(multiprocessing.process._dangling) - set(dangling[0])
+        if processes:
+            need_sleep = True
+            print('Warning -- Dangling processes: %s' % processes,
+                  file=sys.stderr)
+        processes = None
+
+        threads = set(threading._dangling) - set(dangling[1])
+        if threads:
+            need_sleep = True
+            print('Warning -- Dangling threads: %s' % threads,
+                  file=sys.stderr)
+        threads = None
+
+        # Sleep 500 ms to give time to child processes to complete.
+        if need_sleep:
+            time.sleep(0.5)
         multiprocessing.process._cleanup()
-        gc.collect()
-        tmp = set(multiprocessing.process._dangling) - set(dangling[0])
-        if tmp:
-            print('Dangling processes:', tmp, file=sys.stderr)
-        del tmp
-        tmp = set(threading._dangling) - set(dangling[1])
-        if tmp:
-            print('Dangling threads:', tmp, file=sys.stderr)
+        test.support.gc_collect()
 
     remote_globs['setUpModule'] = setUpModule
     remote_globs['tearDownModule'] = tearDownModule



More information about the Python-checkins mailing list