[Python-checkins] r76137 - in python/trunk: Lib/test/lock_tests.py Lib/test/test_thread.py Lib/test/test_threading.py Misc/NEWS

antoine.pitrou python-checkins at python.org
Fri Nov 6 23:34:35 CET 2009


Author: antoine.pitrou
Date: Fri Nov  6 23:34:35 2009
New Revision: 76137

Log:
Issue #7270: Add some dedicated unit tests for multi-thread synchronization
primitives such as Lock, RLock, Condition, Event and Semaphore.



Added:
   python/trunk/Lib/test/lock_tests.py   (contents, props changed)
Modified:
   python/trunk/Lib/test/test_thread.py
   python/trunk/Lib/test/test_threading.py
   python/trunk/Misc/NEWS

Added: python/trunk/Lib/test/lock_tests.py
==============================================================================
--- (empty file)
+++ python/trunk/Lib/test/lock_tests.py	Fri Nov  6 23:34:35 2009
@@ -0,0 +1,533 @@
+"""
+Various tests for synchronization primitives.
+"""
+
+import sys
+import time
+from thread import start_new_thread, get_ident
+import threading
+import unittest
+
+from test import test_support as support
+
+
+def _wait():
+    # A crude wait/yield function not relying on synchronization primitives.
+    time.sleep(0.01)
+
+class Bunch(object):
+    """
+    A bunch of threads.
+    """
+    def __init__(self, f, n, wait_before_exit=False):
+        """
+        Construct a bunch of `n` threads running the same function `f`.
+        If `wait_before_exit` is True, the threads won't terminate until
+        do_finish() is called.
+        """
+        self.f = f
+        self.n = n
+        self.started = []
+        self.finished = []
+        self._can_exit = not wait_before_exit
+        def task():
+            tid = get_ident()
+            self.started.append(tid)
+            try:
+                f()
+            finally:
+                self.finished.append(tid)
+                while not self._can_exit:
+                    _wait()
+        for i in range(n):
+            start_new_thread(task, ())
+
+    def wait_for_started(self):
+        while len(self.started) < self.n:
+            _wait()
+
+    def wait_for_finished(self):
+        while len(self.finished) < self.n:
+            _wait()
+
+    def do_finish(self):
+        self._can_exit = True
+
+
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self._threads = support.threading_setup()
+
+    def tearDown(self):
+        support.threading_cleanup(*self._threads)
+        support.reap_children()
+
+
+class BaseLockTests(BaseTestCase):
+    """
+    Tests for both recursive and non-recursive locks.
+    """
+
+    def test_constructor(self):
+        lock = self.locktype()
+        del lock
+
+    def test_acquire_destroy(self):
+        lock = self.locktype()
+        lock.acquire()
+        del lock
+
+    def test_acquire_release(self):
+        lock = self.locktype()
+        lock.acquire()
+        lock.release()
+        del lock
+
+    def test_try_acquire(self):
+        lock = self.locktype()
+        self.assertTrue(lock.acquire(False))
+        lock.release()
+
+    def test_try_acquire_contended(self):
+        lock = self.locktype()
+        lock.acquire()
+        result = []
+        def f():
+            result.append(lock.acquire(False))
+        Bunch(f, 1).wait_for_finished()
+        self.assertFalse(result[0])
+        lock.release()
+
+    def test_acquire_contended(self):
+        lock = self.locktype()
+        lock.acquire()
+        N = 5
+        def f():
+            lock.acquire()
+            lock.release()
+
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(len(b.finished), 0)
+        lock.release()
+        b.wait_for_finished()
+        self.assertEqual(len(b.finished), N)
+
+    def test_with(self):
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+            lock.release()
+        def _with(err=None):
+            with lock:
+                if err is not None:
+                    raise err
+        _with()
+        # Check the lock is unacquired
+        Bunch(f, 1).wait_for_finished()
+        self.assertRaises(TypeError, _with, TypeError)
+        # Check the lock is unacquired
+        Bunch(f, 1).wait_for_finished()
+
+
+class LockTests(BaseLockTests):
+    """
+    Tests for non-recursive, weak locks
+    (which can be acquired and released from different threads).
+    """
+    def test_reacquire(self):
+        # Lock needs to be released before re-acquiring.
+        lock = self.locktype()
+        phase = []
+        def f():
+            lock.acquire()
+            phase.append(None)
+            lock.acquire()
+            phase.append(None)
+        start_new_thread(f, ())
+        while len(phase) == 0:
+            _wait()
+        _wait()
+        self.assertEqual(len(phase), 1)
+        lock.release()
+        while len(phase) == 1:
+            _wait()
+        self.assertEqual(len(phase), 2)
+
+    def test_different_thread(self):
+        # Lock can be released from a different thread.
+        lock = self.locktype()
+        lock.acquire()
+        def f():
+            lock.release()
+        b = Bunch(f, 1)
+        b.wait_for_finished()
+        lock.acquire()
+        lock.release()
+
+
+class RLockTests(BaseLockTests):
+    """
+    Tests for recursive locks.
+    """
+    def test_reacquire(self):
+        lock = self.locktype()
+        lock.acquire()
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+        lock.release()
+
+    def test_release_unacquired(self):
+        # Cannot release an unacquired lock
+        lock = self.locktype()
+        self.assertRaises(RuntimeError, lock.release)
+        lock.acquire()
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+        lock.release()
+        self.assertRaises(RuntimeError, lock.release)
+
+    def test_different_thread(self):
+        # Cannot release from a different thread
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+        b = Bunch(f, 1, True)
+        try:
+            self.assertRaises(RuntimeError, lock.release)
+        finally:
+            b.do_finish()
+
+    def test__is_owned(self):
+        lock = self.locktype()
+        self.assertFalse(lock._is_owned())
+        lock.acquire()
+        self.assertTrue(lock._is_owned())
+        lock.acquire()
+        self.assertTrue(lock._is_owned())
+        result = []
+        def f():
+            result.append(lock._is_owned())
+        Bunch(f, 1).wait_for_finished()
+        self.assertFalse(result[0])
+        lock.release()
+        self.assertTrue(lock._is_owned())
+        lock.release()
+        self.assertFalse(lock._is_owned())
+
+
+class EventTests(BaseTestCase):
+    """
+    Tests for Event objects.
+    """
+
+    def test_is_set(self):
+        evt = self.eventtype()
+        self.assertFalse(evt.is_set())
+        evt.set()
+        self.assertTrue(evt.is_set())
+        evt.set()
+        self.assertTrue(evt.is_set())
+        evt.clear()
+        self.assertFalse(evt.is_set())
+        evt.clear()
+        self.assertFalse(evt.is_set())
+
+    def _check_notify(self, evt):
+        # All threads get notified
+        N = 5
+        results1 = []
+        results2 = []
+        def f():
+            results1.append(evt.wait())
+            results2.append(evt.wait())
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(len(results1), 0)
+        evt.set()
+        b.wait_for_finished()
+        self.assertEqual(results1, [True] * N)
+        self.assertEqual(results2, [True] * N)
+
+    def test_notify(self):
+        evt = self.eventtype()
+        self._check_notify(evt)
+        # Another time, after an explicit clear()
+        evt.set()
+        evt.clear()
+        self._check_notify(evt)
+
+    def test_timeout(self):
+        evt = self.eventtype()
+        results1 = []
+        results2 = []
+        N = 5
+        def f():
+            results1.append(evt.wait(0.0))
+            t1 = time.time()
+            r = evt.wait(0.2)
+            t2 = time.time()
+            results2.append((r, t2 - t1))
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(results1, [False] * N)
+        for r, dt in results2:
+            self.assertFalse(r)
+            self.assertTrue(dt >= 0.2, dt)
+        # The event is set
+        results1 = []
+        results2 = []
+        evt.set()
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(results1, [True] * N)
+        for r, dt in results2:
+            self.assertTrue(r)
+
+
+class ConditionTests(BaseTestCase):
+    """
+    Tests for condition variables.
+    """
+
+    def test_acquire(self):
+        cond = self.condtype()
+        # By default we have an RLock: the condition can be acquired multiple
+        # times.
+        cond.acquire()
+        cond.acquire()
+        cond.release()
+        cond.release()
+        lock = threading.Lock()
+        cond = self.condtype(lock)
+        cond.acquire()
+        self.assertFalse(lock.acquire(False))
+        cond.release()
+        self.assertTrue(lock.acquire(False))
+        self.assertFalse(cond.acquire(False))
+        lock.release()
+        with cond:
+            self.assertFalse(lock.acquire(False))
+
+    def test_unacquired_wait(self):
+        cond = self.condtype()
+        self.assertRaises(RuntimeError, cond.wait)
+
+    def test_unacquired_notify(self):
+        cond = self.condtype()
+        self.assertRaises(RuntimeError, cond.notify)
+
+    def _check_notify(self, cond):
+        N = 5
+        results1 = []
+        results2 = []
+        phase_num = 0
+        def f():
+            cond.acquire()
+            cond.wait()
+            cond.release()
+            results1.append(phase_num)
+            cond.acquire()
+            cond.wait()
+            cond.release()
+            results2.append(phase_num)
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(results1, [])
+        # Notify 3 threads at first
+        cond.acquire()
+        cond.notify(3)
+        _wait()
+        phase_num = 1
+        cond.release()
+        while len(results1) < 3:
+            _wait()
+        self.assertEqual(results1, [1] * 3)
+        self.assertEqual(results2, [])
+        # Notify 5 threads: they might be in their first or second wait
+        cond.acquire()
+        cond.notify(5)
+        _wait()
+        phase_num = 2
+        cond.release()
+        while len(results1) + len(results2) < 8:
+            _wait()
+        self.assertEqual(results1, [1] * 3 + [2] * 2)
+        self.assertEqual(results2, [2] * 3)
+        # Notify all threads: they are all in their second wait
+        cond.acquire()
+        cond.notify_all()
+        _wait()
+        phase_num = 3
+        cond.release()
+        while len(results2) < 5:
+            _wait()
+        self.assertEqual(results1, [1] * 3 + [2] * 2)
+        self.assertEqual(results2, [2] * 3 + [3] * 2)
+        b.wait_for_finished()
+
+    def test_notify(self):
+        cond = self.condtype()
+        self._check_notify(cond)
+        # A second time, to check internal state is still ok.
+        self._check_notify(cond)
+
+    def test_timeout(self):
+        cond = self.condtype()
+        results = []
+        N = 5
+        def f():
+            cond.acquire()
+            t1 = time.time()
+            cond.wait(0.2)
+            t2 = time.time()
+            cond.release()
+            results.append(t2 - t1)
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(len(results), 5)
+        for dt in results:
+            self.assertTrue(dt >= 0.2, dt)
+
+
+class BaseSemaphoreTests(BaseTestCase):
+    """
+    Common tests for {bounded, unbounded} semaphore objects.
+    """
+
+    def test_constructor(self):
+        self.assertRaises(ValueError, self.semtype, value = -1)
+        self.assertRaises(ValueError, self.semtype, value = -sys.maxint)
+
+    def test_acquire(self):
+        sem = self.semtype(1)
+        sem.acquire()
+        sem.release()
+        sem = self.semtype(2)
+        sem.acquire()
+        sem.acquire()
+        sem.release()
+        sem.release()
+
+    def test_acquire_destroy(self):
+        sem = self.semtype()
+        sem.acquire()
+        del sem
+
+    def test_acquire_contended(self):
+        sem = self.semtype(7)
+        sem.acquire()
+        N = 10
+        results1 = []
+        results2 = []
+        phase_num = 0
+        def f():
+            sem.acquire()
+            results1.append(phase_num)
+            sem.acquire()
+            results2.append(phase_num)
+        b = Bunch(f, 10)
+        b.wait_for_started()
+        while len(results1) + len(results2) < 6:
+            _wait()
+        self.assertEqual(results1 + results2, [0] * 6)
+        phase_num = 1
+        for i in range(7):
+            sem.release()
+        while len(results1) + len(results2) < 13:
+            _wait()
+        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+        phase_num = 2
+        for i in range(6):
+            sem.release()
+        while len(results1) + len(results2) < 19:
+            _wait()
+        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+        # The semaphore is still locked
+        self.assertFalse(sem.acquire(False))
+        # Final release, to let the last thread finish
+        sem.release()
+        b.wait_for_finished()
+
+    def test_try_acquire(self):
+        sem = self.semtype(2)
+        self.assertTrue(sem.acquire(False))
+        self.assertTrue(sem.acquire(False))
+        self.assertFalse(sem.acquire(False))
+        sem.release()
+        self.assertTrue(sem.acquire(False))
+
+    def test_try_acquire_contended(self):
+        sem = self.semtype(4)
+        sem.acquire()
+        results = []
+        def f():
+            results.append(sem.acquire(False))
+            results.append(sem.acquire(False))
+        Bunch(f, 5).wait_for_finished()
+        # There can be a thread switch between acquiring the semaphore and
+        # appending the result, therefore results will not necessarily be
+        # ordered.
+        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )
+
+    def test_default_value(self):
+        # The default initial value is 1.
+        sem = self.semtype()
+        sem.acquire()
+        def f():
+            sem.acquire()
+            sem.release()
+        b = Bunch(f, 1)
+        b.wait_for_started()
+        _wait()
+        self.assertFalse(b.finished)
+        sem.release()
+        b.wait_for_finished()
+
+    def test_with(self):
+        sem = self.semtype(2)
+        def _with(err=None):
+            with sem:
+                self.assertTrue(sem.acquire(False))
+                sem.release()
+                with sem:
+                    self.assertFalse(sem.acquire(False))
+                    if err:
+                        raise err
+        _with()
+        self.assertTrue(sem.acquire(False))
+        sem.release()
+        self.assertRaises(TypeError, _with, TypeError)
+        self.assertTrue(sem.acquire(False))
+        sem.release()
+
+class SemaphoreTests(BaseSemaphoreTests):
+    """
+    Tests for unbounded semaphores.
+    """
+
+    def test_release_unacquired(self):
+        # Unbounded releases are allowed and increment the semaphore's value
+        sem = self.semtype(1)
+        sem.release()
+        sem.acquire()
+        sem.acquire()
+        sem.release()
+
+
+class BoundedSemaphoreTests(BaseSemaphoreTests):
+    """
+    Tests for bounded semaphores.
+    """
+
+    def test_release_unacquired(self):
+        # Cannot go past the initial value
+        sem = self.semtype()
+        self.assertRaises(ValueError, sem.release)
+        sem.acquire()
+        sem.release()
+        self.assertRaises(ValueError, sem.release)

