[Python-checkins] r87673 - python/branches/py3k/Lib/test/test_concurrent_futures.py

brian.quinlan python-checkins at python.org
Mon Jan 3 03:56:40 CET 2011


Author: brian.quinlan
Date: Mon Jan  3 03:56:39 2011
New Revision: 87673

Log:
Removes the 'Call' class which is used to control execution order and is unreliable on Windows

Modified:
   python/branches/py3k/Lib/test/test_concurrent_futures.py

Modified: python/branches/py3k/Lib/test/test_concurrent_futures.py
==============================================================================
--- python/branches/py3k/Lib/test/test_concurrent_futures.py	(original)
+++ python/branches/py3k/Lib/test/test_concurrent_futures.py	Mon Jan  3 03:56:39 2011
@@ -9,24 +9,16 @@
 # without thread support.
 test.support.import_module('threading')
 
-import io
-import logging
-import multiprocessing
-import sys
 import threading
 import time
 import unittest
 
-if sys.platform.startswith('win'):
-    import ctypes
-    import ctypes.wintypes
-
 from concurrent import futures
 from concurrent.futures._base import (
-    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
-    LOGGER, wait)
+    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
 import concurrent.futures.process
 
+
 def create_future(state=PENDING, exception=None, result=None):
     f = Future()
     f._state = state
@@ -34,6 +26,7 @@
     f._result = result
     return f
 
+
 PENDING_FUTURE = create_future(state=PENDING)
 RUNNING_FUTURE = create_future(state=RUNNING)
 CANCELLED_FUTURE = create_future(state=CANCELLED)
@@ -41,166 +34,65 @@
 EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
 SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
 
+
 def mul(x, y):
     return x * y
 
-class Call(object):
-    """A call that can be submitted to a future.Executor for testing.
-
-    The call signals when it is called and waits for an event before finishing.
-    """
-    CALL_LOCKS = {}
-    def _create_event(self):
-        if sys.platform.startswith('win'):
-            class SECURITY_ATTRIBUTES(ctypes.Structure):
-                _fields_ = [("nLength", ctypes.wintypes.DWORD),
-                            ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
-                            ("bInheritHandle", ctypes.wintypes.BOOL)]
-
-            s = SECURITY_ATTRIBUTES()
-            s.nLength = ctypes.sizeof(s)
-            s.lpSecurityDescriptor = None
-            s.bInheritHandle = True
-
-            handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
-                                                         True,
-                                                         False,
-                                                         None)
-            assert handle is not None
-            return handle
-        else:
-            event = self.Event[0]()
-            self.CALL_LOCKS[id(event)] = event
-            return id(event)
-
-    def _wait_on_event(self, handle):
-        if sys.platform.startswith('win'):
-            # WaitForSingleObject returns 0 if handle is signaled.
-            r = ctypes.windll.kernel32.WaitForSingleObject(handle, 60 * 1000)
-            if r != 0:
-                message = (
-                    'WaitForSingleObject({}, ...) failed with {}, '
-                    'GetLastError() = {}'.format(
-                            handle, r, ctypes.GetLastError()))
-                logging.critical(message)
-                assert False, message
-        else:
-            self.CALL_LOCKS[handle].wait()
-
-    def _signal_event(self, handle):
-        if sys.platform.startswith('win'):
-            r = ctypes.windll.kernel32.SetEvent(handle)  # Returns 0 on failure.
-            if r == 0:
-                message = (
-                    'SetEvent({}) failed with {}, GetLastError() = {}'.format(
-                            handle, r, ctypes.GetLastError()))
-                logging.critical(message)
-                assert False, message
-        else:
-            self.CALL_LOCKS[handle].set()
-
-    def __init__(self, Event, manual_finish=False, result=42):
-        self.Event = Event
-        self._called_event = self._create_event()
-        self._can_finish = self._create_event()
 
-        self._result = result
+def sleep_and_raise(t):
+    time.sleep(t)
+    raise Exception('this is an exception')
 
