[Python-checkins] Add test.support.busy_retry() (#93770)

vstinner webhook-mailer at python.org
Wed Jun 15 05:42:25 EDT 2022


https://github.com/python/cpython/commit/7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16
commit: 7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16
branch: main
author: Victor Stinner <vstinner at python.org>
committer: vstinner <vstinner at python.org>
date: 2022-06-15T11:42:10+02:00
summary:

Add test.support.busy_retry() (#93770)

Add busy_retry() and sleeping_retry() functions to test.support.

files:
M Doc/library/test.rst
M Lib/test/_test_multiprocessing.py
M Lib/test/fork_wait.py
M Lib/test/support/__init__.py
M Lib/test/test__xxsubinterpreters.py
M Lib/test/test_concurrent_futures.py
M Lib/test/test_multiprocessing_main_handling.py
M Lib/test/test_signal.py
M Lib/test/test_ssl.py
M Lib/test/test_support.py
M Lib/test/test_wait3.py
M Lib/test/test_wait4.py

diff --git a/Doc/library/test.rst b/Doc/library/test.rst
index 5c458bf3ec2f2..e255952d4570e 100644
--- a/Doc/library/test.rst
+++ b/Doc/library/test.rst
@@ -413,6 +413,51 @@ The :mod:`test.support` module defines the following constants:
 
 The :mod:`test.support` module defines the following functions:
 
+.. function:: busy_retry(timeout, err_msg=None, /, *, error=True)
+
+   Run the loop body until ``break`` stops the loop.
+
+   After *timeout* seconds, raise an :exc:`AssertionError` if *error* is true,
+   or just stop the loop if *error* is false.
+
+   Example::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of error=False usage::
+
+       for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
+
+.. function:: sleeping_retry(timeout, err_msg=None, /, *, init_delay=0.010, max_delay=1.0, error=True)
+
+   Wait strategy that applies exponential backoff.
+
+   Run the loop body until ``break`` stops the loop. Sleep at each loop
+   iteration, but not at the first iteration. The sleep delay is doubled at
+   each iteration (up to *max_delay* seconds).
+
+   See :func:`busy_retry` documentation for the parameters usage.
+
+   Example raising an exception after SHORT_TIMEOUT seconds::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+           if check():
+               break
+
+   Example of error=False usage::
+
+       for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+           if check():
+               break
+       else:
+           raise RuntimeError('my custom error')
+
 .. function:: is_resource_enabled(resource)
 
    Return ``True`` if *resource* is enabled and available. The list of
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 4a588d96deb94..dca5a19d2ed62 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4313,18 +4313,13 @@ def test_shared_memory_cleaned_after_process_termination(self):
             p.terminate()
             p.wait()
 
-            deadline = time.monotonic() + support.LONG_TIMEOUT
-            t = 0.1
-            while time.monotonic() < deadline:
-                time.sleep(t)
-                t = min(t*2, 5)
+            err_msg = ("A SharedMemory segment was leaked after "
+                       "a process was abruptly terminated")
+            for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
                 try:
                     smm = shared_memory.SharedMemory(name, create=False)
                 except FileNotFoundError:
                     break
-            else:
-                raise AssertionError("A SharedMemory segment was leaked after"
-                                     " a process was abruptly terminated.")
 
             if os.name == 'posix':
                 # Without this line it was raising warnings like:
@@ -5334,9 +5329,10 @@ def create_and_register_resource(rtype):
                 p.terminate()
                 p.wait()
 
-                deadline = time.monotonic() + support.LONG_TIMEOUT
-                while time.monotonic() < deadline:
-                    time.sleep(.5)
+                err_msg = (f"A {rtype} resource was leaked after a process was "
+                           f"abruptly terminated")
+                for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+                                                  err_msg):
                     try:
                         _resource_unlink(name2, rtype)
                     except OSError as e:
@@ -5344,10 +5340,7 @@ def create_and_register_resource(rtype):
                         # EINVAL
                         self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                         break
-                else:
-                    raise AssertionError(
-                        f"A {rtype} resource was leaked after a process was "
-                        f"abruptly terminated.")
+
                 err = p.stderr.read().decode('utf-8')
                 p.stderr.close()
                 expected = ('resource_tracker: There appear to be 2 leaked {} '
@@ -5575,18 +5568,17 @@ def wait_proc_exit(self):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395).
         join_process(self.proc)
+
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
     def run_worker(self, worker, obj):
         self.proc = multiprocessing.Process(target=worker, args=(obj, ))
@@ -5884,17 +5876,15 @@ def tearDownClass(cls):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395)
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
         gc.collect()                       # do garbage collection
         if cls.manager._number_of_objects() != 0:
diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py
index 4d3dbd8e83f5a..c565f59355948 100644
--- a/Lib/test/fork_wait.py
+++ b/Lib/test/fork_wait.py
@@ -54,10 +54,8 @@ def test_wait(self):
             self.threads.append(thread)
 
         # busy-loop to wait for threads
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while len(self.alive) < NUM_THREADS:
-            time.sleep(0.1)
-            if deadline < time.monotonic():
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if len(self.alive) >= NUM_THREADS:
                 break
 
         a = sorted(self.alive.keys())
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 4baaeb766317f..a62e8b4ec4f6b 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2250,3 +2250,79 @@ def atfork_func():
             pass
         atfork_func.reference = ref_cycle
         os.register_at_fork(before=atfork_func)
+
+
+def busy_retry(timeout, err_msg=None, /, *, error=True):
+    """
+    Run the loop body until "break" stops the loop.
+
+    After *timeout* seconds, raise an AssertionError if *error* is true,
+    or just stop if *error* is false.
+
+    Example:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+
+    """
+    if timeout <= 0:
+        raise ValueError("timeout must be greater than zero")
+
+    start_time = time.monotonic()
+    deadline = start_time + timeout
+
+    while True:
+        yield
+
+        if time.monotonic() >= deadline:
+            break
+
+    if error:
+        dt = time.monotonic() - start_time
+        msg = f"timeout ({dt:.1f} seconds)"
+        if err_msg:
+            msg = f"{msg}: {err_msg}"
+        raise AssertionError(msg)
+
+
+def sleeping_retry(timeout, err_msg=None, /,
+                     *, init_delay=0.010, max_delay=1.0, error=True):
+    """
+    Wait strategy that applies exponential backoff.
+
+    Run the loop body until "break" stops the loop. Sleep at each loop
+    iteration, but not at the first iteration. The sleep delay is doubled at
+    each iteration (up to *max_delay* seconds).
+
+    See busy_retry() documentation for the parameters usage.
+
+    Example raising an exception after SHORT_TIMEOUT seconds:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+    """
+
+    delay = init_delay
+    for _ in busy_retry(timeout, err_msg, error=error):
+        yield
+
+        time.sleep(delay)
+        delay = min(delay * 2, max_delay)
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 5d0ed9ea14ac7..f20aae8e21c66 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
     # run subinterpreter eariler than the main thread in multiprocess.
     if timeout is None:
         timeout = support.SHORT_TIMEOUT
-    start_time = time.monotonic()
-    deadline = start_time + timeout
-    while not interpreters.is_running(interp):
-        if time.monotonic() > deadline:
-            raise RuntimeError('interp is not running')
-        time.sleep(0.010)
+    for _ in support.sleeping_retry(timeout, error=False):
+        if interpreters.is_running(interp):
+            break
+    else:
+        raise RuntimeError('interp is not running')
 
 
 @contextlib.contextmanager
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 6f3b4609232bb..c12165366bb25 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -256,12 +256,12 @@ def test_initializer(self):
             else:
                 with self.assertRaises(BrokenExecutor):
                     future.result()
+
             # At some point, the executor should break
-            t1 = time.monotonic()
-            while not self.executor._broken:
-                if time.monotonic() - t1 > 5:
-                    self.fail("executor not broken after 5 s.")
-                time.sleep(0.01)
+            for _ in support.sleeping_retry(5, "executor not broken"):
+                if self.executor._broken:
+                    break
+
             # ... and from this point submit() is guaranteed to fail
             with self.assertRaises(BrokenExecutor):
                 self.executor.submit(get_init_status)
diff --git a/Lib/test/test_multiprocessing_main_handling.py b/Lib/test/test_multiprocessing_main_handling.py
index 510d8d3a7597e..35e9cd64fa6c0 100644
--- a/Lib/test/test_multiprocessing_main_handling.py
+++ b/Lib/test/test_multiprocessing_main_handling.py
@@ -40,6 +40,7 @@
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 # We use this __main__ defined function in the map call below in order to
 # check that multiprocessing in correctly running the unguarded
@@ -59,13 +60,11 @@ def f(x):
     results = []
     with Pool(5) as pool:
         pool.map_async(f, [1, 2, 3], callback=results.extend)
-        start_time = time.monotonic()
-        while not results:
-            time.sleep(0.05)
-            # up to 1 min to report the results
-            dt = time.monotonic() - start_time
-            if dt > 60.0:
-                raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+
+        # up to 1 min to report the results
+        for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+            if results:
+                break
 
     results.sort()
     print(start_method, "->", results)
