[Python-checkins] cpython (merge 3.2 -> default): Add tests for the atexit hook in concurrent.futures (part of #11635)

antoine.pitrou python-checkins at python.org
Thu Mar 24 15:49:12 CET 2011


http://hg.python.org/cpython/rev/d6bbde982c1c
changeset:   68894:d6bbde982c1c
parent:      68892:c177faafec51
parent:      68893:76a898433a02
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Thu Mar 24 15:48:26 2011 +0100
summary:
  Add tests for the atexit hook in concurrent.futures (part of #11635)

files:
  Lib/test/test_concurrent_futures.py |  55 ++++++++++++----
  1 files changed, 40 insertions(+), 15 deletions(-)


diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -9,6 +9,9 @@
 # without thread support.
 test.support.import_module('threading')
 
+from test.script_helper import assert_python_ok
+
+import sys
 import threading
 import time
 import unittest
@@ -43,9 +46,30 @@
     time.sleep(t)
     raise Exception('this is an exception')
 
+def sleep_and_print(t, msg):
+    time.sleep(t)
+    print(msg)
+    sys.stdout.flush()
+
 
 class ExecutorMixin:
     worker_count = 5
+
+    def setUp(self):
+        self.t1 = time.time()
+        try:
+            self.executor = self.executor_type(max_workers=self.worker_count)
+        except NotImplementedError as e:
+            self.skipTest(str(e))
+        self._prime_executor()
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+        dt = time.time() - self.t1
+        if test.support.verbose:
+            print("%.2fs" % dt, end=' ')
+        self.assertLess(dt, 60, "synchronization issue: test lasted too long")
+
     def _prime_executor(self):
         # Make sure that the executor is ready to do work before running the
         # tests. This should reduce the probability of timeouts in the tests.
@@ -57,24 +81,11 @@
 
 
 class ThreadPoolMixin(ExecutorMixin):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_workers=5)
-        self._prime_executor()
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+    executor_type = futures.ThreadPoolExecutor
 
 
 class ProcessPoolMixin(ExecutorMixin):
-    def setUp(self):
-        try:
-            self.executor = futures.ProcessPoolExecutor(max_workers=5)
-        except NotImplementedError as e:
-            self.skipTest(str(e))
-        self._prime_executor()
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+    executor_type = futures.ProcessPoolExecutor
 
 
 class ExecutorShutdownTest(unittest.TestCase):
@@ -84,6 +95,20 @@
                           self.executor.submit,
                           pow, 2, 5)
 
+    def test_interpreter_shutdown(self):
+        # Test the atexit hook for shutdown of worker threads and processes
+        rc, out, err = assert_python_ok('-c', """if 1:
+            from concurrent.futures import {executor_type}
+            from time import sleep
+            from test.test_concurrent_futures import sleep_and_print
+            t = {executor_type}(5)
+            t.submit(sleep_and_print, 1.0, "apple")
+            """.format(executor_type=self.executor_type.__name__))
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), b"apple")
+
 
 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
     def _prime_executor(self):

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list