-        if not manual_finish:
-            self._signal_event(self._can_finish)
 
-    def wait_on_called(self):
-        self._wait_on_event(self._called_event)
-
-    def set_can(self):
-        self._signal_event(self._can_finish)
-
-    def __call__(self):
-        self._signal_event(self._called_event)
-        self._wait_on_event(self._can_finish)
-
-        return self._result
-
-    def close(self):
-        self.set_can()
-        if sys.platform.startswith('win'):
-            ctypes.windll.kernel32.CloseHandle(self._called_event)
-            ctypes.windll.kernel32.CloseHandle(self._can_finish)
-            self._called_event = None
-            self._can_finish = None
-        else:
-            del self.CALL_LOCKS[self._called_event]
-            del self.CALL_LOCKS[self._can_finish]
-
-class ExceptionCall(Call):
-    def __call__(self):
-        self._signal_event(self._called_event)
-        self._wait_on_event(self._can_finish)
-        raise ZeroDivisionError()
-
-class MapCall(Call):
-    def __init__(self, Event, result=42):
-        super().__init__(Event, manual_finish=True, result=result)
-
-    def __call__(self, manual_finish):
-        if manual_finish:
-            super().__call__()
-        return self._result
-
-class ExecutorShutdownTest(unittest.TestCase):
-    def test_run_after_shutdown(self):
-        self.executor.shutdown()
-        self.assertRaises(RuntimeError,
-                          self.executor.submit,
-                          pow, 2, 5)
+class ExecutorMixin:
+    worker_count = 5
+    def _prime_executor(self):
+        # Make sure that the executor is ready to do work before running the
+        # tests. This should reduce the probability of timeouts in the tests.
+        futures = [self.executor.submit(time.sleep, 0.1)
+                   for _ in range(self.worker_count)]
 
+        for f in futures:
+            f.result()
 
-    def _start_some_futures(self):
-        call1 = Call(self.Event, manual_finish=True)
-        call2 = Call(self.Event, manual_finish=True)
-        call3 = Call(self.Event, manual_finish=True)
 
-        try:
-            self.executor.submit(call1)
-            self.executor.submit(call2)
-            self.executor.submit(call3)
-
-            call1.wait_on_called()
-            call2.wait_on_called()
-            call3.wait_on_called()
-
-            call1.set_can()
-            call2.set_can()
-            call3.set_can()
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-
-class ThreadPoolMixin:
-    # wrap in tuple to prevent creation of instance methods
-    Event = (threading.Event,)
+class ThreadPoolMixin(ExecutorMixin):
     def setUp(self):
         self.executor = futures.ThreadPoolExecutor(max_workers=5)
+        self._prime_executor()
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
 
-class ProcessPoolMixin:
-    # wrap in tuple to prevent creation of instance methods
-    Event = (multiprocessing.Event,)
+
+class ProcessPoolMixin(ExecutorMixin):
     def setUp(self):
         try:
             self.executor = futures.ProcessPoolExecutor(max_workers=5)
         except NotImplementedError as e:
             self.skipTest(str(e))
+        self._prime_executor()
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
 
+
+class ExecutorShutdownTest(unittest.TestCase):
+    def test_run_after_shutdown(self):
+        self.executor.shutdown()
+        self.assertRaises(RuntimeError,
+                          self.executor.submit,
+                          pow, 2, 5)
+
+
 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+    def _prime_executor(self):
+        pass
+
     def test_threads_terminate(self):
-        self._start_some_futures()
+        self.executor.submit(mul, 21, 2)
+        self.executor.submit(mul, 6, 7)
+        self.executor.submit(mul, 3, 14)
         self.assertEqual(len(self.executor._threads), 3)
         self.executor.shutdown()
         for t in self.executor._threads:
@@ -224,9 +116,15 @@
         for t in threads:
             t.join()
 
+
 class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+    def _prime_executor(self):
+        pass
+
     def test_processes_terminate(self):
