[Python-checkins] bpo-36889: Document asyncio Stream and StreamServer (GH-14203)

Andrew Svetlov webhook-mailer at python.org
Mon Jun 24 14:17:06 EDT 2019


https://github.com/python/cpython/commit/6793cce155f8875b10efd746cb0b34cb72263af7
commit: 6793cce155f8875b10efd746cb0b34cb72263af7
branch: master
author: Xtreak <tir.karthi at gmail.com>
committer: Andrew Svetlov <andrew.svetlov at gmail.com>
date: 2019-06-24T21:16:58+03:00
summary:

bpo-36889: Document asyncio Stream and StreamServer (GH-14203)

files:
M Doc/library/asyncio-eventloop.rst
M Doc/library/asyncio-protocol.rst
M Doc/library/asyncio-stream.rst

diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index f763fd5f036d..8f7974be66ea 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -1625,8 +1625,7 @@ Wait until a file descriptor received some data using the
      :meth:`loop.create_connection` method.
 
    * Another similar :ref:`example <asyncio_example_create_connection-streams>`
-     using the high-level :func:`asyncio.open_connection` function
-     and streams.
+     using the high-level :func:`asyncio.connect` function and streams.
 
 
 .. _asyncio_example_unix_signals:
diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst
index f08738dd62bb..3e5a4dd8b891 100644
--- a/Doc/library/asyncio-protocol.rst
+++ b/Doc/library/asyncio-protocol.rst
@@ -810,7 +810,7 @@ data, and waits until the connection is closed::
 .. seealso::
 
    The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
-   example uses the high-level :func:`asyncio.open_connection` function.
+   example uses the high-level :func:`asyncio.connect` function.
 
 
 .. _asyncio-udp-echo-server-protocol:
@@ -977,7 +977,7 @@ Wait until a socket receives data using the
 
    The :ref:`register an open socket to wait for data using streams
    <asyncio_example_create_connection-streams>` example uses high-level streams
-   created by the :func:`open_connection` function in a coroutine.
+   created by the :func:`asyncio.connect` function in a coroutine.
 
 .. _asyncio_example_subprocess_proto:
 
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index 28ca5d5f3396..dfe520de56bf 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -18,17 +18,12 @@ streams::
     import asyncio
 
     async def tcp_echo_client(message):
-        reader, writer = await asyncio.open_connection(
-            '127.0.0.1', 8888)
+        async with asyncio.connect('127.0.0.1', 8888) as stream:
+            print(f'Send: {message!r}')
+            await stream.write(message.encode())
 
-        print(f'Send: {message!r}')
-        await writer.write(message.encode())
-
-        data = await reader.read(100)
-        print(f'Received: {data.decode()!r}')
-
-        print('Close the connection')
-        await writer.close()
+            data = await stream.read(100)
+            print(f'Received: {data.decode()!r}')
 
     asyncio.run(tcp_echo_client('Hello World!'))
 
@@ -42,6 +37,32 @@ The following top-level asyncio functions can be used to create
 and work with streams:
 
 
