[Python-checkins] cpython (merge 3.4 -> default): Merge with 3.4

victor.stinner python-checkins at python.org
Sat Jul 12 03:14:59 CEST 2014


http://hg.python.org/cpython/rev/5af54ed3af02
changeset:   91656:5af54ed3af02
parent:      91653:5a299c3ec120
parent:      91655:1852d0e5e4ef
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Sat Jul 12 03:12:30 2014 +0200
summary:
  Merge with 3.4

files:
  Lib/asyncio/base_events.py                    |  56 +++++-
  Lib/asyncio/proactor_events.py                |  36 +++-
  Lib/asyncio/selector_events.py                |  91 ++++++++-
  Lib/asyncio/unix_events.py                    |  36 +++
  Lib/asyncio/windows_events.py                 |  12 +
  Lib/test/test_asyncio/test_proactor_events.py |   7 +-
  Lib/test/test_asyncio/test_selector_events.py |  12 +-
  7 files changed, 220 insertions(+), 30 deletions(-)


diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -94,6 +94,9 @@
         self._active_count = 0
         self._waiters = []
 
+    def __repr__(self):
+        return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
+
     def _attach(self):
         assert self.sockets is not None
         self._active_count += 1
@@ -110,8 +113,6 @@
             return
         self.sockets = None
         for sock in sockets:
-            # closing sockets will call asynchronously the _detach() method
-            # which calls _wakeup() for the last socket
             self._loop._stop_serving(sock)
         if self._active_count == 0:
             self._wakeup()
@@ -276,6 +277,8 @@
             raise RuntimeError("cannot close a running event loop")
         if self._closed:
             return
+        if self._debug:
+            logger.debug("Close %r", self)
         self._closed = True
         self._ready.clear()
         self._scheduled.clear()
@@ -402,10 +405,39 @@
     def set_default_executor(self, executor):
         self._default_executor = executor
 
+    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
+        msg = ["%s:%r" % (host, port)]
+        if family:
+            msg.append('family=%r' % family)
+        if type:
+            msg.append('type=%r' % type)
+        if proto:
+            msg.append('proto=%r' % proto)
+        if flags:
+            msg.append('flags=%r' % flags)
+        msg = ', '.join(msg)
+        logger.debug('Get address info %s', msg)
+
+        t0 = self.time()
+        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
+        dt = self.time() - t0
+
+        msg = ('Getting address info %s took %.3f ms: %r'
+               % (msg, dt * 1e3, addrinfo))
+        if dt >= self.slow_callback_duration:
+            logger.info(msg)
+        else:
+            logger.debug(msg)
+        return addrinfo
+
     def getaddrinfo(self, host, port, *,
                     family=0, type=0, proto=0, flags=0):
-        return self.run_in_executor(None, socket.getaddrinfo,
-                                    host, port, family, type, proto, flags)
+        if self._debug:
+            return self.run_in_executor(None, self._getaddrinfo_debug,
+                                        host, port, family, type, proto, flags)
+        else:
+            return self.run_in_executor(None, socket.getaddrinfo,
+                                        host, port, family, type, proto, flags)
 
     def getnameinfo(self, sockaddr, flags=0):
         return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
@@ -492,6 +524,8 @@
                             sock.close()
                             sock = None
                             continue
+                    if self._debug:
+                        logger.debug("connect %r to %r", sock, address)
                     yield from self.sock_connect(sock, address)
                 except OSError as exc:
                     if sock is not None:
@@ -524,6 +558,9 @@
 
         transport, protocol = yield from self._create_connection_transport(
             sock, protocol_factory, ssl, server_hostname)
+        if self._debug:
+            logger.debug("connected to %s:%r: (%r, %r)",
+                         host, port, transport, protocol)
         return transport, protocol
 
     @coroutine
@@ -614,6 +651,15 @@
         waiter = futures.Future(loop=self)
         transport = self._make_datagram_transport(sock, protocol, r_addr,
                                                   waiter)
+        if self._debug:
+            if local_addr:
+                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
+                            "created: (%r, %r)",
+                            local_addr, remote_addr, transport, protocol)
+            else:
+                logger.debug("Datagram endpoint remote_addr=%r created: "
+                             "(%r, %r)",
+                             remote_addr, transport, protocol)
         yield from waiter
         return transport, protocol
 
@@ -694,6 +740,8 @@
             sock.listen(backlog)
             sock.setblocking(False)
             self._start_serving(protocol_factory, sock, ssl, server)
+        if self._debug:
+            logger.info("%r is serving", server)
         return server
 
     @coroutine
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -41,6 +41,23 @@
             # wait until protocol.connection_made() has been called
             self._loop.call_soon(waiter._set_result_unless_cancelled, None)
 
