[Python-checkins] cpython (merge 3.4 -> default): Merge 3.4 (asyncio)

victor.stinner python-checkins at python.org
Wed Jan 21 23:41:53 CET 2015


https://hg.python.org/cpython/rev/15671d3aaca3
changeset:   94232:15671d3aaca3
parent:      94227:0893b9ee44ea
parent:      94231:fb8a093db8b1
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Wed Jan 21 23:40:19 2015 +0100
summary:
  Merge 3.4 (asyncio)

files:
  Lib/asyncio/base_events.py                    |    2 +-
  Lib/asyncio/proactor_events.py                |   12 +-
  Lib/asyncio/selector_events.py                |    1 -
  Lib/asyncio/windows_events.py                 |  168 +++++++--
  Lib/test/test_asyncio/test_proactor_events.py |    5 +-
  Lib/test/test_asyncio/test_streams.py         |   10 +-
  Modules/overlapped.c                          |   25 +
  7 files changed, 173 insertions(+), 50 deletions(-)


diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -636,7 +636,7 @@
 
         try:
             yield from waiter
-        except Exception as exc:
+        except Exception:
             transport.close()
             raise
 
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -463,9 +463,15 @@
             if f is not None:
                 f.result()  # may raise
             f = self._proactor.recv(self._ssock, 4096)
-        except:
-            self.close()
-            raise
+        except futures.CancelledError:
+            # _close_self_pipe() has been called, stop waiting for data
+            return
+        except Exception as exc:
+            self.call_exception_handler({
+                'message': 'Error on reading from the event loop self pipe',
+                'exception': exc,
+                'loop': self,
+            })
         else:
             self._self_reading_future = f
             f.add_done_callback(self._loop_self_reading)
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -10,7 +10,6 @@
 import errno
 import functools
 import socket
-import sys
 try:
     import ssl
 except ImportError:  # pragma: no cover
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -78,20 +78,23 @@
         self._ov = None
 
 
-class _WaitHandleFuture(futures.Future):
+class _BaseWaitHandleFuture(futures.Future):
     """Subclass of Future which represents a wait handle."""
 
-    def __init__(self, iocp, ov, handle, wait_handle, *, loop=None):
+    def __init__(self, ov, handle, wait_handle, *, loop=None):
         super().__init__(loop=loop)
         if self._source_traceback:
             del self._source_traceback[-1]
-        # iocp and ov are only used by cancel() to notify IocpProactor
-        # that the wait was cancelled
-        self._iocp = iocp
+        # Keep a reference to the Overlapped object to keep it alive until the
+        # wait is unregistered
         self._ov = ov
         self._handle = handle
         self._wait_handle = wait_handle
 