@@ -86,19 +85,17 @@ def f(x):
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 start_method = sys.argv[1]
 set_start_method(start_method)
 results = []
 with Pool(5) as pool:
     pool.map_async(int, [1, 4, 9], callback=results.extend)
-    start_time = time.monotonic()
-    while not results:
-        time.sleep(0.05)
-        # up to 1 min to report the results
-        dt = time.monotonic() - start_time
-        if dt > 60.0:
-            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+    # up to 1 min to report the results
+    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+        if results:
+            break
 
 results.sort()
 print(start_method, "->", results)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 6aa529b062000..a1d074a56cf65 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -812,13 +812,14 @@ def test_itimer_virtual(self):
         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
         signal.setitimer(self.itimer, 0.3, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # use up some virtual time by doing real work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_vtalrm handler stopped this itimer
-        else: # Issue 8424
+                # sig_vtalrm handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -832,13 +833,14 @@ def test_itimer_prof(self):
         signal.signal(signal.SIGPROF, self.sig_prof)
         signal.setitimer(self.itimer, 0.2, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # do some work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_prof handler stopped this itimer
-        else: # Issue 8424
+                # sig_prof handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -1307,8 +1309,6 @@ def handler(signum, frame):
         self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
 
         expected_sigs = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         while expected_sigs < N:
             # Hopefully the SIGALRM will be received somewhere during
             # initial processing of SIGUSR1.
@@ -1317,8 +1317,9 @@ def handler(signum, frame):
 
             expected_sigs += 2
             # Wait for handlers to run to avoid signal coalescing
-            while len(sigs) < expected_sigs and time.monotonic() < deadline:
-                time.sleep(1e-5)
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+                if len(sigs) >= expected_sigs:
+                    break
 
         # All ITIMER_REAL signals should have been delivered to the
         # Python handler
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 6a66c167933de..3acafbdaa6ee4 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -2262,11 +2262,8 @@ def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
         # A simple IO loop. Call func(*args) depending on the error we get
         # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
         timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
-        deadline = time.monotonic() + timeout
         count = 0
-        while True:
-            if time.monotonic() > deadline:
-                self.fail("timeout")
+        for _ in support.busy_retry(timeout):
             errno = None
             count += 1
             try:
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 23bcceedd71b2..7738ca5e9b433 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -9,7 +9,6 @@
 import sys
 import tempfile
 import textwrap
-import time
 import unittest
 import warnings
 
@@ -461,18 +460,12 @@ def test_reap_children(self):
             # child process: do nothing, just exit
             os._exit(0)
 
-        t0 = time.monotonic()
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         was_altered = support.environment_altered
         try:
             support.environment_altered = False
             stderr = io.StringIO()
 
-            while True:
-                if time.monotonic() > deadline:
-                    self.fail("timeout")
-
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                 with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
                     support.reap_children()
 
@@ -481,9 +474,6 @@ def test_reap_children(self):
                 if support.environment_altered:
                     break
 
-                # loop until the child process completed
-                time.sleep(0.100)
-
             msg = "Warning -- reap_children() reaped child process %s" % pid
             self.assertIn(msg, stderr.getvalue())
             self.assertTrue(support.environment_altered)
diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py
index 4ec7690ac19bb..15d66ae825abf 100644
--- a/Lib/test/test_wait3.py
+++ b/Lib/test/test_wait3.py
@@ -4,7 +4,6 @@
 import os
 import subprocess
 import sys
-import time
 import unittest
 from test.fork_wait import ForkWait
 from test import support
@@ -20,14 +19,12 @@ def wait_impl(self, cpid, *, exitcode):
         # This many iterations can be required, since some previously run
         # tests (e.g. test_ctypes) could have spawned a lot of children
         # very quickly.
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait3() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests.  This is an attempt to fix the problem.
             spid, status, rusage = os.wait3(os.WNOHANG)
             if spid == cpid:
                 break
-            time.sleep(0.1)
 
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py
index 24f1aaec60c56..f66c0db1c20e6 100644
--- a/Lib/test/test_wait4.py
+++ b/Lib/test/test_wait4.py
@@ -2,7 +2,6 @@
 """
 
 import os
-import time
 import sys
 import unittest
 from test.fork_wait import ForkWait
@@ -22,14 +21,12 @@ def wait_impl(self, cpid, *, exitcode):
             # Issue #11185: wait4 is broken on AIX and will always return 0
             # with WNOHANG.
             option = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait4() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests.  This is an attempt to fix the problem.
             spid, status, rusage = os.wait4(cpid, option)
             if spid == cpid:
                 break
-            time.sleep(0.1)
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
         self.assertTrue(rusage)



More information about the Python-checkins mailing list