+    def __repr__(self):
+        info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
+        if self._read_fut is not None:
+            ov = "pending" if self._read_fut.ov.pending else "completed"
+            info.append('read=%s' % ov)
+        if self._write_fut is not None:
+            if self._write_fut.ov.pending:
+                info.append("write=pending=%s" % self._pending_write)
+            else:
+                info.append("write=completed")
+        if self._buffer:
+            bufsize = len(self._buffer)
+            info.append('write_bufsize=%s' % bufsize)
+        if self._eof_written:
+            info.append('EOF written')
+        return '<%s>' % ' '.join(info)
+
     def _set_extra(self, sock):
         self._extra['pipe'] = sock
 
@@ -55,7 +72,10 @@
             self._read_fut.cancel()
 
     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
-        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+            if self._loop.get_debug():
+                logger.debug("%r: %s", self, message, exc_info=True)
+        else:
             self._loop.call_exception_handler({
                 'message': message,
                 'exception': exc,
@@ -108,7 +128,6 @@
     def __init__(self, loop, sock, protocol, waiter=None,
                  extra=None, server=None):
         super().__init__(loop, sock, protocol, waiter, extra, server)
-        self._read_fut = None
         self._paused = False
         self._loop.call_soon(self._loop_reading)
 
@@ -118,6 +137,8 @@
         if self._paused:
             raise RuntimeError('Already paused')
         self._paused = True
+        if self._loop.get_debug():
+            logger.debug("%r pauses reading", self)
 
     def resume_reading(self):
         if not self._paused:
@@ -126,6 +147,8 @@
         if self._closing:
             return
         self._loop.call_soon(self._loop_reading, self._read_fut)
+        if self._loop.get_debug():
+            logger.debug("%r resumes reading", self)
 
     def _loop_reading(self, fut=None):
         if self._paused:
@@ -166,6 +189,8 @@
             if data:
                 self._protocol.data_received(data)
             elif data is not None:
+                if self._loop.get_debug():
+                    logger.debug("%r received EOF", self)
                 keep_open = self._protocol.eof_received()
                 if not keep_open:
                     self.close()
@@ -401,7 +426,9 @@
         self._ssock.setblocking(False)
         self._csock.setblocking(False)
         self._internal_fds += 1
-        self.call_soon(self._loop_self_reading)
+        # don't check the current loop because _make_self_pipe() is called
+        # from the event loop constructor
+        self._call_soon(self._loop_self_reading, (), check_loop=False)
 
     def _loop_self_reading(self, f=None):
         try:
@@ -426,6 +453,9 @@
             try:
                 if f is not None:
                     conn, addr = f.result()
+                    if self._debug:
+                        logger.debug("%r got a new connection from %r: %r",
+                                     server, addr, conn)
                     protocol = protocol_factory()
                     self._make_socket_transport(
                         conn, protocol,
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -23,6 +23,17 @@
 from .log import logger
 
 
+def _test_selector_event(selector, fd, event):
+    # Test if the selector is monitoring 'event' events
+    # for the file descriptor 'fd'.
+    try:
+        key = selector.get_key(fd)
+    except KeyError:
+        return False
+    else:
+        return bool(key.events & event)
+
+
 class BaseSelectorEventLoop(base_events.BaseEventLoop):
     """Selector event loop.
 
@@ -116,6 +127,9 @@
                            sslcontext=None, server=None):
         try:
             conn, addr = sock.accept()
+            if self._debug:
+                logger.debug("%r got a new connection from %r: %r",
+                             server, addr, conn)
             conn.setblocking(False)
         except (BlockingIOError, InterruptedError, ConnectionAbortedError):
             pass  # False alarm.
@@ -419,6 +433,26 @@
         if self._server is not None:
             self._server._attach()
 
+    def __repr__(self):
+        info = [self.__class__.__name__, 'fd=%s' % self._sock_fd]
+        polling = _test_selector_event(self._loop._selector,
+                                       self._sock_fd, selectors.EVENT_READ)
+        if polling:
+            info.append('read=polling')
+        else:
+            info.append('read=idle')
+
+        polling = _test_selector_event(self._loop._selector,
+                                       self._sock_fd, selectors.EVENT_WRITE)
+        if polling:
+            state = 'polling'
+        else:
+            state = 'idle'
+
+        bufsize = self.get_write_buffer_size()
+        info.append('write=<%s, bufsize=%s>' % (state, bufsize))
+        return '<%s>' % ' '.join(info)
+
     def abort(self):
         self._force_close(None)
 
@@ -433,7 +467,10 @@
 
     def _fatal_error(self, exc, message='Fatal error on transport'):
         # Should be called from exception handler only.
-        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+            if self._loop.get_debug():
+                logger.debug("%r: %s", self, message, exc_info=True)
+        else:
             self._loop.call_exception_handler({
                 'message': message,
                 'exception': exc,
@@ -492,6 +529,8 @@
             raise RuntimeError('Already paused')
         self._paused = True
         self._loop.remove_reader(self._sock_fd)
+        if self._loop.get_debug():
+            logger.debug("%r pauses reading", self)
 
     def resume_reading(self):
         if not self._paused:
@@ -500,6 +539,8 @@
         if self._closing:
             return
         self._loop.add_reader(self._sock_fd, self._read_ready)
+        if self._loop.get_debug():
+            logger.debug("%r resumes reading", self)
 
     def _read_ready(self):
         try:
@@ -512,6 +553,8 @@
             if data:
                 self._protocol.data_received(data)
             else:
+                if self._loop.get_debug():
+                    logger.debug("%r received EOF", self)
                 keep_open = self._protocol.eof_received()
                 if keep_open:
                     # We're keeping the connection open so the
@@ -638,31 +681,37 @@
         # SSL-specific extra info.  (peercert is set later)
         self._extra.update(sslcontext=sslcontext)
 
-        self._on_handshake()
+        if self._loop.get_debug():
+            logger.debug("%r starts SSL handshake", self)
+            start_time = self._loop.time()
+        else:
+            start_time = None
+        self._on_handshake(start_time)
 
-    def _on_handshake(self):
+    def _on_handshake(self, start_time):
         try:
             self._sock.do_handshake()
         except ssl.SSLWantReadError:
-            self._loop.add_reader(self._sock_fd, self._on_handshake)
+            self._loop.add_reader(self._sock_fd,
+                                  self._on_handshake, start_time)
             return
         except ssl.SSLWantWriteError:
-            self._loop.add_writer(self._sock_fd, self._on_handshake)
+            self._loop.add_writer(self._sock_fd,
+                                  self._on_handshake, start_time)
             return
-        except Exception as exc:
+        except BaseException as exc:
+            if self._loop.get_debug():
+                logger.warning("%r: SSL handshake failed",
+                               self, exc_info=True)
             self._loop.remove_reader(self._sock_fd)
             self._loop.remove_writer(self._sock_fd)
             self._sock.close()
             if self._waiter is not None:
                 self._waiter.set_exception(exc)
-            return
-        except BaseException as exc:
-            self._loop.remove_reader(self._sock_fd)
-            self._loop.remove_writer(self._sock_fd)
-            self._sock.close()
-            if self._waiter is not None:
-                self._waiter.set_exception(exc)
-            raise
+            if isinstance(exc, Exception):
+                return
+            else:
+                raise
 
         self._loop.remove_reader(self._sock_fd)
         self._loop.remove_writer(self._sock_fd)
@@ -676,6 +725,10 @@
                 try:
                     ssl.match_hostname(peercert, self._server_hostname)
                 except Exception as exc:
+                    if self._loop.get_debug():
+                        logger.warning("%r: SSL handshake failed "
+                                       "on matching the hostname",
+                                       self, exc_info=True)
                     self._sock.close()
                     if self._waiter is not None:
                         self._waiter.set_exception(exc)
@@ -696,6 +749,10 @@
             self._loop.call_soon(self._waiter._set_result_unless_cancelled,
                                  None)
 
+        if self._loop.get_debug():
+            dt = self._loop.time() - start_time
+            logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
+
     def pause_reading(self):
         # XXX This is a bit icky, given the comment at the top of
         # _read_ready().  Is it possible to evoke a deadlock?  I don't
@@ -709,6 +766,8 @@
             raise RuntimeError('Already paused')
         self._paused = True
         self._loop.remove_reader(self._sock_fd)
+        if self._loop.get_debug():
+            logger.debug("%r pauses reading", self)
 
     def resume_reading(self):
         if not self._paused:
@@ -717,6 +776,8 @@
         if self._closing:
             return
         self._loop.add_reader(self._sock_fd, self._read_ready)
+        if self._loop.get_debug():
+            logger.debug("%r resumes reading", self)
 
     def _read_ready(self):
         if self._write_wants_read:
@@ -741,6 +802,8 @@
                 self._protocol.data_received(data)
             else:
                 try:
+                    if self._loop.get_debug():
+                        logger.debug("%r received EOF", self)
                     keep_open = self._protocol.eof_received()
                     if keep_open:
                         logger.warning('returning true from eof_received() '
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,6 +16,7 @@
 from . import constants
 from . import events
 from . import selector_events
+from . import selectors
 from . import transports
 from .coroutines import coroutine
 from .log import logger
@@ -272,6 +273,20 @@
             # wait until protocol.connection_made() has been called
             self._loop.call_soon(waiter._set_result_unless_cancelled, None)
 
+    def __repr__(self):
+        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
+        if self._pipe is not None:
+            polling = selector_events._test_selector_event(
+                          self._loop._selector,
+                          self._fileno, selectors.EVENT_READ)
+            if polling:
+                info.append('polling')
+            else:
+                info.append('idle')
+        else:
+            info.append('closed')
+        return '<%s>' % ' '.join(info)
+
     def _read_ready(self):
         try:
             data = os.read(self._fileno, self.max_size)
@@ -283,6 +298,8 @@
             if data:
                 self._protocol.data_received(data)
             else:
+                if self._loop.get_debug():
+                    logger.info("%r was closed by peer", self)
                 self._closing = True
                 self._loop.remove_reader(self._fileno)
                 self._loop.call_soon(self._protocol.eof_received)
@@ -357,11 +374,30 @@
             # wait until protocol.connection_made() has been called
             self._loop.call_soon(waiter._set_result_unless_cancelled, None)
 
+    def __repr__(self):
+        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
+        if self._pipe is not None:
+            polling = selector_events._test_selector_event(
+                          self._loop._selector,
+                          self._fileno, selectors.EVENT_WRITE)
+            if polling:
+                info.append('polling')
+            else:
+                info.append('idle')
+
+            bufsize = self.get_write_buffer_size()
+            info.append('bufsize=%s' % bufsize)
+        else:
+            info.append('closed')
+        return '<%s>' % ' '.join(info)
+
     def get_write_buffer_size(self):
         return sum(len(data) for data in self._buffer)
 
     def _read_ready(self):
         # Pipe was closed by peer.
+        if self._loop.get_debug():
+            logger.info("%r was closed by peer", self)
         if self._buffer:
             self._close(BrokenPipeError())
         else:
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -40,6 +40,18 @@
         super().__init__(loop=loop)
         self.ov = ov
 
+    def __repr__(self):
+        info = [self._state.lower()]
+        if self.ov.pending:
+            info.append('overlapped=pending')
+        else:
+            info.append('overlapped=completed')
+        if self._state == futures._FINISHED:
+            info.append(self._format_result())
+        if self._callbacks:
+            info.append(self._format_callbacks())
+        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
+
     def cancel(self):
         try:
             self.ov.cancel()
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -358,16 +358,17 @@
         self.loop = EventLoop(self.proactor)
         self.set_event_loop(self.loop, cleanup=False)
 
-    @mock.patch.object(BaseProactorEventLoop, 'call_soon')
+    @mock.patch.object(BaseProactorEventLoop, '_call_soon')
     @mock.patch.object(BaseProactorEventLoop, '_socketpair')
-    def test_ctor(self, socketpair, call_soon):
+    def test_ctor(self, socketpair, _call_soon):
         ssock, csock = socketpair.return_value = (
             mock.Mock(), mock.Mock())
         loop = BaseProactorEventLoop(self.proactor)
         self.assertIs(loop._ssock, ssock)
         self.assertIs(loop._csock, csock)
         self.assertEqual(loop._internal_fds, 1)
-        call_soon.assert_called_with(loop._loop_self_reading)
+        _call_soon.assert_called_with(loop._loop_self_reading, (),
+                                      check_loop=False)
 
     def test_close_self_pipe(self):
         self.loop._close_self_pipe()
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -1092,15 +1092,15 @@
         self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
         transport = _SelectorSslTransport(
             self.loop, self.sock, self.protocol, self.sslcontext)
-        transport._on_handshake()
-        self.loop.assert_reader(1, transport._on_handshake)
+        transport._on_handshake(None)
+        self.loop.assert_reader(1, transport._on_handshake, None)
 
     def test_on_handshake_writer_retry(self):
         self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
         transport = _SelectorSslTransport(
             self.loop, self.sock, self.protocol, self.sslcontext)
-        transport._on_handshake()
-        self.loop.assert_writer(1, transport._on_handshake)
+        transport._on_handshake(None)
+        self.loop.assert_writer(1, transport._on_handshake, None)
 
     def test_on_handshake_exc(self):
         exc = ValueError()
@@ -1108,7 +1108,7 @@
         transport = _SelectorSslTransport(
             self.loop, self.sock, self.protocol, self.sslcontext)
         transport._waiter = asyncio.Future(loop=self.loop)
-        transport._on_handshake()
+        transport._on_handshake(None)
         self.assertTrue(self.sslsock.close.called)
         self.assertTrue(transport._waiter.done())
         self.assertIs(exc, transport._waiter.exception())
@@ -1119,7 +1119,7 @@
         transport._waiter = asyncio.Future(loop=self.loop)
         exc = BaseException()
         self.sslsock.do_handshake.side_effect = exc
-        self.assertRaises(BaseException, transport._on_handshake)
+        self.assertRaises(BaseException, transport._on_handshake, None)
         self.assertTrue(self.sslsock.close.called)
         self.assertTrue(transport._waiter.done())
         self.assertIs(exc, transport._waiter.exception())

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list