Modified: python/trunk/Lib/test/test_thread.py
==============================================================================
--- python/trunk/Lib/test/test_thread.py	(original)
+++ python/trunk/Lib/test/test_thread.py	Fri Nov  6 23:34:35 2009
@@ -6,6 +6,7 @@
 import time
 import weakref
 
+from test import lock_tests
 
 NUMTASKS = 10
 NUMTRIPS = 3
@@ -191,8 +192,12 @@
             self.done_mutex.release()
 
 
+class LockTests(lock_tests.LockTests):
+    locktype = thread.allocate_lock
+
+
 def test_main():
-    test_support.run_unittest(ThreadRunningTests, BarrierTest)
+    test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests)
 
 if __name__ == "__main__":
     test_main()

Modified: python/trunk/Lib/test/test_threading.py
==============================================================================
--- python/trunk/Lib/test/test_threading.py	(original)
+++ python/trunk/Lib/test/test_threading.py	Fri Nov  6 23:34:35 2009
@@ -11,6 +11,8 @@
 import unittest
 import weakref
 
+from test import lock_tests
+
 # A trivial mutable counter.
 class Counter(object):
     def __init__(self):
@@ -482,22 +484,6 @@
         thread.start()
         self.assertRaises(RuntimeError, thread.start)
 