+.. coroutinefunction:: connect(host=None, port=None, \*, \
+                               limit=2**16, ssl=None, family=0, \
+                               proto=0, flags=0, sock=None, local_addr=None, \
+                               server_hostname=None, ssl_handshake_timeout=None, \
+                               happy_eyeballs_delay=None, interleave=None)
+
+   Connect to a TCP socket at the *host*:*port* address and return a
+   :class:`Stream` object of mode :attr:`StreamMode.READWRITE`.
+
+
+   *limit* determines the buffer size limit used by the returned :class:`Stream`
+   instance. By default the *limit* is set to 64 KiB.
+
+   The rest of the arguments are passed directly to :meth:`loop.create_connection`.
+
+   The function can be used with ``await`` to get a connected stream::
+
+       stream = await asyncio.connect('127.0.0.1', 8888)
+
+   The function can also be used as an async context manager::
+
+       async with asyncio.connect('127.0.0.1', 8888) as stream:
+           ...
+
+   .. versionadded:: 3.8
+
 .. coroutinefunction:: open_connection(host=None, port=None, \*, \
                           loop=None, limit=None, ssl=None, family=0, \
                           proto=0, flags=0, sock=None, local_addr=None, \
@@ -69,10 +90,10 @@ and work with streams:
 
    .. deprecated-removed:: 3.8 3.10
 
-      `open_connection()` is deprecated in favor of `connect()`.
+      ``open_connection()`` is deprecated in favor of :func:`connect`.
 
 .. coroutinefunction:: start_server(client_connected_cb, host=None, \
-                          port=None, \*, loop=None, limit=None, \
+                          port=None, \*, loop=None, limit=2**16, \
                           family=socket.AF_UNSPEC, \
                           flags=socket.AI_PASSIVE, sock=None, \
                           backlog=100, ssl=None, reuse_address=None, \
@@ -106,11 +127,58 @@ and work with streams:
 
    .. deprecated-removed:: 3.8 3.10
 
-      `start_server()` is deprecated if favor of `StreamServer()`
+      ``start_server()`` is deprecated in favor of :class:`StreamServer`.
+
+.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
+
+   Take a :term:`file-like object <file object>` *pipe* and return a
+   :class:`Stream` object of the mode :attr:`StreamMode.READ` that has an
+   API similar to that of :class:`StreamReader`. It can also be used as an async context manager.
+
+   *limit* determines the buffer size limit used by the returned :class:`Stream`
+   instance. By default the limit is set to 64 KiB.
 
+   .. versionadded:: 3.8
+
+.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
+
+   Take a :term:`file-like object <file object>` *pipe* and return a
+   :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has an
+   API similar to that of :class:`StreamWriter`. It can also be used as an async context manager.
+
+   *limit* determines the buffer size limit used by the returned :class:`Stream`
+   instance. By default the limit is set to 64 KiB.
+
+   .. versionadded:: 3.8
 
 .. rubric:: Unix Sockets
 
+.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
+                           sock=None, server_hostname=None, \
+                           ssl_handshake_timeout=None)
+
+   Establish a Unix socket connection to the socket at the *path* address and
+   return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
+   that can be used as a reader and a writer.
+
+   *limit* determines the buffer size limit used by the returned :class:`Stream`
+   instance. By default the *limit* is set to 64 KiB.
+
+   The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
+
+   The function can be used with ``await`` to get a connected stream::
+
+       stream = await asyncio.connect_unix('/tmp/example.sock')
+
+   The function can also be used as an async context manager::
+
+       async with asyncio.connect_unix('/tmp/example.sock') as stream:
+           ...
+
+   .. availability:: Unix.
+
+   .. versionadded:: 3.8
+
 .. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
                         limit=None, ssl=None, sock=None, \
                         server_hostname=None, ssl_handshake_timeout=None)
@@ -134,7 +202,7 @@ and work with streams:
 
    .. deprecated-removed:: 3.8 3.10
 
-      `open_unix_connection()` is deprecated if favor of `connect_unix()`.
+      ``open_unix_connection()`` is deprecated in favor of :func:`connect_unix`.
 
 
 .. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
@@ -160,11 +228,176 @@ and work with streams:
 
    .. deprecated-removed:: 3.8 3.10
 
-      `start_unix_server()` is deprecated in favor of `UnixStreamServer()`.
+      ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
 
 
 ---------
 
