[Python-checkins] cpython (merge 3.2 -> default): Issue #11815: Remove dead code in concurrent.futures (since a blocking Queue cannot raise queue.Empty).

antoine.pitrou python-checkins at python.org
Tue Apr 12 17:50:31 CEST 2011


http://hg.python.org/cpython/rev/eb751e3cb753
changeset:   69269:eb751e3cb753
parent:      69265:5e87dd117f74
parent:      69268:bfc586c558ed
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Tue Apr 12 17:50:20 2011 +0200
summary:
  Issue #11815: Remove dead code in concurrent.futures (since a blocking Queue
cannot raise queue.Empty).

files:
  Lib/concurrent/futures/process.py |  67 ++++++------------
  Lib/concurrent/futures/thread.py  |  12 +--
  2 files changed, 28 insertions(+), 51 deletions(-)


diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -104,7 +104,7 @@
         self.args = args
         self.kwargs = kwargs
 
-def _process_worker(call_queue, result_queue, shutdown):
+def _process_worker(call_queue, result_queue):
     """Evaluates calls from call_queue and places the results in result_queue.
 
     This worker is run in a separate process.
@@ -118,24 +118,19 @@
             worker that it should exit when call_queue is empty.
     """
     while True:
+        call_item = call_queue.get(block=True)
+        if call_item is None:
+            # Wake up queue management thread
+            result_queue.put(None)
+            return
         try:
-            call_item = call_queue.get(block=True)
-        except queue.Empty:
-            if shutdown.is_set():
-                return
+            r = call_item.fn(*call_item.args, **call_item.kwargs)
+        except BaseException as e:
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         exception=e))
         else:
-            if call_item is None:
-                # Wake up queue management thread
-                result_queue.put(None)
-                return
-            try:
-                r = call_item.fn(*call_item.args, **call_item.kwargs)
-            except BaseException as e:
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             exception=e))
-            else:
-                result_queue.put(_ResultItem(call_item.work_id,
-                                             result=r))
+            result_queue.put(_ResultItem(call_item.work_id,
+                                         result=r))
 
 def _add_call_item_to_queue(pending_work_items,
                             work_ids,
@@ -179,8 +174,7 @@
                               pending_work_items,
                               work_ids_queue,
                               call_queue,
-                              result_queue,
-                              shutdown_process_event):
+                              result_queue):
     """Manages the communication between this process and the worker processes.
 
     This function is run in a local thread.
@@ -198,9 +192,6 @@
             derived from _WorkItems for processing by the process workers.
         result_queue: A multiprocessing.Queue of _ResultItems generated by the
             process workers.
-        shutdown_process_event: A multiprocessing.Event used to signal the
-            process workers that they should exit when their work queue is
-            empty.
     """
     nb_shutdown_processes = 0
     def shutdown_one_process():
@@ -213,20 +204,16 @@
                                 work_ids_queue,
                                 call_queue)
 
-        try:
-            result_item = result_queue.get(block=True)
-        except queue.Empty:
-            pass
-        else:
-            if result_item is not None:
-                work_item = pending_work_items[result_item.work_id]
-                del pending_work_items[result_item.work_id]
+        result_item = result_queue.get(block=True)
+        if result_item is not None:
+            work_item = pending_work_items[result_item.work_id]
+            del pending_work_items[result_item.work_id]
 
-                if result_item.exception:
-                    work_item.future.set_exception(result_item.exception)
-                else:
-                    work_item.future.set_result(result_item.result)
-                continue
+            if result_item.exception:
+                work_item.future.set_exception(result_item.exception)
+            else:
+                work_item.future.set_result(result_item.result)
+            continue
         # If we come here, we either got a timeout or were explicitly woken up.
         # In either case, check whether we should start shutting down.
         executor = executor_reference()
@@ -238,8 +225,6 @@
             # Since no new work items can be added, it is safe to shutdown
             # this thread if there are no pending work items.
             if not pending_work_items:
-                shutdown_process_event.set()
-
                 while nb_shutdown_processes < len(processes):
                     shutdown_one_process()
                 # If .join() is not called on the created processes then
@@ -306,7 +291,6 @@
 
         # Shutdown is a two-step process.
         self._shutdown_thread = False
-        self._shutdown_process_event = multiprocessing.Event()
         self._shutdown_lock = threading.Lock()
         self._queue_count = 0
         self._pending_work_items = {}
@@ -324,8 +308,7 @@
                           self._pending_work_items,
                           self._work_ids,
                           self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
             _threads_queues[self._queue_management_thread] = self._result_queue
@@ -335,8 +318,7 @@
             p = multiprocessing.Process(
                     target=_process_worker,
                     args=(self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             p.start()
             self._processes.add(p)
 
@@ -372,7 +354,6 @@
         self._queue_management_thread = None
         self._call_queue = None
         self._result_queue = None
-        self._shutdown_process_event = None
         self._processes = None
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
 
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -60,14 +60,10 @@
 def _worker(executor_reference, work_queue):
     try:
         while True:
-            try:
-                work_item = work_queue.get(block=True)
-            except queue.Empty:
-                pass
-            else:
-                if work_item is not None:
-                    work_item.run()
-                    continue
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
+                work_item.run()
+                continue
             executor = executor_reference()
             # Exit if:
             #   - The interpreter is shutting down OR

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list