-        self._start_some_futures()
+        self.executor.submit(mul, 21, 2)
+        self.executor.submit(mul, 6, 7)
+        self.executor.submit(mul, 3, 14)
         self.assertEqual(len(self.executor._processes), 5)
         processes = self.executor._processes
         self.executor.shutdown()
@@ -236,11 +134,11 @@
 
     def test_context_manager_shutdown(self):
         with futures.ProcessPoolExecutor(max_workers=5) as e:
-            executor = e
+            processes = e._processes
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
 
-        for p in self.executor._processes:
+        for p in processes:
             p.join()
 
     def test_del_shutdown(self):
@@ -256,288 +154,158 @@
 
 class WaitTests(unittest.TestCase):
     def test_first_completed(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-
-        call1 = Call(self.Event, manual_finish=True)
-        call2 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
+        future1 = self.executor.submit(mul, 21, 2)
+        future2 = self.executor.submit(time.sleep, 5)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            done, not_done = futures.wait(
-                    [CANCELLED_FUTURE, future1, future2],
-                     return_when=futures.FIRST_COMPLETED)
-
-            self.assertEqual(set([future1]), done)
-            self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
-        finally:
-            call1.close()
-            call2.close()
-
-    def test_first_completed_one_already_completed(self):
-        call1 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
+        done, not_done = futures.wait(
+                [CANCELLED_FUTURE, future1, future2],
+                 return_when=futures.FIRST_COMPLETED)
+
+        self.assertEqual(set([future1]), done)
+        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+
+    def test_first_completed_some_already_completed(self):
+        future1 = self.executor.submit(time.sleep, 2)
+
+        finished, pending = futures.wait(
+                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+                 return_when=futures.FIRST_COMPLETED)
 
-            finished, pending = futures.wait(
-                     [SUCCESSFUL_FUTURE, future1],
-                     return_when=futures.FIRST_COMPLETED)
-
-            self.assertEqual(set([SUCCESSFUL_FUTURE]), finished)
-            self.assertEqual(set([future1]), pending)
-        finally:
-            call1.close()
+        self.assertEqual(
+                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+                finished)
+        self.assertEqual(set([future1]), pending)
 
     def test_first_exception(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-            call2.set_can()
-
-        call1 = Call(self.Event, manual_finish=True)
-        call2 = ExceptionCall(self.Event, manual_finish=True)
-        call3 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
-            future3 = self.executor.submit(call3)
-
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [future1, future2, future3],
-                    return_when=futures.FIRST_EXCEPTION)
-
-            self.assertEqual(set([future1, future2]), finished)
-            self.assertEqual(set([future3]), pending)
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
+        future1 = self.executor.submit(mul, 2, 21)
+        future2 = self.executor.submit(sleep_and_raise, 5)
+        future3 = self.executor.submit(time.sleep, 10)
+
+        finished, pending = futures.wait(
+                [future1, future2, future3],
+                return_when=futures.FIRST_EXCEPTION)
 
-    def test_first_exception_some_already_complete(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
+        self.assertEqual(set([future1, future2]), finished)
+        self.assertEqual(set([future3]), pending)
 
-        call1 = ExceptionCall(self.Event, manual_finish=True)
-        call2 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
+    def test_first_exception_some_already_complete(self):
+        future1 = self.executor.submit(divmod, 21, 0)
+        future2 = self.executor.submit(time.sleep, 5)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [SUCCESSFUL_FUTURE,
-                     CANCELLED_FUTURE,
-                     CANCELLED_AND_NOTIFIED_FUTURE,
-                     future1, future2],
-                    return_when=futures.FIRST_EXCEPTION)
-
-            self.assertEqual(set([SUCCESSFUL_FUTURE,
-                                  CANCELLED_AND_NOTIFIED_FUTURE,
-                                  future1]), finished)
-            self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
-
-
-        finally:
-            call1.close()
-            call2.close()
+        finished, pending = futures.wait(
+                [SUCCESSFUL_FUTURE,
+                 CANCELLED_FUTURE,
+                 CANCELLED_AND_NOTIFIED_FUTURE,
+                 future1, future2],
+                return_when=futures.FIRST_EXCEPTION)
+
+        self.assertEqual(set([SUCCESSFUL_FUTURE,
+                              CANCELLED_AND_NOTIFIED_FUTURE,
+                              future1]), finished)
+        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
 
     def test_first_exception_one_already_failed(self):
-        call1 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
+        future1 = self.executor.submit(time.sleep, 2)
 
-            finished, pending = futures.wait(
-                     [EXCEPTION_FUTURE, future1],
-                     return_when=futures.FIRST_EXCEPTION)
-
-            self.assertEqual(set([EXCEPTION_FUTURE]), finished)
-            self.assertEqual(set([future1]), pending)
-        finally:
-            call1.close()
+        finished, pending = futures.wait(
+                 [EXCEPTION_FUTURE, future1],
+                 return_when=futures.FIRST_EXCEPTION)
 
-    def test_all_completed(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-            call2.set_can()
+        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+        self.assertEqual(set([future1]), pending)
 
-        call1 = Call(self.Event, manual_finish=True)
-        call2 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
+    def test_all_completed(self):
+        future1 = self.executor.submit(divmod, 2, 0)
+        future2 = self.executor.submit(mul, 2, 21)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [future1, future2],
-                    return_when=futures.ALL_COMPLETED)
-
-            self.assertEqual(set([future1, future2]), finished)
-            self.assertEqual(set(), pending)
-        finally:
-            call1.close()
-            call2.close()
-
-    @unittest.skip # XXX skip the test for now as it hangs
-    def test_all_completed_some_already_completed(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-
-            future4.cancel()
-            call1.set_can()
-            call2.set_can()
-            call3.set_can()
-
-        self.assertLessEqual(
-                futures.process.EXTRA_QUEUED_CALLS,
-                1,
-               'this test assumes that future4 will be cancelled before it is '
-               'queued to run - which might not be the case if '
-               'ProcessPoolExecutor is too aggresive in scheduling futures')
-        call1 = Call(self.Event, manual_finish=True)
-        call2 = Call(self.Event, manual_finish=True)
-        call3 = Call(self.Event, manual_finish=True)
-        call4 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
-            future3 = self.executor.submit(call3)
-            future4 = self.executor.submit(call4)
-
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [SUCCESSFUL_FUTURE,
-                     CANCELLED_AND_NOTIFIED_FUTURE,
-                     future1, future2, future3, future4],
-                    return_when=futures.ALL_COMPLETED)
-
-            self.assertEqual(set([SUCCESSFUL_FUTURE,
-                                  CANCELLED_AND_NOTIFIED_FUTURE,
-                                  future1, future2, future3, future4]),
-                             finished)
-            self.assertEqual(set(), pending)
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
-            call4.close()
+        finished, pending = futures.wait(
+                [SUCCESSFUL_FUTURE,
+                 CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 future1,
+                 future2],
+                return_when=futures.ALL_COMPLETED)
+
+        self.assertEqual(set([SUCCESSFUL_FUTURE,
+                              CANCELLED_AND_NOTIFIED_FUTURE,
+                              EXCEPTION_FUTURE,
+                              future1,
+                              future2]), finished)
+        self.assertEqual(set(), pending)
 
     def test_timeout(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-
-        call1 = Call(self.Event, manual_finish=True)
-        call2 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
-
-            t = threading.Thread(target=wait_test)
-            t.start()
-            finished, pending = futures.wait(
-                    [CANCELLED_AND_NOTIFIED_FUTURE,
-                     EXCEPTION_FUTURE,
-                     SUCCESSFUL_FUTURE,
-                     future1, future2],
-                    timeout=5,
-                    return_when=futures.ALL_COMPLETED)
-
-            self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
-                                  EXCEPTION_FUTURE,
-                                  SUCCESSFUL_FUTURE,
-                                  future1]), finished)
-            self.assertEqual(set([future2]), pending)
-
+        future1 = self.executor.submit(mul, 6, 7)
+        future2 = self.executor.submit(time.sleep, 10)
 