+StreamServer
+============
+
+.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
+                        limit=2**16, family=socket.AF_UNSPEC, \
+                        flags=socket.AI_PASSIVE, sock=None, backlog=100, \
+                        ssl=None, reuse_address=None, reuse_port=None, \
+                        ssl_handshake_timeout=None, shutdown_timeout=60)
+
+   The *client_connected_cb* callback is called whenever a new client
+   connection is established.  It receives a :class:`Stream` object of the
+   mode :attr:`StreamMode.READWRITE`.
+
+   *client_connected_cb* can be a plain callable or a
+   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
+   it will be automatically scheduled as a :class:`Task`.
+
+   *limit* determines the buffer size limit used by the
+   returned :class:`Stream` instance.  By default the *limit*
+   is set to 64 KiB.
+
+   The rest of the arguments are passed directly to
+   :meth:`loop.create_server`.
+
+   .. coroutinemethod:: start_serving()
+
+      Binds to the given host and port to start the server.
+
+   .. coroutinemethod:: serve_forever()
+
+      Start accepting connections until the coroutine is cancelled.
+      Cancellation of the ``serve_forever`` task causes the server
+      to be closed.
+
+      This method can be called if the server is already accepting
+      connections.  Only one ``serve_forever`` task can exist per
+      *StreamServer* object.
+
+   .. method:: is_serving()
+
+      Returns ``True`` if the server is bound and currently serving.
+
+   .. method:: bind()
+
+      Bind the server to the given *host* and *port*. This method is
+      automatically called during ``__aenter__`` when :class:`StreamServer` is
+      used as an async context manager.
+
+   .. method:: is_bound()
+
+      Return ``True`` if the server is bound.
+
+   .. coroutinemethod:: abort()
+
+      Closes the connection and cancels all pending tasks.
+
+   .. coroutinemethod:: close()
+
+      Closes the connection. This method is automatically called during
+      ``__aexit__`` when :class:`StreamServer` is used as an async context
+      manager.
+
+   .. attribute:: sockets
+
+      Returns a tuple of socket objects the server is bound to.
+
+   .. versionadded:: 3.8
+
+
+UnixStreamServer
+================
+
+.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
+                            limit=2**16, sock=None, backlog=100, \
+                            ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
+
+   The *client_connected_cb* callback is called whenever a new client
+   connection is established.  It receives a :class:`Stream` object of the
+   mode :attr:`StreamMode.READWRITE`.
+
+   *client_connected_cb* can be a plain callable or a
+   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
+   it will be automatically scheduled as a :class:`Task`.
+
+   *limit* determines the buffer size limit used by the
+   returned :class:`Stream` instance.  By default the *limit*
+   is set to 64 KiB.
+
+   The rest of the arguments are passed directly to
+   :meth:`loop.create_unix_server`.
+
+   .. coroutinemethod:: start_serving()
+
+      Binds to the given *path* to start the server.
+
+   .. method:: is_serving()
+
+      Returns ``True`` if the server is bound and currently serving.
+
+   .. method:: bind()
+
+      Bind the server to the given *path*. This method is
+      automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
+      used as an async context manager.
+
+   .. method:: is_bound()
+
+      Return ``True`` if the server is bound.
+
+   .. coroutinemethod:: abort()
+
+      Closes the connection and cancels all pending tasks.
+
+   .. coroutinemethod:: close()
+
+      Closes the connection. This method is automatically called during
+      ``__aexit__`` when :class:`UnixStreamServer` is used as an async context
+      manager.
+
+   .. attribute:: sockets
+
+      Returns a tuple of socket objects the server is bound to.
+
+   .. availability:: Unix.
+
+   .. versionadded:: 3.8
+
+Stream
+======
+
+.. class:: Stream
+
+   Represents a stream object that provides APIs to read data from and
+   write data to the IO stream. It includes the API provided by
+   :class:`StreamReader` and :class:`StreamWriter`.
+
+   Do not instantiate *Stream* objects directly; use APIs like :func:`connect`
+   and :class:`StreamServer` instead.
+
+   .. versionadded:: 3.8
+
+
+StreamMode
+==========
+
+.. class:: StreamMode
+
+   A subclass of :class:`enum.Flag` that defines a set of values that can be
+   used to determine the ``mode`` of :class:`Stream` objects.
+
+   .. data:: READ
+
+      The stream object is readable and provides the API of :class:`StreamReader`.
+
+   .. data:: WRITE
+
+      The stream object is writeable and provides the API of :class:`StreamWriter`.
+
+   .. data:: READWRITE
+
+      The stream object is readable and writeable and provides the API of both
+      :class:`StreamReader` and :class:`StreamWriter`.
+
+   .. versionadded:: 3.8
+
 
 StreamReader
 ============
@@ -366,22 +599,17 @@ Examples
 TCP echo client using streams
 -----------------------------
 
-TCP echo client using the :func:`asyncio.open_connection` function::
+TCP echo client using the :func:`asyncio.connect` function::
 
     import asyncio
 
     async def tcp_echo_client(message):
