[Python-checkins] cpython: Use setUpClass() and tearDownClass() in test_multiprocessing.

richard.oudkerk python-checkins at python.org
Mon Oct 8 15:59:17 CEST 2012


http://hg.python.org/cpython/rev/dd5e98ddcd39
changeset:   79595:dd5e98ddcd39
user:        Richard Oudkerk <shibturn at gmail.com>
date:        Mon Oct 08 14:56:24 2012 +0100
summary:
  Use setUpClass() and tearDownClass() in test_multiprocessing.

Each manager test class now uses a separate manager.  Also, process
pools are no longer created before starting any tests.

Note that warnings are written if the manager for a test case still
has live objects when it is shutdown.  This is true for a few test cases
which fail to wait for all child processes to end.

files:
  Lib/test/test_multiprocessing.py |  172 +++++++++---------
  1 files changed, 87 insertions(+), 85 deletions(-)


diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -19,6 +19,7 @@
 import random
 import logging
 import struct
+import operator
 import test.support
 import test.script_helper
 
@@ -1617,6 +1618,18 @@
 
 class _TestPool(BaseTestCase):
 
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.pool = cls.Pool(4)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.pool.terminate()
+        cls.pool.join()
+        cls.pool = None
+        super().tearDownClass()
+
     def test_apply(self):
         papply = self.pool.apply
         self.assertEqual(papply(sqr, (5,)), sqr(5))
@@ -1691,15 +1704,6 @@
         p.join()
 
     def test_terminate(self):
-        if self.TYPE == 'manager':
-            # On Unix a forked process increfs each shared object to
-            # which its parent process held a reference.  If the
-            # forked process gets terminated then there is likely to
-            # be a reference leak.  So to prevent
-            # _TestZZZNumberOfObjects from failing we skip this test
-            # when using a manager.
-            return
-
         result = self.pool.map_async(
             time.sleep, [0.1 for i in range(10000)], chunksize=1
             )
@@ -1821,35 +1825,6 @@
         for (j, res) in enumerate(results):
             self.assertEqual(res.get(), sqr(j))
 
-
-#
-# Test that manager has expected number of shared objects left
-#
-
-class _TestZZZNumberOfObjects(BaseTestCase):
-    # Because test cases are sorted alphabetically, this one will get
-    # run after all the other tests for the manager.  It tests that
-    # there have been no "reference leaks" for the manager's shared
-    # objects.  Note the comment in _TestPool.test_terminate().
-
-    # If some other test using ManagerMixin.manager fails, then the
-    # raised exception may keep alive a frame which holds a reference
-    # to a managed object.  This will cause test_number_of_objects to
-    # also fail.
-    ALLOWED_TYPES = ('manager',)
-
-    def test_number_of_objects(self):
-        EXPECTED_NUMBER = 1                # the pool object is still alive
-        multiprocessing.active_children()  # discard dead process objs
-        gc.collect()                       # do garbage collection
-        refs = self.manager._number_of_objects()
-        debug_info = self.manager._debug_info()
-        if refs != EXPECTED_NUMBER:
-            print(self.manager._debug_info())
-            print(debug_info)
-
-        self.assertEqual(refs, EXPECTED_NUMBER)
-
 #
 # Test of creating a customized manager class
 #
@@ -2886,15 +2861,6 @@
 # Functions used to create test cases from the base ones in this module
 #
 
-def get_attributes(Source, names):
-    d = {}
-    for name in names:
-        obj = getattr(Source, name)
-        if type(obj) == type(get_attributes):
-            obj = staticmethod(obj)
-        d[name] = obj
-    return d
-
 def create_test_cases(Mixin, type):
     result = {}
     glob = globals()
@@ -2907,10 +2873,10 @@
             assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
             if type in base.ALLOWED_TYPES:
                 newname = 'With' + Type + name[1:]
-                class Temp(base, unittest.TestCase, Mixin):
+                class Temp(base, Mixin, unittest.TestCase):
                     pass
                 result[newname] = Temp
-                Temp.__name__ = newname
+                Temp.__name__ = Temp.__qualname__ = newname
                 Temp.__module__ = Mixin.__module__
     return result
 
@@ -2921,12 +2887,24 @@
 class ProcessesMixin(object):
     TYPE = 'processes'
     Process = multiprocessing.Process
-    locals().update(get_attributes(multiprocessing, (
-        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
-        'RawArray', 'current_process', 'active_children', 'Pipe',
-        'connection', 'JoinableQueue', 'Pool'
-        )))
+    connection = multiprocessing.connection
+    current_process = staticmethod(multiprocessing.current_process)
+    active_children = staticmethod(multiprocessing.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.Pipe)
+    Queue = staticmethod(multiprocessing.Queue)
+    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+    Lock = staticmethod(multiprocessing.Lock)
+    RLock = staticmethod(multiprocessing.RLock)
+    Semaphore = staticmethod(multiprocessing.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.Condition)
+    Event = staticmethod(multiprocessing.Event)
+    Barrier = staticmethod(multiprocessing.Barrier)
+    Value = staticmethod(multiprocessing.Value)
+    Array = staticmethod(multiprocessing.Array)
+    RawValue = staticmethod(multiprocessing.RawValue)
+    RawArray = staticmethod(multiprocessing.RawArray)
 
 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
 globals().update(testcases_processes)