-        finally:
-            call1.close()
-            call2.close()
+        finished, pending = futures.wait(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2],
+                timeout=5,
+                return_when=futures.ALL_COMPLETED)
+
+        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                              EXCEPTION_FUTURE,
+                              SUCCESSFUL_FUTURE,
+                              future1]), finished)
+        self.assertEqual(set([future2]), pending)
 
 
 class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
     pass
 
+
 class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
     pass
 
+
 class AsCompletedTests(unittest.TestCase):
     # TODO(brian at sweetapp.com): Should have a test with a non-zero timeout.
     def test_no_timeout(self):
-        def wait_test():
-            while not future1._waiters:
-                pass
-            call1.set_can()
-            call2.set_can()
+        future1 = self.executor.submit(mul, 2, 21)
+        future2 = self.executor.submit(mul, 7, 6)
 
-        call1 = Call(self.Event, manual_finish=True)
-        call2 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            future2 = self.executor.submit(call2)
+        completed = set(futures.as_completed(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2]))
+        self.assertEqual(set(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2]),
+                completed)
 
-            t = threading.Thread(target=wait_test)
-            t.start()
-            completed = set(futures.as_completed(
-                    [CANCELLED_AND_NOTIFIED_FUTURE,
-                     EXCEPTION_FUTURE,
-                     SUCCESSFUL_FUTURE,
-                     future1, future2]))
-            self.assertEqual(set(
+    def test_zero_timeout(self):
+        future1 = self.executor.submit(time.sleep, 2)
+        completed_futures = set()
+        try:
+            for future in futures.as_completed(
                     [CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      SUCCESSFUL_FUTURE,
-                     future1, future2]),
-                    completed)
-        finally:
-            call1.close()
-            call2.close()
+                     future1],
+                    timeout=0):
+                completed_futures.add(future)
+        except futures.TimeoutError:
+            pass
+
+        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                              EXCEPTION_FUTURE,
+                              SUCCESSFUL_FUTURE]),
+                         completed_futures)
 