-        reader, writer = await asyncio.open_connection(
-            '127.0.0.1', 8888)
-
-        print(f'Send: {message!r}')
-        writer.write(message.encode())
+        async with asyncio.connect('127.0.0.1', 8888) as stream:
+            print(f'Send: {message!r}')
+            await stream.write(message.encode())
 
-        data = await reader.read(100)
-        print(f'Received: {data.decode()!r}')
-
-        print('Close the connection')
-        writer.close()
+            data = await stream.read(100)
+            print(f'Received: {data.decode()!r}')
 
     asyncio.run(tcp_echo_client('Hello World!'))
 
@@ -397,32 +625,28 @@ TCP echo client using the :func:`asyncio.open_connection` function::
 TCP echo server using streams
 -----------------------------
 
-TCP echo server using the :func:`asyncio.start_server` function::
+TCP echo server using the :class:`asyncio.StreamServer` class::
 
     import asyncio
 
-    async def handle_echo(reader, writer):
-        data = await reader.read(100)
+    async def handle_echo(stream):
+        data = await stream.read(100)
         message = data.decode()
-        addr = writer.get_extra_info('peername')
+        addr = stream.get_extra_info('peername')
 
         print(f"Received {message!r} from {addr!r}")
 
         print(f"Send: {message!r}")
-        writer.write(data)
-        await writer.drain()
+        await stream.write(data)
 
         print("Close the connection")
-        writer.close()
+        await stream.close()
 
     async def main():
-        server = await asyncio.start_server(
-            handle_echo, '127.0.0.1', 8888)
-
-        addr = server.sockets[0].getsockname()
-        print(f'Serving on {addr}')
-
-        async with server:
+        async with asyncio.StreamServer(
+                handle_echo, '127.0.0.1', 8888) as server:
+            addr = server.sockets[0].getsockname()
+            print(f'Serving on {addr}')
             await server.serve_forever()
 
     asyncio.run(main())
@@ -446,11 +670,9 @@ Simple example querying HTTP headers of the URL passed on the command line::
     async def print_http_headers(url):
         url = urllib.parse.urlsplit(url)
         if url.scheme == 'https':
-            reader, writer = await asyncio.open_connection(
-                url.hostname, 443, ssl=True)
+            stream = await asyncio.connect(url.hostname, 443, ssl=True)
         else:
-            reader, writer = await asyncio.open_connection(
-                url.hostname, 80)
+            stream = await asyncio.connect(url.hostname, 80)
 
         query = (
             f"HEAD {url.path or '/'} HTTP/1.0\r\n"
@@ -458,18 +680,14 @@ Simple example querying HTTP headers of the URL passed on the command line::
             f"\r\n"
         )
 
-        writer.write(query.encode('latin-1'))
-        while True:
-            line = await reader.readline()
-            if not line:
-                break
-
+        await stream.write(query.encode('latin-1'))
+        while (line := await stream.readline()):
             line = line.decode('latin1').rstrip()
             if line:
                 print(f'HTTP header> {line}')
 
         # Ignore the body, close the socket
-        writer.close()
+        await stream.close()
 
     url = sys.argv[1]
     asyncio.run(print_http_headers(url))
@@ -490,7 +708,7 @@ Register an open socket to wait for data using streams
 ------------------------------------------------------
 
 Coroutine waiting until a socket receives data using the
-:func:`open_connection` function::
+:func:`asyncio.connect` function::
 
     import asyncio
     import socket
@@ -504,17 +722,15 @@ Coroutine waiting until a socket receives data using the
         rsock, wsock = socket.socketpair()
 
         # Register the open socket to wait for data.
-        reader, writer = await asyncio.open_connection(sock=rsock)
-
-        # Simulate the reception of data from the network
-        loop.call_soon(wsock.send, 'abc'.encode())
+        async with asyncio.connect(sock=rsock) as stream:
+            # Simulate the reception of data from the network
+            loop.call_soon(wsock.send, 'abc'.encode())
 
-        # Wait for data
-        data = await reader.read(100)
+            # Wait for data
+            data = await stream.read(100)
 
-        # Got data, we are done: close the socket
-        print("Received:", data.decode())
-        writer.close()
+            # Got data, we are done: close the socket
+            print("Received:", data.decode())
 
         # Close the second socket
         wsock.close()