@@ -2935,12 +2913,42 @@
 class ManagerMixin(object):
     TYPE = 'manager'
     Process = multiprocessing.Process
-    manager = object.__new__(multiprocessing.managers.SyncManager)
-    locals().update(get_attributes(manager, (
-        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
-        'Namespace', 'JoinableQueue', 'Pool'
-        )))
+    Queue = property(operator.attrgetter('manager.Queue'))
+    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+    Lock = property(operator.attrgetter('manager.Lock'))
+    RLock = property(operator.attrgetter('manager.RLock'))
+    Semaphore = property(operator.attrgetter('manager.Semaphore'))
+    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+    Condition = property(operator.attrgetter('manager.Condition'))
+    Event = property(operator.attrgetter('manager.Event'))
+    Barrier = property(operator.attrgetter('manager.Barrier'))
+    Value = property(operator.attrgetter('manager.Value'))
+    Array = property(operator.attrgetter('manager.Array'))
+    list = property(operator.attrgetter('manager.list'))
+    dict = property(operator.attrgetter('manager.dict'))
+    Namespace = property(operator.attrgetter('manager.Namespace'))
+
+    @classmethod
+    def Pool(cls, *args, **kwds):
+        return cls.manager.Pool(*args, **kwds)
+
+    @classmethod
+    def setUpClass(cls):
+        cls.manager = multiprocessing.Manager()
+
+    @classmethod
+    def tearDownClass(cls):
+        multiprocessing.active_children()  # discard dead process objs
+        gc.collect()                       # do garbage collection
+        if cls.manager._number_of_objects() != 0:
+            # This is not really an error since some tests do not
+            # ensure that all processes which hold a reference to a
+            # managed object have been joined.
+            print('Shared objects which still exist at manager shutdown:')
+            print(cls.manager._debug_info())
+        cls.manager.shutdown()
+        cls.manager.join()
+        cls.manager = None
 
 testcases_manager = create_test_cases(ManagerMixin, type='manager')
 globals().update(testcases_manager)
@@ -2949,16 +2957,27 @@
 class ThreadsMixin(object):
     TYPE = 'threads'
     Process = multiprocessing.dummy.Process
-    locals().update(get_attributes(multiprocessing.dummy, (
-        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
-        'active_children', 'Pipe', 'connection', 'dict', 'list',
-        'Namespace', 'JoinableQueue', 'Pool'
-        )))
+    connection = multiprocessing.dummy.connection
+    current_process = staticmethod(multiprocessing.dummy.current_process)
+    active_children = staticmethod(multiprocessing.dummy.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.dummy.Pipe)
+    Queue = staticmethod(multiprocessing.dummy.Queue)
+    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+    Lock = staticmethod(multiprocessing.dummy.Lock)
+    RLock = staticmethod(multiprocessing.dummy.RLock)
+    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.dummy.Condition)
+    Event = staticmethod(multiprocessing.dummy.Event)
+    Barrier = staticmethod(multiprocessing.dummy.Barrier)
+    Value = staticmethod(multiprocessing.dummy.Value)
+    Array = staticmethod(multiprocessing.dummy.Array)
 
 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
 globals().update(testcases_threads)
 
+
 class OtherTest(unittest.TestCase):
     # TODO: add more tests for deliver/answer challenge.
     def test_deliver_challenge_auth_failure(self):
@@ -3390,12 +3409,6 @@
 
     multiprocessing.get_logger().setLevel(LOG_LEVEL)
 
-    ProcessesMixin.pool = multiprocessing.Pool(4)
-    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
-    ManagerMixin.manager.__init__()
-    ManagerMixin.manager.start()
-    ManagerMixin.pool = ManagerMixin.manager.Pool(4)
-
     testcases = (
         sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
         sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
@@ -3405,18 +3418,7 @@
 
     loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
     suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
-    try:
-        run(suite)
-    finally:
-        ThreadsMixin.pool.terminate()
-        ProcessesMixin.pool.terminate()
-        ManagerMixin.pool.terminate()
-        ManagerMixin.pool.join()
-        ManagerMixin.manager.shutdown()
-        ManagerMixin.manager.join()
-        ThreadsMixin.pool.join()
-        ProcessesMixin.pool.join()
-        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
+    run(suite)
 
 def main():
     test_main(unittest.TextTestRunner(verbosity=2).run)

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list