-    def test_zero_timeout(self):
-        call1 = Call(self.Event, manual_finish=True)
-        try:
-            future1 = self.executor.submit(call1)
-            completed_futures = set()
-            try:
-                for future in futures.as_completed(
-                        [CANCELLED_AND_NOTIFIED_FUTURE,
-                         EXCEPTION_FUTURE,
-                         SUCCESSFUL_FUTURE,
-                         future1],
-                        timeout=0):
-                    completed_futures.add(future)
-            except futures.TimeoutError:
-                pass
-
-            self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
-                                  EXCEPTION_FUTURE,
-                                  SUCCESSFUL_FUTURE]),
-                             completed_futures)
-        finally:
-            call1.close()
 
 class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
     pass
 
+
 class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
     pass
 
+
 class ExecutorTest(unittest.TestCase):
     # Executor.shutdown() and context manager usage is tested by
     # ExecutorShutdownTest.
@@ -562,28 +330,27 @@
 
     def test_map_timeout(self):
         results = []
-        timeout_call = MapCall(self.Event)
         try:
-            try:
-                for i in self.executor.map(timeout_call,
-                                           [False, False, True],
-                                           timeout=5):
-                    results.append(i)
-            except futures.TimeoutError:
-                pass
-            else:
-                self.fail('expected TimeoutError')
-        finally:
-            timeout_call.close()
+            for i in self.executor.map(time.sleep,
+                                       [0, 0, 10],
+                                       timeout=5):
+                results.append(i)
+        except futures.TimeoutError:
+            pass
+        else:
+            self.fail('expected TimeoutError')
+
+        self.assertEqual([None, None], results)
 
-        self.assertEqual([42, 42], results)
 
 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
     pass
 
+
 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
     pass
 
+
 class FutureTests(unittest.TestCase):
     def test_done_callback_with_result(self):
         callback_result = None


More information about the Python-checkins mailing list