+        # Should we call UnregisterWaitEx() if the wait completes
+        # or is cancelled?
+        self._registered = True
+
     def _poll(self):
         # non-blocking wait: use a timeout of 0 millisecond
         return (_winapi.WaitForSingleObject(self._handle, 0) ==
@@ -99,21 +102,32 @@
 
     def _repr_info(self):
         info = super()._repr_info()
-        info.insert(1, 'handle=%#x' % self._handle)
-        if self._wait_handle:
+        info.append('handle=%#x' % self._handle)
+        if self._handle is not None:
             state = 'signaled' if self._poll() else 'waiting'
-            info.insert(1, 'wait_handle=<%s, %#x>'
-                           % (state, self._wait_handle))
+            info.append(state)
+        if self._wait_handle is not None:
+            info.append('wait_handle=%#x' % self._wait_handle)
         return info
 
+    def _unregister_wait_cb(self, fut):
+        # The wait was unregistered: it's now safe to destroy the Overlapped
+        # object
+        self._ov = None
+
     def _unregister_wait(self):
-        if self._wait_handle is None:
+        if not self._registered:
             return
+        self._registered = False
+
         try:
             _overlapped.UnregisterWait(self._wait_handle)
         except OSError as exc:
-            # ERROR_IO_PENDING is not an error, the wait was unregistered
-            if exc.winerror != _overlapped.ERROR_IO_PENDING:
+            self._wait_handle = None
+            if exc.winerror == _overlapped.ERROR_IO_PENDING:
+                # ERROR_IO_PENDING is not an error, the wait was unregistered
+                self._unregister_wait_cb(None)
+            elif exc.winerror != _overlapped.ERROR_IO_PENDING:
                 context = {
                     'message': 'Failed to unregister the wait handle',
                     'exception': exc,
@@ -122,26 +136,91 @@
                 if self._source_traceback:
                     context['source_traceback'] = self._source_traceback
                 self._loop.call_exception_handler(context)
-        self._wait_handle = None
-        self._iocp = None
-        self._ov = None
+        else:
+            self._wait_handle = None
+            self._unregister_wait_cb(None)
 
     def cancel(self):
-        result = super().cancel()
-        if self._ov is not None:
-            # signal the cancellation to the overlapped object
-            _overlapped.PostQueuedCompletionStatus(self._iocp, True,
-                                                   0, self._ov.address)
         self._unregister_wait()
-        return result
+        return super().cancel()
 
     def set_exception(self, exception):
+        self._unregister_wait()
         super().set_exception(exception)
-        self._unregister_wait()
 
     def set_result(self, result):
+        self._unregister_wait()
         super().set_result(result)
-        self._unregister_wait()
+
+
+class _WaitCancelFuture(_BaseWaitHandleFuture):
+    """Subclass of Future which represents a wait for the cancellation of a
+    _WaitHandleFuture using an event.
+    """
+
+    def __init__(self, ov, event, wait_handle, *, loop=None):
+        super().__init__(ov, event, wait_handle, loop=loop)
+
+        self._done_callback = None
+
+    def _schedule_callbacks(self):
+        super(_WaitCancelFuture, self)._schedule_callbacks()
+        if self._done_callback is not None:
+            self._done_callback(self)
+
+
+class _WaitHandleFuture(_BaseWaitHandleFuture):
+    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
+        super().__init__(ov, handle, wait_handle, loop=loop)
+        self._proactor = proactor
+        self._unregister_proactor = True
+        self._event = _overlapped.CreateEvent(None, True, False, None)
+        self._event_fut = None
+
+    def _unregister_wait_cb(self, fut):
+        if self._event is not None:
+            _winapi.CloseHandle(self._event)
+            self._event = None
+            self._event_fut = None
+
+        # If the wait was cancelled, the wait may never be signalled, so
+        # it's required to unregister it. Otherwise, IocpProactor.close() will
+        # wait forever for an event which will never come.
+        #
+        # If the IocpProactor already received the event, it's safe to call
+        # _unregister() because we kept a reference to the Overlapped object
+        # which is used as a unique key.
+        self._proactor._unregister(self._ov)
+        self._proactor = None
+
+        super()._unregister_wait_cb(fut)
+
+    def _unregister_wait(self):
+        if not self._registered:
+            return
+        self._registered = False
+
+        try:
+            _overlapped.UnregisterWaitEx(self._wait_handle, self._event)
+        except OSError as exc:
+            self._wait_handle = None
+            if exc.winerror == _overlapped.ERROR_IO_PENDING:
+                # ERROR_IO_PENDING is not an error, the wait was unregistered
+                self._unregister_wait_cb(None)
+            elif exc.winerror != _overlapped.ERROR_IO_PENDING:
+                context = {
+                    'message': 'Failed to unregister the wait handle',
+                    'exception': exc,
+                    'future': self,
+                }
+                if self._source_traceback:
+                    context['source_traceback'] = self._source_traceback
+                self._loop.call_exception_handler(context)
+        else:
+            self._wait_handle = None
+            self._event_fut = self._proactor._wait_cancel(
+                                                self._event,
+                                                self._unregister_wait_cb)
 
 
 class PipeServer(object):
@@ -291,6 +370,7 @@
             _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
         self._cache = {}
         self._registered = weakref.WeakSet()
+        self._unregistered = []
         self._stopped_serving = weakref.WeakSet()
 
     def __repr__(self):
@@ -438,6 +518,16 @@
         Return a Future object. The result of the future is True if the wait
         completed, or False if the wait did not complete (on timeout).
         """
+        return self._wait_for_handle(handle, timeout, False)
+
+    def _wait_cancel(self, event, done_callback):
+        fut = self._wait_for_handle(event, None, True)
+        # add_done_callback() cannot be used because the wait may only complete
+        # in IocpProactor.close(), while the event loop is not running.
+        fut._done_callback = done_callback
+        return fut
+
+    def _wait_for_handle(self, handle, timeout, _is_cancel):
         if timeout is None:
             ms = _winapi.INFINITE
         else:
@@ -447,9 +537,13 @@
 
         # We only create ov so we can use ov.address as a key for the cache.
         ov = _overlapped.Overlapped(NULL)
-        wh = _overlapped.RegisterWaitWithQueue(
+        wait_handle = _overlapped.RegisterWaitWithQueue(
             handle, self._iocp, ov.address, ms)
-        f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)
+        if _is_cancel:
+            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
+        else:
+            f = _WaitHandleFuture(ov, handle, wait_handle, self,
+                                  loop=self._loop)
         if f._source_traceback:
             del f._source_traceback[-1]
 
@@ -462,14 +556,6 @@
             # False even though we have not timed out.
             return f._poll()
 
-        if f._poll():
-            try:
-                result = f._poll()
-            except OSError as exc:
-                f.set_exception(exc)
-            else:
-                f.set_result(result)
-
         self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
         return f
 
@@ -521,6 +607,15 @@
             self._cache[ov.address] = (f, ov, obj, callback)
         return f
 
+    def _unregister(self, ov):
+        """Unregister an overlapped object.
+
+        Call this method when its future has been cancelled. The event can
+        already be signalled (pending in the proactor event queue). It is also
+        safe if the event is never signalled (because it was cancelled).
+        """
+        self._unregistered.append(ov)
+
     def _get_accept_socket(self, family):
         s = socket.socket(family)
         s.settimeout(0)
@@ -541,7 +636,7 @@
         while True:
             status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
             if status is None:
-                return
+                break
             ms = 0
 
             err, transferred, key, address = status
@@ -576,6 +671,11 @@
                     f.set_result(value)
                     self._results.append(f)
 
+        # Remove unregistered futures
+        for ov in self._unregistered:
+            self._cache.pop(ov.address, None)
+        self._unregistered.clear()
+
     def _stop_serving(self, obj):
         # obj is a socket or pipe handle.  It will be closed in
         # BaseProactorEventLoop._stop_serving() which will make any
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -523,9 +523,10 @@
 
     def test_loop_self_reading_exception(self):
         self.loop.close = mock.Mock()
+        self.loop.call_exception_handler = mock.Mock()
         self.proactor.recv.side_effect = OSError()
-        self.assertRaises(OSError, self.loop._loop_self_reading)
-        self.assertTrue(self.loop.close.called)
+        self.loop._loop_self_reading()
+        self.assertTrue(self.loop.call_exception_handler.called)
 
     def test_write_to_self(self):
         self.loop._write_to_self()
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -415,10 +415,6 @@
         def set_err():
             stream.set_exception(ValueError())
 
-        @asyncio.coroutine
-        def readline():
-            yield from stream.readline()
-
         t1 = asyncio.Task(stream.readline(), loop=self.loop)
         t2 = asyncio.Task(set_err(), loop=self.loop)
 
@@ -429,11 +425,7 @@
     def test_exception_cancel(self):
         stream = asyncio.StreamReader(loop=self.loop)
 
-        @asyncio.coroutine
-        def read_a_line():
-            yield from stream.readline()
-
-        t = asyncio.Task(read_a_line(), loop=self.loop)
+        t = asyncio.Task(stream.readline(), loop=self.loop)
         test_utils.run_briefly(self.loop)
         t.cancel()
         test_utils.run_briefly(self.loop)
diff --git a/Modules/overlapped.c b/Modules/overlapped.c
--- a/Modules/overlapped.c
+++ b/Modules/overlapped.c
@@ -309,6 +309,29 @@
     Py_RETURN_NONE;
 }
 
+PyDoc_STRVAR(
+    UnregisterWaitEx_doc,
+    "UnregisterWaitEx(WaitHandle, Event) -> None\n\n"
+    "Unregister wait handle.\n");
+
+static PyObject *
+overlapped_UnregisterWaitEx(PyObject *self, PyObject *args)
+{
+    HANDLE WaitHandle, Event;
+    BOOL ret;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE, &WaitHandle, &Event))
+        return NULL;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = UnregisterWaitEx(WaitHandle, Event);
+    Py_END_ALLOW_THREADS
+
+    if (!ret)
+        return SetFromWindowsErr(0);
+    Py_RETURN_NONE;
+}
+
 /*
  * Event functions -- currently only used by tests
  */
@@ -1319,6 +1342,8 @@
      METH_VARARGS, RegisterWaitWithQueue_doc},
     {"UnregisterWait", overlapped_UnregisterWait,
      METH_VARARGS, UnregisterWait_doc},
+    {"UnregisterWaitEx", overlapped_UnregisterWaitEx,
+     METH_VARARGS, UnregisterWaitEx_doc},
     {"CreateEvent", overlapped_CreateEvent,
      METH_VARARGS, CreateEvent_doc},
     {"SetEvent", overlapped_SetEvent,

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list