[Python-checkins] [3.6] bpo-31234: Add test.support.wait_threads_exit() (GH-3578) (GH-7315)

Victor Stinner webhook-mailer at python.org
Fri Jun 1 09:51:06 EDT 2018


https://github.com/python/cpython/commit/5dbb48aaac0ff74648b355ebdde222856004b1ef
commit: 5dbb48aaac0ff74648b355ebdde222856004b1ef
branch: 3.6
author: Victor Stinner <vstinner at redhat.com>
committer: GitHub <noreply at github.com>
date: 2018-06-01T15:51:02+02:00
summary:

[3.6] bpo-31234: Add test.support.wait_threads_exit() (GH-3578) (GH-7315)

* bpo-31234: Add test.support.wait_threads_exit() (GH-3578)

Use _thread.count() to wait until threads exit. The new context
manager prevents the "dangling thread" warning.

(cherry picked from commit ff40ecda73178dfcad24e26240d684356ef20793)

* bpo-31234: Try to fix lock_tests warning (#3557)

Try to fix the "Warning -- threading_cleanup() failed to cleanup 1
threads" warning in test.lock_tests: wait a little bit longer to give
time to the threads to complete.

Warning seen on test_thread and test_importlib.

(cherry picked from commit 096ae3373abac2c8b3a26a3fe33cc8bd4cbccd4e)

files:
M Lib/test/lock_tests.py
M Lib/test/support/__init__.py
M Lib/test/test_socket.py
M Lib/test/test_thread.py
M Lib/test/test_threading.py
M Lib/test/test_threadsignals.py

diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index c1775702487d..5b1f033c6f80 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -31,6 +31,9 @@ def __init__(self, f, n, wait_before_exit=False):
         self.started = []
         self.finished = []
         self._can_exit = not wait_before_exit
+        self.wait_thread = support.wait_threads_exit()
+        self.wait_thread.__enter__()
+
         def task():
             tid = threading.get_ident()
             self.started.append(tid)
@@ -40,6 +43,7 @@ def task():
                 self.finished.append(tid)
                 while not self._can_exit:
                     _wait()
+
         try:
             for i in range(n):
                 start_new_thread(task, ())
@@ -54,6 +58,8 @@ def wait_for_started(self):
     def wait_for_finished(self):
         while len(self.finished) < self.n:
             _wait()
+        # Wait for threads exit
+        self.wait_thread.__exit__(None, None, None)
 
     def do_finish(self):
         self._can_exit = True
@@ -220,20 +226,23 @@ def test_reacquire(self):
         # Lock needs to be released before re-acquiring.
         lock = self.locktype()
         phase = []
+
         def f():
             lock.acquire()
             phase.append(None)
             lock.acquire()
             phase.append(None)
-        start_new_thread(f, ())
-        while len(phase) == 0:
-            _wait()
-        _wait()
-        self.assertEqual(len(phase), 1)
-        lock.release()
-        while len(phase) == 1:
+
+        with support.wait_threads_exit():
+            start_new_thread(f, ())
+            while len(phase) == 0:
+                _wait()
             _wait()
-        self.assertEqual(len(phase), 2)
+            self.assertEqual(len(phase), 1)
+            lock.release()
+            while len(phase) == 1:
+                _wait()
+            self.assertEqual(len(phase), 2)
 
     def test_different_thread(self):
         # Lock can be released from a different thread.
@@ -304,6 +313,7 @@ def f():
             self.assertRaises(RuntimeError, lock.release)
         finally:
             b.do_finish()
+        b.wait_for_finished()
 
     def test__is_owned(self):
         lock = self.locktype()
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 3def27b12ed0..8f1aeee72f18 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2112,6 +2112,42 @@ def decorator(*args):
             threading_cleanup(*key)
     return decorator
 
+
+ at contextlib.contextmanager
+def wait_threads_exit(timeout=60.0):
+    """
+    bpo-31234: Context manager to wait until all threads created in the with
+    statement exit.
+
+    Use _thread.count() to check if threads exited. Indirectly, wait until
+    threads exit the internal t_bootstrap() C function of the _thread module.
+
+    threading_setup() and threading_cleanup() are designed to emit a warning
+    if a test leaves running threads in the background. This context manager
+    is designed to cleanup threads started by the _thread.start_new_thread()
+    which doesn't allow to wait for thread exit, whereas thread.Thread has a
+    join() method.
+    """
+    old_count = _thread._count()
+    try:
+        yield
+    finally:
+        start_time = time.monotonic()
+        deadline = start_time + timeout
+        while True:
+            count = _thread._count()
+            if count <= old_count:
+                break
+            if time.monotonic() > deadline:
+                dt = time.monotonic() - start_time
+                msg = (f"wait_threads() failed to cleanup {count - old_count} "
+                       f"threads after {dt:.1f} seconds "
+                       f"(count: {count}, old count: {old_count})")
+                raise AssertionError(msg)
+            time.sleep(0.010)
+            gc_collect()
+
+
 def reap_children():
     """Use this function at the end of test_main() whenever sub-processes
     are started.  This will help ensure that no extra children (zombies)
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 2d2ec04fc091..e41ddfd8afd6 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -245,6 +245,9 @@ def serverExplicitReady(self):
         self.server_ready.set()
 
     def _setUp(self):
+        self.wait_threads = support.wait_threads_exit()
+        self.wait_threads.__enter__()
+
         self.server_ready = threading.Event()
         self.client_ready = threading.Event()
         self.done = threading.Event()
@@ -271,6 +274,7 @@ def _setUp(self):
     def _tearDown(self):
         self.__tearDown()
         self.done.wait()
+        self.wait_threads.__exit__(None, None, None)
 
         if self.queue.qsize():
             exc = self.queue.get()
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index 2dd1593eaa0c..52f6c798b878 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -59,12 +59,13 @@ def task(self, ident):
                 self.done_mutex.release()
 
     def test_starting_threads(self):
-        # Basic test for thread creation.
-        for i in range(NUMTASKS):
-            self.newtask()
-        verbose_print("waiting for tasks to complete...")
-        self.done_mutex.acquire()
-        verbose_print("all tasks done")
+        with support.wait_threads_exit():
+            # Basic test for thread creation.
+            for i in range(NUMTASKS):
+                self.newtask()
+            verbose_print("waiting for tasks to complete...")
+            self.done_mutex.acquire()
+            verbose_print("all tasks done")
 
     def test_stack_size(self):
         # Various stack size tests.
@@ -94,12 +95,13 @@ def test_nt_and_posix_stack_size(self):
             verbose_print("trying stack_size = (%d)" % tss)
             self.next_ident = 0
             self.created = 0
-            for i in range(NUMTASKS):
-                self.newtask()
+            with support.wait_threads_exit():
+                for i in range(NUMTASKS):
+                    self.newtask()
 
-            verbose_print("waiting for all tasks to complete")
-            self.done_mutex.acquire()
-            verbose_print("all tasks done")
+                verbose_print("waiting for all tasks to complete")
+                self.done_mutex.acquire()
+                verbose_print("all tasks done")
 
         thread.stack_size(0)
 
@@ -109,25 +111,28 @@ def test__count(self):
         mut = thread.allocate_lock()
         mut.acquire()
         started = []
+
         def task():
             started.append(None)
             mut.acquire()
             mut.release()
-        thread.start_new_thread(task, ())
-        while not started:
-            time.sleep(POLL_SLEEP)
-        self.assertEqual(thread._count(), orig + 1)
-        # Allow the task to finish.
-        mut.release()
-        # The only reliable way to be sure that the thread ended from the
-        # interpreter's point of view is to wait for the function object to be
-        # destroyed.
-        done = []
-        wr = weakref.ref(task, lambda _: done.append(None))
-        del task
-        while not done:
-            time.sleep(POLL_SLEEP)
-        self.assertEqual(thread._count(), orig)
+
+        with support.wait_threads_exit():
+            thread.start_new_thread(task, ())
+            while not started:
+                time.sleep(POLL_SLEEP)
+            self.assertEqual(thread._count(), orig + 1)
+            # Allow the task to finish.
+            mut.release()
+            # The only reliable way to be sure that the thread ended from the
+            # interpreter's point of view is to wait for the function object to be
+            # destroyed.
+            done = []
+            wr = weakref.ref(task, lambda _: done.append(None))
+            del task
+            while not done:
+                time.sleep(POLL_SLEEP)
+            self.assertEqual(thread._count(), orig)
 
     def test_save_exception_state_on_error(self):
         # See issue #14474
@@ -140,16 +145,14 @@ def mywrite(self, *args):
             except ValueError:
                 pass
             real_write(self, *args)
-        c = thread._count()
         started = thread.allocate_lock()
         with support.captured_output("stderr") as stderr:
             real_write = stderr.write
             stderr.write = mywrite
             started.acquire()
-            thread.start_new_thread(task, ())
-            started.acquire()
-            while thread._count() > c:
-                time.sleep(POLL_SLEEP)
+            with support.wait_threads_exit():
+                thread.start_new_thread(task, ())
+                started.acquire()
         self.assertIn("Traceback", stderr.getvalue())
 
 
@@ -181,13 +184,14 @@ def enter(self):
 class BarrierTest(BasicThreadTest):
 
     def test_barrier(self):
-        self.bar = Barrier(NUMTASKS)
-        self.running = NUMTASKS
-        for i in range(NUMTASKS):
-            thread.start_new_thread(self.task2, (i,))
-        verbose_print("waiting for tasks to end")
-        self.done_mutex.acquire()
-        verbose_print("tasks done")
+        with support.wait_threads_exit():
+            self.bar = Barrier(NUMTASKS)
+            self.running = NUMTASKS
+            for i in range(NUMTASKS):
+                thread.start_new_thread(self.task2, (i,))
+            verbose_print("waiting for tasks to end")
+            self.done_mutex.acquire()
+            verbose_print("tasks done")
 
     def task2(self, ident):
         for i in range(NUMTRIPS):
@@ -225,11 +229,10 @@ def setUp(self):
     @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
     @support.reap_threads
     def test_forkinthread(self):
-        running = True
         status = "not set"
 
         def thread1():
-            nonlocal running, status
+            nonlocal status
 
             # fork in a thread
             pid = os.fork()
@@ -244,13 +247,11 @@ def thread1():
                 # parent
                 os.close(self.write_fd)
                 pid, status = os.waitpid(pid, 0)
-                running = False
 
-        thread.start_new_thread(thread1, ())
-        self.assertEqual(os.read(self.read_fd, 2), b"OK",
-                         "Unable to fork() in thread")
-        while running:
-            time.sleep(POLL_SLEEP)
+        with support.wait_threads_exit():
+            thread.start_new_thread(thread1, ())
+            self.assertEqual(os.read(self.read_fd, 2), b"OK",
+                             "Unable to fork() in thread")
         self.assertEqual(status, 0)
 
     def tearDown(self):
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index b42314fdbb20..43ef22b24d49 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -125,9 +125,10 @@ def f():
             done.set()
         done = threading.Event()
         ident = []
-        _thread.start_new_thread(f, ())
-        done.wait()
-        self.assertIsNotNone(ident[0])
+        with support.wait_threads_exit():
+            tid = _thread.start_new_thread(f, ())
+            done.wait()
+            self.assertEqual(ident[0], tid)
         # Kill the "immortal" _DummyThread
         del threading._active[ident[0]]
 
@@ -165,9 +166,10 @@ def f(mutex):
 
         mutex = threading.Lock()
         mutex.acquire()
-        tid = _thread.start_new_thread(f, (mutex,))
-        # Wait for the thread to finish.
-        mutex.acquire()
+        with support.wait_threads_exit():
+            tid = _thread.start_new_thread(f, (mutex,))
+            # Wait for the thread to finish.
+            mutex.acquire()
         self.assertIn(tid, threading._active)
         self.assertIsInstance(threading._active[tid], threading._DummyThread)
         #Issue 29376
diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py
index 7d4d8c4106f8..99b60cd9e6b1 100644
--- a/Lib/test/test_threadsignals.py
+++ b/Lib/test/test_threadsignals.py
@@ -4,8 +4,8 @@
 import signal
 import os
 import sys
-from test.support import run_unittest, import_module
-thread = import_module('_thread')
+from test import support
+thread = support.import_module('_thread')
 import time
 
 if (sys.platform[:3] == 'win'):
@@ -39,13 +39,15 @@ def send_signals():
 class ThreadSignals(unittest.TestCase):
 
     def test_signals(self):
-        # Test signal handling semantics of threads.
-        # We spawn a thread, have the thread send two signals, and
-        # wait for it to finish. Check that we got both signals
-        # and that they were run by the main thread.
-        signalled_all.acquire()
-        self.spawnSignallingThread()
-        signalled_all.acquire()
+        with support.wait_threads_exit():
+            # Test signal handling semantics of threads.
+            # We spawn a thread, have the thread send two signals, and
+            # wait for it to finish. Check that we got both signals
+            # and that they were run by the main thread.
+            signalled_all.acquire()
+            self.spawnSignallingThread()
+            signalled_all.acquire()
+
         # the signals that we asked the kernel to send
         # will come back, but we don't know when.
         # (it might even be after the thread exits
@@ -118,17 +120,19 @@ def test_rlock_acquire_interruption(self):
             # thread.
             def other_thread():
                 rlock.acquire()
-            thread.start_new_thread(other_thread, ())
-            # Wait until we can't acquire it without blocking...
-            while rlock.acquire(blocking=False):
-                rlock.release()
-                time.sleep(0.01)
-            signal.alarm(1)
-            t1 = time.time()
-            self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
-            dt = time.time() - t1
-            # See rationale above in test_lock_acquire_interruption
-            self.assertLess(dt, 3.0)
+
+            with support.wait_threads_exit():
+                thread.start_new_thread(other_thread, ())
+                # Wait until we can't acquire it without blocking...
+                while rlock.acquire(blocking=False):
+                    rlock.release()
+                    time.sleep(0.01)
+                signal.alarm(1)
+                t1 = time.time()
+                self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
+                dt = time.time() - t1
+                # See rationale above in test_lock_acquire_interruption
+                self.assertLess(dt, 3.0)
         finally:
             signal.alarm(0)
             signal.signal(signal.SIGALRM, oldalrm)
@@ -137,6 +141,7 @@ def acquire_retries_on_intr(self, lock):
         self.sig_recvd = False
         def my_handler(signal, frame):
             self.sig_recvd = True
+
         old_handler = signal.signal(signal.SIGUSR1, my_handler)
         try:
             def other_thread():
@@ -151,14 +156,16 @@ def other_thread():
                 # the lock acquisition.  Then we'll let it run.
                 time.sleep(0.5)
                 lock.release()
-            thread.start_new_thread(other_thread, ())
-            # Wait until we can't acquire it without blocking...
-            while lock.acquire(blocking=False):
-                lock.release()
-                time.sleep(0.01)
-            result = lock.acquire()  # Block while we receive a signal.
-            self.assertTrue(self.sig_recvd)
-            self.assertTrue(result)
+
+            with support.wait_threads_exit():
+                thread.start_new_thread(other_thread, ())
+                # Wait until we can't acquire it without blocking...
+                while lock.acquire(blocking=False):
+                    lock.release()
+                    time.sleep(0.01)
+                result = lock.acquire()  # Block while we receive a signal.
+                self.assertTrue(self.sig_recvd)
+                self.assertTrue(result)
         finally:
             signal.signal(signal.SIGUSR1, old_handler)
 
@@ -197,19 +204,20 @@ def send_signals():
                     os.kill(process_pid, signal.SIGUSR1)
                 done.release()
 
-            # Send the signals from the non-main thread, since the main thread
-            # is the only one that can process signals.
-            thread.start_new_thread(send_signals, ())
-            timed_acquire()
-            # Wait for thread to finish
-            done.acquire()
-            # This allows for some timing and scheduling imprecision
-            self.assertLess(self.end - self.start, 2.0)
-            self.assertGreater(self.end - self.start, 0.3)
-            # If the signal is received several times before PyErr_CheckSignals()
-            # is called, the handler will get called less than 40 times. Just
-            # check it's been called at least once.
-            self.assertGreater(self.sigs_recvd, 0)
+            with support.wait_threads_exit():
+                # Send the signals from the non-main thread, since the main thread
+                # is the only one that can process signals.
+                thread.start_new_thread(send_signals, ())
+                timed_acquire()
+                # Wait for thread to finish
+                done.acquire()
+                # This allows for some timing and scheduling imprecision
+                self.assertLess(self.end - self.start, 2.0)
+                self.assertGreater(self.end - self.start, 0.3)
+                # If the signal is received several times before PyErr_CheckSignals()
+                # is called, the handler will get called less than 40 times. Just
+                # check it's been called at least once.
+                self.assertGreater(self.sigs_recvd, 0)
         finally:
             signal.signal(signal.SIGUSR1, old_handler)
 
@@ -223,7 +231,7 @@ def test_main():
 
     oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
     try:
-        run_unittest(ThreadSignals)
+        support.run_unittest(ThreadSignals)
     finally:
         registerSignals(*oldsigs)
 



More information about the Python-checkins mailing list