-    def test_releasing_unacquired_rlock(self):
-        rlock = threading.RLock()
-        self.assertRaises(RuntimeError, rlock.release)
-
-    def test_waiting_on_unacquired_condition(self):
-        cond = threading.Condition()
-        self.assertRaises(RuntimeError, cond.wait)
-
-    def test_notify_on_unacquired_condition(self):
-        cond = threading.Condition()
-        self.assertRaises(RuntimeError, cond.notify)
-
-    def test_semaphore_with_negative_value(self):
-        self.assertRaises(ValueError, threading.Semaphore, value = -1)
-        self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxint)
-
     def test_joining_current_thread(self):
         current_thread = threading.current_thread()
         self.assertRaises(RuntimeError, current_thread.join);
@@ -512,8 +498,34 @@
         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
 
 
+class LockTests(lock_tests.LockTests):
+    locktype = staticmethod(threading.Lock)
+
+class RLockTests(lock_tests.RLockTests):
+    locktype = staticmethod(threading.RLock)
+
+class EventTests(lock_tests.EventTests):
+    eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+    # A Condition uses an RLock by default and exports its API.
+    locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+    condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+    semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+    semtype = staticmethod(threading.BoundedSemaphore)
+
+
 def test_main():
-    test.test_support.run_unittest(ThreadTests,
+    test.test_support.run_unittest(LockTests, RLockTests, EventTests,
+                                   ConditionAsRLockTests, ConditionTests,
+                                   SemaphoreTests, BoundedSemaphoreTests,
+                                   ThreadTests,
                                    ThreadJoinOnShutdown,
                                    ThreadingExceptionTests,
                                    )

Modified: python/trunk/Misc/NEWS
==============================================================================
--- python/trunk/Misc/NEWS	(original)
+++ python/trunk/Misc/NEWS	Fri Nov  6 23:34:35 2009
@@ -1551,6 +1551,9 @@
 Tests
 -----
 
+- Issue #7270: Add some dedicated unit tests for multi-thread synchronization
+  primitives such as Lock, RLock, Condition, Event and Semaphore.
+
 - Issue #7222: Make thread "reaping" more reliable so that reference
   leak-chasing test runs give sensible results. The previous method of
   reaping threads could return successfully while some Thread objects were


More information about the Python-checkins mailing list