[Python-checkins] bpo-35917: Test multiprocessing manager classes and shareable types (GH-11772)
Antoine Pitrou
webhook-mailer at python.org
Thu Feb 7 06:03:25 EST 2019
https://github.com/python/cpython/commit/2848d9d29914948621bce26bf0d9a89f2e19b97b
commit: 2848d9d29914948621bce26bf0d9a89f2e19b97b
branch: master
author: Giampaolo Rodola <g.rodola at gmail.com>
committer: Antoine Pitrou <pitrou at free.fr>
date: 2019-02-07T11:03:11Z
summary:
bpo-35917: Test multiprocessing manager classes and shareable types (GH-11772)
multiprocessing: provide unittests for manager classes and shareable types
files:
A Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst
M Lib/test/_test_multiprocessing.py
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 7341131231a4..2f839b952126 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4706,6 +4706,252 @@ def is_alive(self):
any(process.is_alive() for process in forked_processes))
+class TestSyncManagerTypes(unittest.TestCase):
+ """Test all the types which can be shared between a parent and a
+ child process by using a manager which acts as an intermediary
+ between them.
+
+ In the following unit-tests the base type is created in the parent
+ process, the @classmethod represents the worker process and the
+ shared object is readable and editable between the two.
+
+ # The child.
+ @classmethod
+ def _test_list(cls, obj):
+ assert obj[0] == 5
+ assert obj.append(6)
+
+ # The parent.
+ def test_list(self):
+ o = self.manager.list()
+ o.append(5)
+ self.run_worker(self._test_list, o)
+ assert o[1] == 6
+ """
+ manager_class = multiprocessing.managers.SyncManager
+
+ def setUp(self):
+ self.manager = self.manager_class()
+ self.manager.start()
+ self.proc = None
+
+ def tearDown(self):
+ if self.proc is not None and self.proc.is_alive():
+ self.proc.terminate()
+ self.proc.join()
+ self.manager.shutdown()
+
+ @classmethod
+ def setUpClass(cls):
+ support.reap_children()
+
+ tearDownClass = setUpClass
+
+ def wait_proc_exit(self):
+ # Only the manager process should be returned by active_children()
+ # but this can take a bit on slow machines, so wait a few seconds
+ # if there are other children too (see #17395).
+ join_process(self.proc)
+ start_time = time.monotonic()
+ t = 0.01
+ while len(multiprocessing.active_children()) > 1:
+ time.sleep(t)
+ t *= 2
+ dt = time.monotonic() - start_time
+ if dt >= 5.0:
+ test.support.environment_altered = True
+ print("Warning -- multiprocessing.Manager still has %s active "
+ "children after %s seconds"
+ % (multiprocessing.active_children(), dt),
+ file=sys.stderr)
+ break
+
+ def run_worker(self, worker, obj):
+ self.proc = multiprocessing.Process(target=worker, args=(obj, ))
+ self.proc.daemon = True
+ self.proc.start()
+ self.wait_proc_exit()
+ self.assertEqual(self.proc.exitcode, 0)
+
+ @classmethod
+ def _test_queue(cls, obj):
+ assert obj.qsize() == 2
+ assert obj.full()
+ assert not obj.empty()
+ assert obj.get() == 5
+ assert not obj.empty()
+ assert obj.get() == 6
+ assert obj.empty()
+
+ def test_queue(self, qname="Queue"):
+ o = getattr(self.manager, qname)(2)
+ o.put(5)
+ o.put(6)
+ self.run_worker(self._test_queue, o)
+ assert o.empty()
+ assert not o.full()
+
+ def test_joinable_queue(self):
+ self.test_queue("JoinableQueue")
+
+ @classmethod
+ def _test_event(cls, obj):
+ assert obj.is_set()
+ obj.wait()
+ obj.clear()
+ obj.wait(0.001)
+
+ def test_event(self):
+ o = self.manager.Event()
+ o.set()
+ self.run_worker(self._test_event, o)
+ assert not o.is_set()
+ o.wait(0.001)
+
+ @classmethod
+ def _test_lock(cls, obj):
+ obj.acquire()
+
+ def test_lock(self, lname="Lock"):
+ o = getattr(self.manager, lname)()
+ self.run_worker(self._test_lock, o)
+ o.release()
+ self.assertRaises(RuntimeError, o.release) # already released
+
+ @classmethod
+ def _test_rlock(cls, obj):
+ obj.acquire()
+ obj.release()
+
+ def test_rlock(self, lname="Lock"):
+ o = getattr(self.manager, lname)()
+ self.run_worker(self._test_rlock, o)
+
+ @classmethod
+ def _test_semaphore(cls, obj):
+ obj.acquire()
+
+ def test_semaphore(self, sname="Semaphore"):
+ o = getattr(self.manager, sname)()
+ self.run_worker(self._test_semaphore, o)
+ o.release()
+
+ def test_bounded_semaphore(self):
+ self.test_semaphore(sname="BoundedSemaphore")
+
+ @classmethod
+ def _test_condition(cls, obj):
+ obj.acquire()
+ obj.release()
+
+ def test_condition(self):
+ o = self.manager.Condition()
+ self.run_worker(self._test_condition, o)
+
+ @classmethod
+ def _test_barrier(cls, obj):
+ assert obj.parties == 5
+ obj.reset()
+
+ def test_barrier(self):
+ o = self.manager.Barrier(5)
+ self.run_worker(self._test_barrier, o)
+
+ @classmethod
+ def _test_pool(cls, obj):
+ # TODO: fix https://bugs.python.org/issue35919
+ with obj:
+ pass
+
+ def test_pool(self):
+ o = self.manager.Pool(processes=4)
+ self.run_worker(self._test_pool, o)
+
+ @classmethod
+ def _test_list(cls, obj):
+ assert obj[0] == 5
+ assert obj.count(5) == 1
+ assert obj.index(5) == 0
+ obj.sort()
+ obj.reverse()
+ for x in obj:
+ pass
+ assert len(obj) == 1
+ assert obj.pop(0) == 5
+
+ def test_list(self):
+ o = self.manager.list()
+ o.append(5)
+ self.run_worker(self._test_list, o)
+ assert not o
+ self.assertEqual(len(o), 0)
+
+ @classmethod
+ def _test_dict(cls, obj):
+ assert len(obj) == 1
+ assert obj['foo'] == 5
+ assert obj.get('foo') == 5
+ # TODO: fix https://bugs.python.org/issue35918
+ # assert obj.has_key('foo')
+ assert list(obj.items()) == [('foo', 5)]
+ assert list(obj.keys()) == ['foo']
+ assert list(obj.values()) == [5]
+ assert obj.copy() == {'foo': 5}
+ assert obj.popitem() == ('foo', 5)
+
+ def test_dict(self):
+ o = self.manager.dict()
+ o['foo'] = 5
+ self.run_worker(self._test_dict, o)
+ assert not o
+ self.assertEqual(len(o), 0)
+
+ @classmethod
+ def _test_value(cls, obj):
+ assert obj.value == 1
+ assert obj.get() == 1
+ obj.set(2)
+
+ def test_value(self):
+ o = self.manager.Value('i', 1)
+ self.run_worker(self._test_value, o)
+ self.assertEqual(o.value, 2)
+ self.assertEqual(o.get(), 2)
+
+ @classmethod
+ def _test_array(cls, obj):
+ assert obj[0] == 0
+ assert obj[1] == 1
+ assert len(obj) == 2
+ assert list(obj) == [0, 1]
+
+ def test_array(self):
+ o = self.manager.Array('i', [0, 1])
+ self.run_worker(self._test_array, o)
+
+ @classmethod
+ def _test_namespace(cls, obj):
+ assert obj.x == 0
+ assert obj.y == 1
+
+ def test_namespace(self):
+ o = self.manager.Namespace()
+ o.x = 0
+ o.y = 1
+ self.run_worker(self._test_namespace, o)
+
+
+try:
+ import multiprocessing.shared_memory
+except ImportError:
+ @unittest.skip("SharedMemoryManager not available on this platform")
+ class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
+ pass
+else:
+ class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
+ """Same as above but by using SharedMemoryManager."""
+ manager_class = multiprocessing.shared_memory.SharedMemoryManager
+
class MiscTestCase(unittest.TestCase):
def test__all__(self):
diff --git a/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst
new file mode 100644
index 000000000000..546d47e39d87
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst
@@ -0,0 +1,3 @@
+multiprocessing: provide unit tests for SyncManager and SharedMemoryManager
+classes + all the shareable types which are supposed to be supported by
+them. (patch by Giampaolo Rodola)
More information about the Python-checkins
mailing list