[Python-checkins] bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482) (#16485)

Yury Selivanov webhook-mailer at python.org
Mon Sep 30 01:30:21 EDT 2019


https://github.com/python/cpython/commit/1c19d656a79a00f58361ceb61c0a6d1faf90c686
commit: 1c19d656a79a00f58361ceb61c0a6d1faf90c686
branch: 3.8
author: Yury Selivanov <yury at edgedb.com>
committer: GitHub <noreply at github.com>
date: 2019-09-29T22:30:17-07:00
summary:

bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482) (#16485)

See https://bugs.python.org/issue38242 for more details

files:
A Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst
M Doc/library/asyncio-api-index.rst
M Doc/library/asyncio-eventloop.rst
M Doc/library/asyncio-protocol.rst
M Doc/library/asyncio-stream.rst
M Lib/asyncio/__init__.py
M Lib/asyncio/streams.py
M Lib/asyncio/subprocess.py
M Lib/test/test_asyncio/test_buffered_proto.py
M Lib/test/test_asyncio/test_pep492.py
M Lib/test/test_asyncio/test_streams.py
M Lib/test/test_asyncio/test_subprocess.py
M Lib/test/test_asyncio/test_windows_events.py

diff --git a/Doc/library/asyncio-api-index.rst b/Doc/library/asyncio-api-index.rst
index 716cf09dc99e..d5b5659abc65 100644
--- a/Doc/library/asyncio-api-index.rst
+++ b/Doc/library/asyncio-api-index.rst
@@ -132,47 +132,23 @@ High-level APIs to work with network IO.
     :widths: 50 50
     :class: full-width-table
 
-    * - ``await`` :func:`connect`
-      -  Establish a TCP connection to send and receive data.
-
     * - ``await`` :func:`open_connection`
-      -  Establish a TCP connection. (Deprecated in favor of :func:`connect`)
-
-    * - ``await`` :func:`connect_unix`
-      -  Establish a Unix socket connection to send and receive data.
+      -  Establish a TCP connection.
 
     * - ``await`` :func:`open_unix_connection`
-      -  Establish a Unix socket connection. (Deprecated in favor of :func:`connect_unix`)
-
-    * - :class:`StreamServer`
-      - Start a TCP server.
+      -  Establish a Unix socket connection.
 
     * - ``await`` :func:`start_server`
-      - Start a TCP server. (Deprecated in favor of :class:`StreamServer`)
-
-    * - :class:`UnixStreamServer`
-      - Start a Unix socket server.
+      - Start a TCP server.
 
     * - ``await`` :func:`start_unix_server`
-      - Start a Unix socket server. (Deprecated in favor of :class:`UnixStreamServer`)
-
-    * - :func:`connect_read_pipe`
-      - Establish a connection to :term:`file-like object <file object>` *pipe*
-        to receive data.
-
-    * - :func:`connect_write_pipe`
-      - Establish a connection to :term:`file-like object <file object>` *pipe*
-        to send data.
-
-    * - :class:`Stream`
-      - Stream is a single object combining APIs of :class:`StreamReader` and
-        :class:`StreamWriter`.
+      - Start a Unix socket server.
 
     * - :class:`StreamReader`
-      - High-level async/await object to receive network data. (Deprecated in favor of :class:`Stream`)
+      - High-level async/await object to receive network data.
 
     * - :class:`StreamWriter`
-      - High-level async/await object to send network data. (Deprecated in favor of :class:`Stream`)
+      - High-level async/await object to send network data.
 
 
 .. rubric:: Examples
diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index 8f7974be66ea..f763fd5f036d 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -1625,7 +1625,8 @@ Wait until a file descriptor received some data using the
      :meth:`loop.create_connection` method.
 
    * Another similar :ref:`example <asyncio_example_create_connection-streams>`
-     using the high-level :func:`asyncio.connect` function and streams.
+     using the high-level :func:`asyncio.open_connection` function
+     and streams.
 
 
 .. _asyncio_example_unix_signals:
diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst
index cb0317ea20c1..67ca12108168 100644
--- a/Doc/library/asyncio-protocol.rst
+++ b/Doc/library/asyncio-protocol.rst
@@ -809,7 +809,7 @@ data, and waits until the connection is closed::
 .. seealso::
 
    The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
-   example uses the high-level :func:`asyncio.connect` function.
+   example uses the high-level :func:`asyncio.open_connection` function.
 
 
 .. _asyncio-udp-echo-server-protocol:
@@ -978,7 +978,7 @@ Wait until a socket receives data using the
 
    The :ref:`register an open socket to wait for data using streams
    <asyncio_example_create_connection-streams>` example uses high-level streams
-   created by the :func:`asyncio.connect` function in a coroutine.
+   created by the :func:`open_connection` function in a coroutine.
 
 .. _asyncio_example_subprocess_proto:
 
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index feebd227eb8c..471e6e9099c2 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -18,12 +18,19 @@ streams::
     import asyncio
 
     async def tcp_echo_client(message):
-        async with asyncio.connect('127.0.0.1', 8888) as stream:
-            print(f'Send: {message!r}')
-            await stream.write(message.encode())
+        reader, writer = await asyncio.open_connection(
+            '127.0.0.1', 8888)
 
-            data = await stream.read(100)
-            print(f'Received: {data.decode()!r}')
+        print(f'Send: {message!r}')
+        writer.write(message.encode())
+        await writer.drain()
+
+        data = await reader.read(100)
+        print(f'Received: {data.decode()!r}')
+
+        print('Close the connection')
+        writer.close()
+        await writer.wait_closed()
 
     asyncio.run(tcp_echo_client('Hello World!'))
 
@@ -37,31 +44,6 @@ The following top-level asyncio functions can be used to create
 and work with streams:
 
 
-.. coroutinefunction:: connect(host=None, port=None, \*, \
-                               limit=2**16, ssl=None, family=0, \
-                               proto=0, flags=0, sock=None, local_addr=None, \
-                               server_hostname=None, ssl_handshake_timeout=None, \
-                               happy_eyeballs_delay=None, interleave=None)
-
-   Connect to TCP socket on *host* : *port* address and return a :class:`Stream`
-   object of mode :attr:`StreamMode.READWRITE`.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the *limit* is set to 64 KiB.
-
-   The rest of the arguments are passed directly to :meth:`loop.create_connection`.
-
-   The function can be used with ``await`` to get a connected stream::
-
-       stream = await asyncio.connect('127.0.0.1', 8888)
-
-   The function can also be used as an async context manager::
-
-       async with asyncio.connect('127.0.0.1', 8888) as stream:
-           ...
-
-   .. versionadded:: 3.8
-
 .. coroutinefunction:: open_connection(host=None, port=None, \*, \
                           loop=None, limit=None, ssl=None, family=0, \
                           proto=0, flags=0, sock=None, local_addr=None, \
@@ -87,12 +69,8 @@ and work with streams:
 
       The *ssl_handshake_timeout* parameter.
 
-   .. deprecated-removed:: 3.8 3.10
-
-      `open_connection()` is deprecated in favor of :func:`connect`.
-
 .. coroutinefunction:: start_server(client_connected_cb, host=None, \
-                          port=None, \*, loop=None, limit=2**16, \
+                          port=None, \*, loop=None, limit=None, \
                           family=socket.AF_UNSPEC, \
                           flags=socket.AI_PASSIVE, sock=None, \
                           backlog=100, ssl=None, reuse_address=None, \
@@ -124,60 +102,9 @@ and work with streams:
 
       The *ssl_handshake_timeout* and *start_serving* parameters.
 
-   .. deprecated-removed:: 3.8 3.10
-
-      `start_server()` is deprecated if favor of :class:`StreamServer`
-
-.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
-
-   Takes a :term:`file-like object <file object>` *pipe* to return a
-   :class:`Stream` object of the mode :attr:`StreamMode.READ` that has
-   similar API of :class:`StreamReader`. It can also be used as an async context manager.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the limit is set to 64 KiB.
-
-   .. versionadded:: 3.8
-
-.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
-
-   Takes a :term:`file-like object <file object>` *pipe* to return a
-   :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has
-   similar API of :class:`StreamWriter`. It can also be used as an async context manager.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the limit is set to 64 KiB.
-
-   .. versionadded:: 3.8
 
 .. rubric:: Unix Sockets
 
-.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
-                           sock=None, server_hostname=None, \
-                           ssl_handshake_timeout=None)
-
-   Establish a Unix socket connection to socket with *path* address and
-   return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
-   that can be used as a reader and a writer.
-
-   *limit* determines the buffer size limit used by the returned :class:`Stream`
-   instance. By default the *limit* is set to 64 KiB.
-
-   The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
-
-   The function can be used with ``await`` to get a connected stream::
-
-       stream = await asyncio.connect_unix('/tmp/example.sock')
-
-   The function can also be used as an async context manager::
-
-       async with asyncio.connect_unix('/tmp/example.sock') as stream:
-           ...
-
-   .. availability:: Unix.
-
-   .. versionadded:: 3.8
-
 .. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
                         limit=None, ssl=None, sock=None, \
                         server_hostname=None, ssl_handshake_timeout=None)
@@ -199,10 +126,6 @@ and work with streams:
 
       The *path* parameter can now be a :term:`path-like object`
 
-   .. deprecated-removed:: 3.8 3.10
-
-      ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`.
-
 
 .. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
                           \*, loop=None, limit=None, sock=None, \
@@ -225,349 +148,6 @@ and work with streams:
 
       The *path* parameter can now be a :term:`path-like object`.
 
-   .. deprecated-removed:: 3.8 3.10
-
-      ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
-
-
----------
-
-StreamServer
-============
-
-.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
-                        limit=2**16, family=socket.AF_UNSPEC, \
-                        flags=socket.AI_PASSIVE, sock=None, backlog=100, \
-                        ssl=None, reuse_address=None, reuse_port=None, \
-                        ssl_handshake_timeout=None, shutdown_timeout=60)
-
-   The *client_connected_cb* callback is called whenever a new client
-   connection is established.  It receives a :class:`Stream` object of the
-   mode :attr:`StreamMode.READWRITE`.
-
-   *client_connected_cb* can be a plain callable or a
-   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
-   it will be automatically scheduled as a :class:`Task`.
-
-   *limit* determines the buffer size limit used by the
-   returned :class:`Stream` instance.  By default the *limit*
-   is set to 64 KiB.
-
-   The rest of the arguments are passed directly to
-   :meth:`loop.create_server`.
-
-   .. coroutinemethod:: start_serving()
-
-      Binds to the given host and port to start the server.
-
-   .. coroutinemethod:: serve_forever()
-
-      Start accepting connections until the coroutine is cancelled.
-      Cancellation of ``serve_forever`` task causes the server
-      to be closed.
-
-      This method can be called if the server is already accepting
-      connections.  Only one ``serve_forever`` task can exist per
-      one *Server* object.
-
-   .. method:: is_serving()
-
-      Returns ``True`` if the server is bound and currently serving.
-
-   .. method:: bind()
-
-      Bind the server to the given *host* and *port*. This method is
-      automatically called during ``__aenter__`` when :class:`StreamServer` is
-      used as an async context manager.
-
-   .. method:: is_bound()
-
-      Return ``True`` if the server is bound.
-
-   .. coroutinemethod:: abort()
-
-      Closes the connection and cancels all pending tasks.
-
-   .. coroutinemethod:: close()
-
-      Closes the connection. This method is automatically called during
-      ``__aexit__`` when :class:`StreamServer` is used as an async context
-      manager.
-
-   .. attribute:: sockets
-
-      Returns a tuple of socket objects the server is bound to.
-
-   .. versionadded:: 3.8
-
-
-UnixStreamServer
-================
-
-.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
-                            limit=2**16, sock=None, backlog=100, \
-                            ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
-
-   The *client_connected_cb* callback is called whenever a new client
-   connection is established.  It receives a :class:`Stream` object of the
-   mode :attr:`StreamMode.READWRITE`.
-
-   *client_connected_cb* can be a plain callable or a
-   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
-   it will be automatically scheduled as a :class:`Task`.
-
-   *limit* determines the buffer size limit used by the
-   returned :class:`Stream` instance.  By default the *limit*
-   is set to 64 KiB.
-
-   The rest of the arguments are passed directly to
-   :meth:`loop.create_unix_server`.
-
-   .. coroutinemethod:: start_serving()
-
-      Binds to the given host and port to start the server.
-
-   .. method:: is_serving()
-
-      Returns ``True`` if the server is bound and currently serving.
-
-   .. method:: bind()
-
-      Bind the server to the given *host* and *port*. This method is
-      automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
-      used as an async context manager.
-
-   .. method:: is_bound()
-
-      Return ``True`` if the server is bound.
-
-   .. coroutinemethod:: abort()
-
-      Closes the connection and cancels all pending tasks.
-
-   .. coroutinemethod:: close()
-
-      Closes the connection. This method is automatically called during
-      ``__aexit__`` when :class:`UnixStreamServer` is used as an async context
-      manager.
-
-   .. attribute:: sockets
-
-      Returns a tuple of socket objects the server is bound to.
-
-   .. availability:: Unix.
-
-   .. versionadded:: 3.8
-
-Stream
-======
-
-.. class:: Stream
-
-   Represents a Stream object that provides APIs to read and write data
-   to the IO stream . It includes the API provided by :class:`StreamReader`
-   and :class:`StreamWriter`. It can also be used as :term:`asynchronous iterator`
-   where :meth:`readline` is used. It raises :exc:`StopAsyncIteration` when
-   :meth:`readline` returns empty data.
-
-   Do not instantiate *Stream* objects directly; use API like :func:`connect`
-   and :class:`StreamServer` instead.
-
-   .. versionadded:: 3.8
-
-   .. attribute:: mode
-
-      Returns the mode of the stream which is a :class:`StreamMode` value. It could
-      be one of the below:
-
-      * :attr:`StreamMode.READ` - Connection can receive data.
-      * :attr:`StreamMode.WRITE` - Connection can send data.
-      * :attr:`StreamMode.READWRITE` - Connection can send and receive data.
-
-   .. coroutinemethod:: abort()
-
-      Aborts the connection immediately, without waiting for the send buffer to drain.
-
-   .. method:: at_eof()
-
-      Return ``True`` if the buffer is empty.
-
-   .. method:: can_write_eof()
-
-      Return *True* if the underlying transport supports
-      the :meth:`write_eof` method, *False* otherwise.
-
-   .. method:: close()
-
-      The method closes the stream and the underlying socket.
-
-      It is possible to directly await on the `close()` method::
-
-         await stream.close()
-
-      The ``await`` pauses the current coroutine until the stream and the underlying
-      socket are closed (and SSL shutdown is performed for a secure connection).
-
-   .. coroutinemethod:: drain()
-
-      Wait until it is appropriate to resume writing to the stream.
-      Example::
-
-          stream.write(data)
-          await stream.drain()
-
-      This is a flow control method that interacts with the underlying
-      IO write buffer.  When the size of the buffer reaches
-      the high watermark, *drain()* blocks until the size of the
-      buffer is drained down to the low watermark and writing can
-      be resumed.  When there is nothing to wait for, the :meth:`drain`
-      returns immediately.
-
-      .. deprecated:: 3.8
-
-      It is recommended to directly await on the `write()` method instead::
-
-         await stream.write(data)
-
-   .. method:: get_extra_info(name, default=None)
-
-      Access optional transport information; see
-      :meth:`BaseTransport.get_extra_info` for details.
-
-   .. method:: is_closing()
-
-      Return ``True`` if the stream is closed or in the process of
-      being closed.
-
-   .. coroutinemethod:: read(n=-1)
-
-      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
-      read until EOF and return all read bytes.
-
-      If EOF was received and the internal buffer is empty,
-      return an empty ``bytes`` object.
-
-   .. coroutinemethod:: readexactly(n)
-
-      Read exactly *n* bytes.
-
-      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
-      can be read.  Use the :attr:`IncompleteReadError.partial`
-      attribute to get the partially read data.
-
-   .. coroutinemethod:: readline()
-
-      Read one line, where "line" is a sequence of bytes
-      ending with ``\n``.
-
-      If EOF is received and ``\n`` was not found, the method
-      returns partially read data.
-
-      If EOF is received and the internal buffer is empty,
-      return an empty ``bytes`` object.
-
-   .. coroutinemethod:: readuntil(separator=b'\\n')
-
-      Read data from the stream until *separator* is found.
-
-      On success, the data and separator will be removed from the
-      internal buffer (consumed). Returned data will include the
-      separator at the end.
-
-      If the amount of data read exceeds the configured stream limit, a
-      :exc:`LimitOverrunError` exception is raised, and the data
-      is left in the internal buffer and can be read again.
-
-      If EOF is reached before the complete separator is found,
-      an :exc:`IncompleteReadError` exception is raised, and the internal
-      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
-      may contain a portion of the separator.
-
-   .. coroutinemethod:: sendfile(file, offset=0, count=None, *, fallback=True)
-
-      Sends a *file* over the stream using an optimized syscall if available.
-
-      For other parameters meaning please see :meth:`AbstractEventloop.sendfile`.
-
-   .. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \
-                                  ssl_handshake_timeout=None)
-
-      Upgrades the existing transport-based connection to TLS.
-
-      For other parameters meaning please see :meth:`AbstractEventloop.start_tls`.
-
-   .. coroutinemethod:: wait_closed()
-
-      Wait until the stream is closed.
-
-      Should be called after :meth:`close` to wait until the underlying
-      connection is closed.
-
-   .. coroutinemethod:: write(data)
-
-      Write *data* to the underlying socket; wait until the data is sent, e.g.::
-
-         await stream.write(data)
-
-   .. method:: write(data)
-
-      The method attempts to write the *data* to the underlying socket immediately.
-      If that fails, the data is queued in an internal write buffer until it can be
-      sent. :meth:`drain` can be used to flush the underlying buffer once writing is
-      available::
-
-         stream.write(data)
-         await stream.drain()
-
-      .. deprecated:: 3.8
-
-      It is recommended to directly await on the `write()` method instead::
-
-          await stream.write(data)
-
-   .. method:: writelines(data)
-
-      The method writes a list (or any iterable) of bytes to the underlying socket
-      immediately.
-      If that fails, the data is queued in an internal write buffer until it can be
-      sent.
-
-      It is possible to directly await on the `writelines()` method::
-
-         await stream.writelines(lines)
-
-      The ``await`` pauses the current coroutine until the data is written to the
-      socket.
-
-   .. method:: write_eof()
-
-      Close the write end of the stream after the buffered write
-      data is flushed.
-
-
-StreamMode
-==========
-
-.. class:: StreamMode
-
-   A subclass of :class:`enum.Flag` that defines a set of values that can be
-   used to determine the ``mode`` of :class:`Stream` objects.
-
-   .. data:: READ
-
-   The stream object is readable and provides the API of :class:`StreamReader`.
-
-   .. data:: WRITE
-
-   The stream object is writeable and provides the API of :class:`StreamWriter`.
-
-   .. data:: READWRITE
-
-   The stream object is readable and writeable and provides the API of both
-   :class:`StreamReader` and :class:`StreamWriter`.
-
-  .. versionadded:: 3.8
-
 
 StreamReader
 ============
@@ -629,7 +209,8 @@ StreamReader
 
    .. method:: at_eof()
 
-      Return ``True`` if the buffer is empty.
+      Return ``True`` if the buffer is empty and :meth:`feed_eof`
+      was called.
 
 
 StreamWriter
@@ -650,22 +231,11 @@ StreamWriter
       If that fails, the data is queued in an internal write buffer until it can be
       sent.
 
-      Starting with Python 3.8, it is possible to directly await on the `write()`
-      method::
-
-         await stream.write(data)
-
-      The ``await`` pauses the current coroutine until the data is written to the
-      socket.
-
-      Below is an equivalent code that works with Python <= 3.7::
+      The method should be used along with the ``drain()`` method::
 
          stream.write(data)
          await stream.drain()
 
-      .. versionchanged:: 3.8
-         Support ``await stream.write(...)`` syntax.
-
    .. method:: writelines(data)
 
       The method writes a list (or any iterable) of bytes to the underlying socket
@@ -673,42 +243,20 @@ StreamWriter
       If that fails, the data is queued in an internal write buffer until it can be
       sent.
 
-      Starting with Python 3.8, it is possible to directly await on the `writelines()`
-      method::
-
-         await stream.writelines(lines)
-
-      The ``await`` pauses the current coroutine until the data is written to the
-      socket.
-
-      Below is an equivalent code that works with Python <= 3.7::
+      The method should be used along with the ``drain()`` method::
 
          stream.writelines(lines)
          await stream.drain()
 
-      .. versionchanged:: 3.8
-         Support ``await stream.writelines()`` syntax.
-
    .. method:: close()
 
       The method closes the stream and the underlying socket.
 
-      Starting with Python 3.8, it is possible to directly await on the `close()`
-      method::
-
-         await stream.close()
-
-      The ``await`` pauses the current coroutine until the stream and the underlying
-      socket are closed (and SSL shutdown is performed for a secure connection).
-
-      Below is an equivalent code that works with Python <= 3.7::
+      The method should be used along with the ``wait_closed()`` method::
 
          stream.close()
          await stream.wait_closed()
 
-      .. versionchanged:: 3.8
-         Support ``await stream.close()`` syntax.
-
    .. method:: can_write_eof()
 
       Return *True* if the underlying transport supports
@@ -768,17 +316,22 @@ Examples
 TCP echo client using streams
 -----------------------------
 
-TCP echo client using the :func:`asyncio.connect` function::
+TCP echo client using the :func:`asyncio.open_connection` function::
 
     import asyncio
 
     async def tcp_echo_client(message):
-        async with asyncio.connect('127.0.0.1', 8888) as stream:
-            print(f'Send: {message!r}')
-            await stream.write(message.encode())
+        reader, writer = await asyncio.open_connection(
+            '127.0.0.1', 8888)
+
+        print(f'Send: {message!r}')
+        writer.write(message.encode())
+
+        data = await reader.read(100)
+        print(f'Received: {data.decode()!r}')
 
-            data = await stream.read(100)
-            print(f'Received: {data.decode()!r}')
+        print('Close the connection')
+        writer.close()
 
     asyncio.run(tcp_echo_client('Hello World!'))
 
@@ -794,28 +347,32 @@ TCP echo client using the :func:`asyncio.connect` function::
 TCP echo server using streams
 -----------------------------
 
-TCP echo server using the :class:`asyncio.StreamServer` class::
+TCP echo server using the :func:`asyncio.start_server` function::
 
     import asyncio
 
-    async def handle_echo(stream):
-        data = await stream.read(100)
+    async def handle_echo(reader, writer):
+        data = await reader.read(100)
         message = data.decode()
-        addr = stream.get_extra_info('peername')
+        addr = writer.get_extra_info('peername')
 
         print(f"Received {message!r} from {addr!r}")
 
         print(f"Send: {message!r}")
-        await stream.write(data)
+        writer.write(data)
+        await writer.drain()
 
         print("Close the connection")
-        await stream.close()
+        writer.close()
 
     async def main():
-        async with asyncio.StreamServer(
-                handle_echo, '127.0.0.1', 8888) as server:
-            addr = server.sockets[0].getsockname()
-            print(f'Serving on {addr}')
+        server = await asyncio.start_server(
+            handle_echo, '127.0.0.1', 8888)
+
+        addr = server.sockets[0].getsockname()
+        print(f'Serving on {addr}')
+
+        async with server:
             await server.serve_forever()
 
     asyncio.run(main())
@@ -839,9 +396,11 @@ Simple example querying HTTP headers of the URL passed on the command line::
     async def print_http_headers(url):
         url = urllib.parse.urlsplit(url)
         if url.scheme == 'https':
-            stream = await asyncio.connect(url.hostname, 443, ssl=True)
+            reader, writer = await asyncio.open_connection(
+                url.hostname, 443, ssl=True)
         else:
-            stream = await asyncio.connect(url.hostname, 80)
+            reader, writer = await asyncio.open_connection(
+                url.hostname, 80)
 
         query = (
             f"HEAD {url.path or '/'} HTTP/1.0\r\n"
@@ -849,14 +408,18 @@ Simple example querying HTTP headers of the URL passed on the command line::
             f"\r\n"
         )
 
-        stream.write(query.encode('latin-1'))
-        while (line := await stream.readline()):
+        writer.write(query.encode('latin-1'))
+        while True:
+            line = await reader.readline()
+            if not line:
+                break
+
             line = line.decode('latin1').rstrip()
             if line:
                 print(f'HTTP header> {line}')
 
         # Ignore the body, close the socket
-        await stream.close()
+        writer.close()
 
     url = sys.argv[1]
     asyncio.run(print_http_headers(url))
@@ -877,7 +440,7 @@ Register an open socket to wait for data using streams
 ------------------------------------------------------
 
 Coroutine waiting until a socket receives data using the
-:func:`asyncio.connect` function::
+:func:`open_connection` function::
 
     import asyncio
     import socket
@@ -891,15 +454,17 @@ Coroutine waiting until a socket receives data using the
         rsock, wsock = socket.socketpair()
 
         # Register the open socket to wait for data.
-        async with asyncio.connect(sock=rsock) as stream:
-            # Simulate the reception of data from the network
-            loop.call_soon(wsock.send, 'abc'.encode())
+        reader, writer = await asyncio.open_connection(sock=rsock)
+
+        # Simulate the reception of data from the network
+        loop.call_soon(wsock.send, 'abc'.encode())
 
-            # Wait for data
-            data = await stream.read(100)
+        # Wait for data
+        data = await reader.read(100)
 
-            # Got data, we are done: close the socket
-            print("Received:", data.decode())
+        # Got data, we are done: close the socket
+        print("Received:", data.decode())
+        writer.close()
 
         # Close the second socket
         wsock.close()
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
index a6a29dbfecd5..28c2e2c429f3 100644
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -3,7 +3,6 @@
 # flake8: noqa
 
 import sys
-import warnings
 
 # This relies on each of the submodules having an __all__ variable.
 from .base_events import *
@@ -44,40 +43,3 @@
 else:
     from .unix_events import *  # pragma: no cover
     __all__ += unix_events.__all__
-
-
-__all__ += ('StreamReader', 'StreamWriter', 'StreamReaderProtocol')  # deprecated
-
-
-def __getattr__(name):
-    global StreamReader, StreamWriter, StreamReaderProtocol
-    if name == 'StreamReader':
-        warnings.warn("StreamReader is deprecated since Python 3.8 "
-                      "in favor of Stream, and scheduled for removal "
-                      "in Python 3.10",
-                      DeprecationWarning,
-                      stacklevel=2)
-        from .streams import StreamReader as sr
-        StreamReader = sr
-        return StreamReader
-    if name == 'StreamWriter':
-        warnings.warn("StreamWriter is deprecated since Python 3.8 "
-                      "in favor of Stream, and scheduled for removal "
-                      "in Python 3.10",
-                      DeprecationWarning,
-                      stacklevel=2)
-        from .streams import StreamWriter as sw
-        StreamWriter = sw
-        return StreamWriter
-    if name == 'StreamReaderProtocol':
-        warnings.warn("Using asyncio internal class StreamReaderProtocol "
-                      "is deprecated since Python 3.8 "
-                      " and scheduled for removal "
-                      "in Python 3.10",
-                      DeprecationWarning,
-                      stacklevel=2)
-        from .streams import StreamReaderProtocol as srp
-        StreamReaderProtocol = srp
-        return StreamReaderProtocol
-
-    raise AttributeError(f"module {__name__} has no attribute {name}")
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 20c642a001d2..795530e6f69e 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -1,19 +1,14 @@
 __all__ = (
-    'Stream', 'StreamMode',
-    'open_connection', 'start_server',
-    'connect', 'connect_read_pipe', 'connect_write_pipe',
-    'StreamServer')
+    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
+    'open_connection', 'start_server')
 
-import enum
 import socket
 import sys
 import warnings
 import weakref
 
 if hasattr(socket, 'AF_UNIX'):
-    __all__ += ('open_unix_connection', 'start_unix_server',
-                'connect_unix',
-                'UnixStreamServer')
+    __all__ += ('open_unix_connection', 'start_unix_server')
 
 from . import coroutines
 from . import events
@@ -21,155 +16,12 @@
 from . import format_helpers
 from . import protocols
 from .log import logger
-from . import tasks
+from .tasks import sleep
 
 
 _DEFAULT_LIMIT = 2 ** 16  # 64 KiB
 
 
-class StreamMode(enum.Flag):
-    READ = enum.auto()
-    WRITE = enum.auto()
-    READWRITE = READ | WRITE
-
-
-def _ensure_can_read(mode):
-    if not mode & StreamMode.READ:
-        raise RuntimeError("The stream is write-only")
-
-
-def _ensure_can_write(mode):
-    if not mode & StreamMode.WRITE:
-        raise RuntimeError("The stream is read-only")
-
-
-class _ContextManagerHelper:
-    __slots__ = ('_awaitable', '_result')
-
-    def __init__(self, awaitable):
-        self._awaitable = awaitable
-        self._result = None
-
-    def __await__(self):
-        return self._awaitable.__await__()
-
-    async def __aenter__(self):
-        ret = await self._awaitable
-        result = await ret.__aenter__()
-        self._result = result
-        return result
-
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        return await self._result.__aexit__(exc_type, exc_val, exc_tb)
-
-
-def connect(host=None, port=None, *,
-            limit=_DEFAULT_LIMIT,
-            ssl=None, family=0, proto=0,
-            flags=0, sock=None, local_addr=None,
-            server_hostname=None,
-            ssl_handshake_timeout=None,
-            happy_eyeballs_delay=None, interleave=None):
-    """Connect to TCP socket on *host* : *port* address to send and receive data.
-
-    *limit* determines the buffer size limit used by the returned `Stream`
-    instance. By default the *limit* is set to 64 KiB.
-
-    The rest of the arguments are passed directly to `loop.create_connection()`.
-    """
-    # Design note:
-    # Don't use decorator approach but explicit non-async
-    # function to fail fast and explicitly
-    # if passed arguments don't match the function signature
-    return _ContextManagerHelper(_connect(host, port, limit,
-                                          ssl, family, proto,
-                                          flags, sock, local_addr,
-                                          server_hostname,
-                                          ssl_handshake_timeout,
-                                          happy_eyeballs_delay,
-                                          interleave))
-
-
-async def _connect(host, port,
-                  limit,
-                  ssl, family, proto,
-                  flags, sock, local_addr,
-                  server_hostname,
-                  ssl_handshake_timeout,
-                  happy_eyeballs_delay, interleave):
-    loop = events.get_running_loop()
-    stream = Stream(mode=StreamMode.READWRITE,
-                    limit=limit,
-                    loop=loop,
-                    _asyncio_internal=True)
-    await loop.create_connection(
-        lambda: _StreamProtocol(stream, loop=loop,
-                                _asyncio_internal=True),
-        host, port,
-        ssl=ssl, family=family, proto=proto,
-        flags=flags, sock=sock, local_addr=local_addr,
-        server_hostname=server_hostname,
-        ssl_handshake_timeout=ssl_handshake_timeout,
-        happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave)
-    return stream
-
-
-def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT):
-    """Establish a connection to a file-like object *pipe* to receive data.
-
-    Takes a file-like object *pipe* to return a Stream object of the mode
-    StreamMode.READ that has similar API of StreamReader. It can also be used
-    as an async context manager.
-    """
-
-    # Design note:
-    # Don't use decorator approach but explicit non-async
-    # function to fail fast and explicitly
-    # if passed arguments don't match the function signature
-    return _ContextManagerHelper(_connect_read_pipe(pipe, limit))
-
-
-async def _connect_read_pipe(pipe, limit):
-    loop = events.get_running_loop()
-    stream = Stream(mode=StreamMode.READ,
-                    limit=limit,
-                    loop=loop,
-                    _asyncio_internal=True)
-    await loop.connect_read_pipe(
-        lambda: _StreamProtocol(stream, loop=loop,
-                                _asyncio_internal=True),
-        pipe)
-    return stream
-
-
-def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
-    """Establish a connection to a file-like object *pipe* to send data.
-
-    Takes a file-like object *pipe* to return a Stream object of the mode
-    StreamMode.WRITE that has similar API of StreamWriter. It can also be used
-    as an async context manager.
-    """
-
-    # Design note:
-    # Don't use decorator approach but explicit non-async
-    # function to fail fast and explicitly
-    # if passed arguments don't match the function signature
-    return _ContextManagerHelper(_connect_write_pipe(pipe, limit))
-
-
-async def _connect_write_pipe(pipe, limit):
-    loop = events.get_running_loop()
-    stream = Stream(mode=StreamMode.WRITE,
-                    limit=limit,
-                    loop=loop,
-                    _asyncio_internal=True)
-    await loop.connect_write_pipe(
-        lambda: _StreamProtocol(stream, loop=loop,
-                                _asyncio_internal=True),
-        pipe)
-    return stream
-
-
 async def open_connection(host=None, port=None, *,
                           loop=None, limit=_DEFAULT_LIMIT, **kwds):
     """A wrapper for create_connection() returning a (reader, writer) pair.
@@ -189,11 +41,6 @@ def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
     StreamReaderProtocol classes, just copy the code -- there's
     really nothing special here except some convenience.)
     """
-    warnings.warn("open_connection() is deprecated since Python 3.8 "
-                  "in favor of connect(), and scheduled for removal "
-                  "in Python 3.10",
-                  DeprecationWarning,
-                  stacklevel=2)
     if loop is None:
         loop = events.get_event_loop()
     else:
@@ -201,7 +48,7 @@ def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
                       "and scheduled for removal in Python 3.10.",
                       DeprecationWarning, stacklevel=2)
     reader = StreamReader(limit=limit, loop=loop)
-    protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True)
+    protocol = StreamReaderProtocol(reader, loop=loop)
     transport, _ = await loop.create_connection(
         lambda: protocol, host, port, **kwds)
     writer = StreamWriter(transport, protocol, reader, loop)
@@ -231,11 +78,6 @@ def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
     The return value is the same as loop.create_server(), i.e. a
     Server object which can be used to stop the service.
     """
-    warnings.warn("start_server() is deprecated since Python 3.8 "
-                  "in favor of StreamServer(), and scheduled for removal "
-                  "in Python 3.10",
-                  DeprecationWarning,
-                  stacklevel=2)
     if loop is None:
         loop = events.get_event_loop()
     else:
@@ -246,201 +88,18 @@ def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
     def factory():
         reader = StreamReader(limit=limit, loop=loop)
         protocol = StreamReaderProtocol(reader, client_connected_cb,
-                                        loop=loop,
-                                        _asyncio_internal=True)
+                                        loop=loop)
         return protocol
 
     return await loop.create_server(factory, host, port, **kwds)
 
 
-class _BaseStreamServer:
-    # Design notes.
-    # StreamServer and UnixStreamServer are exposed as FINAL classes,
-    # not function factories.
-    # async with serve(host, port) as server:
-    #      server.start_serving()
-    # looks ugly.
-    # The class doesn't provide API for enumerating connected streams
-    # It can be a subject for improvements in Python 3.9
-
-    _server_impl = None
-
-    def __init__(self, client_connected_cb,
-                 /,
-                 limit=_DEFAULT_LIMIT,
-                 shutdown_timeout=60,
-                 _asyncio_internal=False):
-        if not _asyncio_internal:
-            raise RuntimeError("_ServerStream is a private asyncio class")
-        self._client_connected_cb = client_connected_cb
-        self._limit = limit
-        self._loop = events.get_running_loop()
-        self._streams = {}
-        self._shutdown_timeout = shutdown_timeout
-
-    def __init_subclass__(cls):
-        if not cls.__module__.startswith('asyncio.'):
-            raise TypeError(f"asyncio.{cls.__name__} "
-                            "class cannot be inherited from")
-
-    async def bind(self):
-        if self._server_impl is not None:
-            return
-        self._server_impl = await self._bind()
-
-    def is_bound(self):
-        return self._server_impl is not None
-
-    @property
-    def sockets(self):
-        # multiple value for socket bound to both IPv4 and IPv6 families
-        if self._server_impl is None:
-            return ()
-        return self._server_impl.sockets
-
-    def is_serving(self):
-        if self._server_impl is None:
-            return False
-        return self._server_impl.is_serving()
-
-    async def start_serving(self):
-        await self.bind()
-        await self._server_impl.start_serving()
-
-    async def serve_forever(self):
-        await self.start_serving()
-        await self._server_impl.serve_forever()
-
-    async def close(self):
-        if self._server_impl is None:
-            return
-        self._server_impl.close()
-        streams = list(self._streams.keys())
-        active_tasks = list(self._streams.values())
-        if streams:
-            await tasks.wait([stream.close() for stream in streams])
-        await self._server_impl.wait_closed()
-        self._server_impl = None
-        await self._shutdown_active_tasks(active_tasks)
-
-    async def abort(self):
-        if self._server_impl is None:
-            return
-        self._server_impl.close()
-        streams = list(self._streams.keys())
-        active_tasks = list(self._streams.values())
-        if streams:
-            await tasks.wait([stream.abort() for stream in streams])
-        await self._server_impl.wait_closed()
-        self._server_impl = None
-        await self._shutdown_active_tasks(active_tasks)
-
-    async def __aenter__(self):
-        await self.bind()
-        return self
-
-    async def __aexit__(self, exc_type, exc_value, exc_tb):
-        await self.close()
-
-    def _attach(self, stream, task):
-        self._streams[stream] = task
-
-    def _detach(self, stream, task):
-        del self._streams[stream]
-
-    async def _shutdown_active_tasks(self, active_tasks):
-        if not active_tasks:
-            return
-        # NOTE: tasks finished with exception are reported
-        # by the Task.__del__() method.
-        done, pending = await tasks.wait(active_tasks,
-                                         timeout=self._shutdown_timeout)
-        if not pending:
-            return
-        for task in pending:
-            task.cancel()
-        done, pending = await tasks.wait(pending,
-                                         timeout=self._shutdown_timeout)
-        for task in pending:
-            self._loop.call_exception_handler({
-                "message": (f'{task!r} ignored cancellation request '
-                            f'from a closing {self!r}'),
-                "stream_server": self
-            })
-
-    def __repr__(self):
-        ret = [f'{self.__class__.__name__}']
-        if self.is_serving():
-            ret.append('serving')
-        if self.sockets:
-            ret.append(f'sockets={self.sockets!r}')
-        return '<' + ' '.join(ret) + '>'
-
-    def __del__(self, _warn=warnings.warn):
-        if self._server_impl is not None:
-            _warn(f"unclosed stream server {self!r}",
-                  ResourceWarning, source=self)
-            self._server_impl.close()
-
-
-class StreamServer(_BaseStreamServer):
-
-    def __init__(self, client_connected_cb, /, host=None, port=None, *,
-                 limit=_DEFAULT_LIMIT,
-                 family=socket.AF_UNSPEC,
-                 flags=socket.AI_PASSIVE, sock=None, backlog=100,
-                 ssl=None, reuse_address=None, reuse_port=None,
-                 ssl_handshake_timeout=None,
-                 shutdown_timeout=60):
-        super().__init__(client_connected_cb,
-                         limit=limit,
-                         shutdown_timeout=shutdown_timeout,
-                         _asyncio_internal=True)
-        self._host = host
-        self._port = port
-        self._family = family
-        self._flags = flags
-        self._sock = sock
-        self._backlog = backlog
-        self._ssl = ssl
-        self._reuse_address = reuse_address
-        self._reuse_port = reuse_port
-        self._ssl_handshake_timeout = ssl_handshake_timeout
-
-    async def _bind(self):
-        def factory():
-            protocol = _ServerStreamProtocol(self,
-                                             self._limit,
-                                             self._client_connected_cb,
-                                             loop=self._loop,
-                                             _asyncio_internal=True)
-            return protocol
-        return await self._loop.create_server(
-            factory,
-            self._host,
-            self._port,
-            start_serving=False,
-            family=self._family,
-            flags=self._flags,
-            sock=self._sock,
-            backlog=self._backlog,
-            ssl=self._ssl,
-            reuse_address=self._reuse_address,
-            reuse_port=self._reuse_port,
-            ssl_handshake_timeout=self._ssl_handshake_timeout)
-
-
 if hasattr(socket, 'AF_UNIX'):
     # UNIX Domain Sockets are supported on this platform
 
     async def open_unix_connection(path=None, *,
                                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `open_connection` but works with UNIX Domain Sockets."""
-        warnings.warn("open_unix_connection() is deprecated since Python 3.8 "
-                      "in favor of connect_unix(), and scheduled for removal "
-                      "in Python 3.10",
-                      DeprecationWarning,
-                      stacklevel=2)
         if loop is None:
             loop = events.get_event_loop()
         else:
@@ -448,62 +107,15 @@ def factory():
                           "and scheduled for removal in Python 3.10.",
                           DeprecationWarning, stacklevel=2)
         reader = StreamReader(limit=limit, loop=loop)
-        protocol = StreamReaderProtocol(reader, loop=loop,
-                                        _asyncio_internal=True)
+        protocol = StreamReaderProtocol(reader, loop=loop)
         transport, _ = await loop.create_unix_connection(
             lambda: protocol, path, **kwds)
         writer = StreamWriter(transport, protocol, reader, loop)
         return reader, writer
 
-
-    def connect_unix(path=None, *,
-                     limit=_DEFAULT_LIMIT,
-                     ssl=None, sock=None,
-                     server_hostname=None,
-                     ssl_handshake_timeout=None):
-        """Similar to `connect()` but works with UNIX Domain Sockets."""
-        # Design note:
-        # Don't use decorator approach but explicit non-async
-        # function to fail fast and explicitly
-        # if passed arguments don't match the function signature
-        return _ContextManagerHelper(_connect_unix(path,
-                                                   limit,
-                                                   ssl, sock,
-                                                   server_hostname,
-                                                   ssl_handshake_timeout))
-
-
-    async def _connect_unix(path,
-                           limit,
-                           ssl, sock,
-                           server_hostname,
-                           ssl_handshake_timeout):
-        """Similar to `connect()` but works with UNIX Domain Sockets."""
-        loop = events.get_running_loop()
-        stream = Stream(mode=StreamMode.READWRITE,
-                        limit=limit,
-                        loop=loop,
-                        _asyncio_internal=True)
-        await loop.create_unix_connection(
-            lambda: _StreamProtocol(stream,
-                                    loop=loop,
-                                    _asyncio_internal=True),
-            path,
-            ssl=ssl,
-            sock=sock,
-            server_hostname=server_hostname,
-            ssl_handshake_timeout=ssl_handshake_timeout)
-        return stream
-
-
     async def start_unix_server(client_connected_cb, path=None, *,
                                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `start_server` but works with UNIX Domain Sockets."""
-        warnings.warn("start_unix_server() is deprecated since Python 3.8 "
-                      "in favor of UnixStreamServer(), and scheduled "
-                      "for removal in Python 3.10",
-                      DeprecationWarning,
-                      stacklevel=2)
         if loop is None:
             loop = events.get_event_loop()
         else:
@@ -514,48 +126,11 @@ def connect_unix(path=None, *,
         def factory():
             reader = StreamReader(limit=limit, loop=loop)
             protocol = StreamReaderProtocol(reader, client_connected_cb,
-                                            loop=loop,
-                                            _asyncio_internal=True)
+                                            loop=loop)
             return protocol
 
         return await loop.create_unix_server(factory, path, **kwds)
 
-    class UnixStreamServer(_BaseStreamServer):
-
-        def __init__(self, client_connected_cb, /, path=None, *,
-                     limit=_DEFAULT_LIMIT,
-                     sock=None,
-                     backlog=100,
-                     ssl=None,
-                     ssl_handshake_timeout=None,
-                     shutdown_timeout=60):
-            super().__init__(client_connected_cb,
-                             limit=limit,
-                             shutdown_timeout=shutdown_timeout,
-                             _asyncio_internal=True)
-            self._path = path
-            self._sock = sock
-            self._backlog = backlog
-            self._ssl = ssl
-            self._ssl_handshake_timeout = ssl_handshake_timeout
-
-        async def _bind(self):
-            def factory():
-                protocol = _ServerStreamProtocol(self,
-                                                 self._limit,
-                                                 self._client_connected_cb,
-                                                 loop=self._loop,
-                                                 _asyncio_internal=True)
-                return protocol
-            return await self._loop.create_unix_server(
-                factory,
-                self._path,
-                start_serving=False,
-                sock=self._sock,
-                backlog=self._backlog,
-                ssl=self._ssl,
-                ssl_handshake_timeout=self._ssl_handshake_timeout)
-
 
 class FlowControlMixin(protocols.Protocol):
     """Reusable flow control logic for StreamWriter.drain().
@@ -567,20 +142,11 @@ class FlowControlMixin(protocols.Protocol):
     StreamWriter.drain() must wait for _drain_helper() coroutine.
     """
 
-    def __init__(self, loop=None, *, _asyncio_internal=False):
+    def __init__(self, loop=None):
         if loop is None:
             self._loop = events.get_event_loop()
         else:
             self._loop = loop
-        if not _asyncio_internal:
-            # NOTE:
-            # Avoid inheritance from FlowControlMixin
-            # Copy-paste the code to your project
-            # if you need flow control helpers
-            warnings.warn(f"{self.__class__} should be instaniated "
-                          "by asyncio internals only, "
-                          "please avoid its creation from user code",
-                          DeprecationWarning)
         self._paused = False
         self._drain_waiter = None
         self._connection_lost = False
@@ -634,8 +200,6 @@ def _get_close_waiter(self, stream):
         raise NotImplementedError
 
 
-# begin legacy stream APIs
-
 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
     """Helper class to adapt between Protocol and StreamReader.
 
@@ -645,47 +209,103 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
     call inappropriate methods of the protocol.)
     """
 
-    def __init__(self, stream_reader, client_connected_cb=None, loop=None,
-                 *, _asyncio_internal=False):
-        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
-        self._stream_reader = stream_reader
+    _source_traceback = None
+
+    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
+        super().__init__(loop=loop)
+        if stream_reader is not None:
+            self._stream_reader_wr = weakref.ref(stream_reader,
+                                                 self._on_reader_gc)
+            self._source_traceback = stream_reader._source_traceback
+        else:
+            self._stream_reader_wr = None
+        if client_connected_cb is not None:
+            # This is a stream created by the `create_server()` function.
+            # Keep a strong reference to the reader until a connection
+            # is established.
+            self._strong_reader = stream_reader
+        self._reject_connection = False
         self._stream_writer = None
+        self._transport = None
         self._client_connected_cb = client_connected_cb
         self._over_ssl = False
         self._closed = self._loop.create_future()
 
+    def _on_reader_gc(self, wr):
+        transport = self._transport
+        if transport is not None:
+            # connection_made was called
+            context = {
+                'message': ('An open stream object is being garbage '
+                            'collected; call "stream.close()" explicitly.')
+            }
+            if self._source_traceback:
+                context['source_traceback'] = self._source_traceback
+            self._loop.call_exception_handler(context)
+            transport.abort()
+        else:
+            self._reject_connection = True
+        self._stream_reader_wr = None
+
+    @property
+    def _stream_reader(self):
+        if self._stream_reader_wr is None:
+            return None
+        return self._stream_reader_wr()
+
     def connection_made(self, transport):
-        self._stream_reader.set_transport(transport)
+        if self._reject_connection:
+            context = {
+                'message': ('An open stream was garbage collected prior to '
+                            'establishing network connection; '
+                            'call "stream.close()" explicitly.')
+            }
+            if self._source_traceback:
+                context['source_traceback'] = self._source_traceback
+            self._loop.call_exception_handler(context)
+            transport.abort()
+            return
+        self._transport = transport
+        reader = self._stream_reader
+        if reader is not None:
+            reader.set_transport(transport)
         self._over_ssl = transport.get_extra_info('sslcontext') is not None
         if self._client_connected_cb is not None:
             self._stream_writer = StreamWriter(transport, self,
-                                               self._stream_reader,
+                                               reader,
                                                self._loop)
-            res = self._client_connected_cb(self._stream_reader,
+            res = self._client_connected_cb(reader,
                                             self._stream_writer)
             if coroutines.iscoroutine(res):
                 self._loop.create_task(res)
+            self._strong_reader = None
 
     def connection_lost(self, exc):
-        if self._stream_reader is not None:
+        reader = self._stream_reader
+        if reader is not None:
             if exc is None:
-                self._stream_reader.feed_eof()
+                reader.feed_eof()
             else:
-                self._stream_reader.set_exception(exc)
+                reader.set_exception(exc)
         if not self._closed.done():
             if exc is None:
                 self._closed.set_result(None)
             else:
                 self._closed.set_exception(exc)
         super().connection_lost(exc)
-        self._stream_reader = None
+        self._stream_reader_wr = None
         self._stream_writer = None
+        self._transport = None
 
     def data_received(self, data):
-        self._stream_reader.feed_data(data)
+        reader = self._stream_reader
+        if reader is not None:
+            reader.feed_data(data)
 
     def eof_received(self):
-        self._stream_reader.feed_eof()
+        reader = self._stream_reader
+        if reader is not None:
+            reader.feed_eof()
         if self._over_ssl:
             # Prevent a warning in SSLProtocol.eof_received:
             # "returning true from eof_received()
@@ -693,6 +313,9 @@ def eof_received(self):
             return False
         return True
 
+    def _get_close_waiter(self, stream):
+        return self._closed
+
     def __del__(self):
         # Prevent reports about unhandled exceptions.
         # Better than self._closed._log_traceback = False hack
@@ -718,6 +341,8 @@ def __init__(self, transport, protocol, reader, loop):
         assert reader is None or isinstance(reader, StreamReader)
         self._reader = reader
         self._loop = loop
+        self._complete_fut = self._loop.create_future()
+        self._complete_fut.set_result(None)
 
     def __repr__(self):
         info = [self.__class__.__name__, f'transport={self._transport!r}']
@@ -748,7 +373,7 @@ def is_closing(self):
         return self._transport.is_closing()
 
     async def wait_closed(self):
-        await self._protocol._closed
+        await self._protocol._get_close_waiter(self)
 
     def get_extra_info(self, name, default=None):
         return self._transport.get_extra_info(name, default)
@@ -766,18 +391,23 @@ def get_extra_info(self, name, default=None):
             if exc is not None:
                 raise exc
         if self._transport.is_closing():
+            # Wait for protocol.connection_lost() call
+            # Raise connection closing error if any,
+            # ConnectionResetError otherwise
             # Yield to the event loop so connection_lost() may be
             # called.  Without this, _drain_helper() would return
             # immediately, and code that calls
             #     write(...); await drain()
             # in a loop would never call connection_lost(), so it
             # would not see an error when the socket is closed.
-            await tasks.sleep(0, loop=self._loop)
+            await sleep(0)
         await self._protocol._drain_helper()
 
 
 class StreamReader:
 
+    _source_traceback = None
+
     def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
         # The line length limit is  a security feature;
         # it also doubles as half the buffer limit.
@@ -796,6 +426,9 @@ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
         self._exception = None
         self._transport = None
         self._paused = False
+        if self._loop.get_debug():
+            self._source_traceback = format_helpers.extract_stack(
+                sys._getframe(1))
 
     def __repr__(self):
         info = ['StreamReader']
@@ -1123,706 +756,3 @@ def __aiter__(self):
         if val == b'':
             raise StopAsyncIteration
         return val
-
-
-# end legacy stream APIs
-
-
-class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol):
-    """Helper class to adapt between Protocol and StreamReader.
-
-    (This is a helper class instead of making StreamReader itself a
-    Protocol subclass, because the StreamReader has other potential
-    uses, and to prevent the user of the StreamReader to accidentally
-    call inappropriate methods of the protocol.)
-    """
-
-    _stream = None  # initialized in derived classes
-
-    def __init__(self, loop=None,
-                 *, _asyncio_internal=False):
-        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
-        self._transport = None
-        self._over_ssl = False
-        self._closed = self._loop.create_future()
-
-    def connection_made(self, transport):
-        self._transport = transport
-        self._over_ssl = transport.get_extra_info('sslcontext') is not None
-
-    def connection_lost(self, exc):
-        stream = self._stream
-        if stream is not None:
-            if exc is None:
-                stream._feed_eof()
-            else:
-                stream._set_exception(exc)
-        if not self._closed.done():
-            if exc is None:
-                self._closed.set_result(None)
-            else:
-                self._closed.set_exception(exc)
-        super().connection_lost(exc)
-        self._transport = None
-
-    def data_received(self, data):
-        stream = self._stream
-        if stream is not None:
-            stream._feed_data(data)
-
-    def eof_received(self):
-        stream = self._stream
-        if stream is not None:
-            stream._feed_eof()
-        if self._over_ssl:
-            # Prevent a warning in SSLProtocol.eof_received:
-            # "returning true from eof_received()
-            # has no effect when using ssl"
-            return False
-        return True
-
-    def _get_close_waiter(self, stream):
-        return self._closed
-
-    def __del__(self):
-        # Prevent reports about unhandled exceptions.
-        # Better than self._closed._log_traceback = False hack
-        closed = self._get_close_waiter(self._stream)
-        if closed.done() and not closed.cancelled():
-            closed.exception()
-
-
-class _StreamProtocol(_BaseStreamProtocol):
-    _source_traceback = None
-
-    def __init__(self, stream, loop=None,
-                 *, _asyncio_internal=False):
-        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
-        self._source_traceback = stream._source_traceback
-        self._stream_wr = weakref.ref(stream, self._on_gc)
-        self._reject_connection = False
-
-    def _on_gc(self, wr):
-        transport = self._transport
-        if transport is not None:
-            # connection_made was called
-            context = {
-                'message': ('An open stream object is being garbage '
-                            'collected; call "stream.close()" explicitly.')
-            }
-            if self._source_traceback:
-                context['source_traceback'] = self._source_traceback
-            self._loop.call_exception_handler(context)
-            transport.abort()
-        else:
-            self._reject_connection = True
-        self._stream_wr = None
-
-    @property
-    def _stream(self):
-        if self._stream_wr is None:
-            return None
-        return self._stream_wr()
-
-    def connection_made(self, transport):
-        if self._reject_connection:
-            context = {
-                'message': ('An open stream was garbage collected prior to '
-                            'establishing network connection; '
-                            'call "stream.close()" explicitly.')
-            }
-            if self._source_traceback:
-                context['source_traceback'] = self._source_traceback
-            self._loop.call_exception_handler(context)
-            transport.abort()
-            return
-        super().connection_made(transport)
-        stream = self._stream
-        if stream is None:
-            return
-        stream._set_transport(transport)
-        stream._protocol = self
-
-    def connection_lost(self, exc):
-        super().connection_lost(exc)
-        self._stream_wr = None
-
-
-class _ServerStreamProtocol(_BaseStreamProtocol):
-    def __init__(self, server, limit, client_connected_cb, loop=None,
-                 *, _asyncio_internal=False):
-        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
-        assert self._closed
-        self._client_connected_cb = client_connected_cb
-        self._limit = limit
-        self._server = server
-        self._task = None
-
-    def connection_made(self, transport):
-        super().connection_made(transport)
-        stream = Stream(mode=StreamMode.READWRITE,
-                        transport=transport,
-                        protocol=self,
-                        limit=self._limit,
-                        loop=self._loop,
-                        is_server_side=True,
-                        _asyncio_internal=True)
-        self._stream = stream
-        # If self._client_connected_cb(self._stream) fails
-        # the exception is logged by transport
-        self._task = self._loop.create_task(
-            self._client_connected_cb(self._stream))
-        self._server._attach(stream, self._task)
-
-    def connection_lost(self, exc):
-        super().connection_lost(exc)
-        self._server._detach(self._stream, self._task)
-        self._stream = None
-
-
-class _OptionalAwait:
-    # The class doesn't create a coroutine
-    # if not awaited
-    # It prevents "coroutine is never awaited" message
-
-    __slots___ = ('_method',)
-
-    def __init__(self, method):
-        self._method = method
-
-    def __await__(self):
-        return self._method().__await__()
-
-
-class Stream:
-    """Wraps a Transport.
-
-    This exposes write(), writelines(), [can_]write_eof(),
-    get_extra_info() and close().  It adds drain() which returns an
-    optional Future on which you can wait for flow control.  It also
-    adds a transport property which references the Transport
-    directly.
-    """
-
-    _source_traceback = None
-
-    def __init__(self, mode, *,
-                 transport=None,
-                 protocol=None,
-                 loop=None,
-                 limit=_DEFAULT_LIMIT,
-                 is_server_side=False,
-                 _asyncio_internal=False):
-        if not _asyncio_internal:
-            raise RuntimeError(f"{self.__class__} should be instantiated "
-                               "by asyncio internals only")
-        self._mode = mode
-        self._transport = transport
-        self._protocol = protocol
-        self._is_server_side = is_server_side
-
-        # The line length limit is  a security feature;
-        # it also doubles as half the buffer limit.
-
-        if limit <= 0:
-            raise ValueError('Limit cannot be <= 0')
-
-        self._limit = limit
-        if loop is None:
-            self._loop = events.get_event_loop()
-        else:
-            self._loop = loop
-        self._buffer = bytearray()
-        self._eof = False    # Whether we're done.
-        self._waiter = None  # A future used by _wait_for_data()
-        self._exception = None
-        self._paused = False
-        self._complete_fut = self._loop.create_future()
-        self._complete_fut.set_result(None)
-
-        if self._loop.get_debug():
-            self._source_traceback = format_helpers.extract_stack(
-                sys._getframe(1))
-
-    def __repr__(self):
-        info = [self.__class__.__name__]
-        info.append(f'mode={self._mode}')
-        if self._buffer:
-            info.append(f'{len(self._buffer)} bytes')
-        if self._eof:
-            info.append('eof')
-        if self._limit != _DEFAULT_LIMIT:
-            info.append(f'limit={self._limit}')
-        if self._waiter:
-            info.append(f'waiter={self._waiter!r}')
-        if self._exception:
-            info.append(f'exception={self._exception!r}')
-        if self._transport:
-            info.append(f'transport={self._transport!r}')
-        if self._paused:
-            info.append('paused')
-        return '<{}>'.format(' '.join(info))
-
-    @property
-    def mode(self):
-        return self._mode
-
-    def is_server_side(self):
-        return self._is_server_side
-
-    @property
-    def transport(self):
-        warnings.warn("Stream.transport attribute is deprecated "
-                      "since Python 3.8 and is scheduled for removal in 3.10; "
-                      "it is an internal API",
-                      DeprecationWarning,
-                      stacklevel=2)
-        return self._transport
-
-    def write(self, data):
-        _ensure_can_write(self._mode)
-        self._transport.write(data)
-        return self._fast_drain()
-
-    def writelines(self, data):
-        _ensure_can_write(self._mode)
-        self._transport.writelines(data)
-        return self._fast_drain()
-
-    def _fast_drain(self):
-        # The helper tries to use fast-path to return already existing
-        # complete future object if underlying transport is not paused
-        # and actual waiting for writing resume is not needed
-        exc = self.exception()
-        if exc is not None:
-            fut = self._loop.create_future()
-            fut.set_exception(exc)
-            return fut
-        if not self._transport.is_closing():
-            if self._protocol._connection_lost:
-                fut = self._loop.create_future()
-                fut.set_exception(ConnectionResetError('Connection lost'))
-                return fut
-            if not self._protocol._paused:
-                # fast path, the stream is not paused
-                # no need to wait for resume signal
-                return self._complete_fut
-        return _OptionalAwait(self.drain)
-
-    def write_eof(self):
-        _ensure_can_write(self._mode)
-        return self._transport.write_eof()
-
-    def can_write_eof(self):
-        if not self._mode.is_write():
-            return False
-        return self._transport.can_write_eof()
-
-    def close(self):
-        self._transport.close()
-        return _OptionalAwait(self.wait_closed)
-
-    def is_closing(self):
-        return self._transport.is_closing()
-
-    async def abort(self):
-        self._transport.abort()
-        await self.wait_closed()
-
-    async def wait_closed(self):
-        await self._protocol._get_close_waiter(self)
-
-    def get_extra_info(self, name, default=None):
-        return self._transport.get_extra_info(name, default)
-
-    async def drain(self):
-        """Flush the write buffer.
-
-        The intended use is to write
-
-          w.write(data)
-          await w.drain()
-        """
-        _ensure_can_write(self._mode)
-        exc = self.exception()
-        if exc is not None:
-            raise exc
-        if self._transport.is_closing():
-            # Wait for protocol.connection_lost() call
-            # Raise connection closing error if any,
-            # ConnectionResetError otherwise
-            await tasks.sleep(0)
-        await self._protocol._drain_helper()
-
-    async def sendfile(self, file, offset=0, count=None, *, fallback=True):
-        await self.drain()  # check for stream mode and exceptions
-        return await self._loop.sendfile(self._transport, file,
-                                         offset, count, fallback=fallback)
-
-    async def start_tls(self, sslcontext, *,
-                        server_hostname=None,
-                        ssl_handshake_timeout=None):
-        await self.drain()  # check for stream mode and exceptions
-        transport = await self._loop.start_tls(
-            self._transport, self._protocol, sslcontext,
-            server_side=self._is_server_side,
-            server_hostname=server_hostname,
-            ssl_handshake_timeout=ssl_handshake_timeout)
-        self._transport = transport
-        self._protocol._transport = transport
-        self._protocol._over_ssl = True
-
-    def exception(self):
-        return self._exception
-
-    def set_exception(self, exc):
-        warnings.warn("Stream.set_exception() is deprecated "
-                      "since Python 3.8 and is scheduled for removal in 3.10; "
-                      "it is an internal API",
-                      DeprecationWarning,
-                      stacklevel=2)
-        self._set_exception(exc)
-
-    def _set_exception(self, exc):
-        self._exception = exc
-
-        waiter = self._waiter
-        if waiter is not None:
-            self._waiter = None
-            if not waiter.cancelled():
-                waiter.set_exception(exc)
-
-    def _wakeup_waiter(self):
-        """Wakeup read*() functions waiting for data or EOF."""
-        waiter = self._waiter
-        if waiter is not None:
-            self._waiter = None
-            if not waiter.cancelled():
-                waiter.set_result(None)
-
-    def set_transport(self, transport):
-        warnings.warn("Stream.set_transport() is deprecated "
-                      "since Python 3.8 and is scheduled for removal in 3.10; "
-                      "it is an internal API",
-                      DeprecationWarning,
-                      stacklevel=2)
-        self._set_transport(transport)
-
-    def _set_transport(self, transport):
-        if transport is self._transport:
-            return
-        assert self._transport is None, 'Transport already set'
-        self._transport = transport
-
-    def _maybe_resume_transport(self):
-        if self._paused and len(self._buffer) <= self._limit:
-            self._paused = False
-            self._transport.resume_reading()
-
-    def feed_eof(self):
-        warnings.warn("Stream.feed_eof() is deprecated "
-                      "since Python 3.8 and is scheduled for removal in 3.10; "
-                      "it is an internal API",
-                      DeprecationWarning,
-                      stacklevel=2)
-        self._feed_eof()
-
-    def _feed_eof(self):
-        self._eof = True
-        self._wakeup_waiter()
-
-    def at_eof(self):
-        """Return True if the buffer is empty and 'feed_eof' was called."""
-        return self._eof and not self._buffer
-
-    def feed_data(self, data):
-        warnings.warn("Stream.feed_data() is deprecated "
-                      "since Python 3.8 and is scheduled for removal in 3.10; "
-                      "it is an internal API",
-                      DeprecationWarning,
-                      stacklevel=2)
-        self._feed_data(data)
-
-    def _feed_data(self, data):
-        _ensure_can_read(self._mode)
-        assert not self._eof, 'feed_data after feed_eof'
-
-        if not data:
-            return
-
-        self._buffer.extend(data)
-        self._wakeup_waiter()
-
-        if (self._transport is not None and
-                not self._paused and
-                len(self._buffer) > 2 * self._limit):
-            try:
-                self._transport.pause_reading()
-            except NotImplementedError:
-                # The transport can't be paused.
-                # We'll just have to buffer all data.
-                # Forget the transport so we don't keep trying.
-                self._transport = None
-            else:
-                self._paused = True
-
-    async def _wait_for_data(self, func_name):
-        """Wait until feed_data() or feed_eof() is called.
-
-        If stream was paused, automatically resume it.
-        """
-        # StreamReader uses a future to link the protocol feed_data() method
-        # to a read coroutine. Running two read coroutines at the same time
-        # would have an unexpected behaviour. It would not possible to know
-        # which coroutine would get the next data.
-        if self._waiter is not None:
-            raise RuntimeError(
-                f'{func_name}() called while another coroutine is '
-                f'already waiting for incoming data')
-
-        assert not self._eof, '_wait_for_data after EOF'
-
-        # Waiting for data while paused will make deadlock, so prevent it.
-        # This is essential for readexactly(n) for case when n > self._limit.
-        if self._paused:
-            self._paused = False
-            self._transport.resume_reading()
-
-        self._waiter = self._loop.create_future()
-        try:
-            await self._waiter
-        finally:
-            self._waiter = None
-
-    async def readline(self):
-        """Read chunk of data from the stream until newline (b'\n') is found.
-
-        On success, return chunk that ends with newline. If only partial
-        line can be read due to EOF, return incomplete line without
-        terminating newline. When EOF was reached while no bytes read, empty
-        bytes object is returned.
-
-        If limit is reached, ValueError will be raised. In that case, if
-        newline was found, complete line including newline will be removed
-        from internal buffer. Else, internal buffer will be cleared. Limit is
-        compared against part of the line without newline.
-
-        If stream was paused, this function will automatically resume it if
-        needed.
-        """
-        _ensure_can_read(self._mode)
-        sep = b'\n'
-        seplen = len(sep)
-        try:
-            line = await self.readuntil(sep)
-        except exceptions.IncompleteReadError as e:
-            return e.partial
-        except exceptions.LimitOverrunError as e:
-            if self._buffer.startswith(sep, e.consumed):
-                del self._buffer[:e.consumed + seplen]
-            else:
-                self._buffer.clear()
-            self._maybe_resume_transport()
-            raise ValueError(e.args[0])
-        return line
-
-    async def readuntil(self, separator=b'\n'):
-        """Read data from the stream until ``separator`` is found.
-
-        On success, the data and separator will be removed from the
-        internal buffer (consumed). Returned data will include the
-        separator at the end.
-
-        Configured stream limit is used to check result. Limit sets the
-        maximal length of data that can be returned, not counting the
-        separator.
-
-        If an EOF occurs and the complete separator is still not found,
-        an IncompleteReadError exception will be raised, and the internal
-        buffer will be reset.  The IncompleteReadError.partial attribute
-        may contain the separator partially.
-
-        If the data cannot be read because of over limit, a
-        LimitOverrunError exception  will be raised, and the data
-        will be left in the internal buffer, so it can be read again.
-        """
-        _ensure_can_read(self._mode)
-        seplen = len(separator)
-        if seplen == 0:
-            raise ValueError('Separator should be at least one-byte string')
-
-        if self._exception is not None:
-            raise self._exception
-
-        # Consume whole buffer except last bytes, which length is
-        # one less than seplen. Let's check corner cases with
-        # separator='SEPARATOR':
-        # * we have received almost complete separator (without last
-        #   byte). i.e buffer='some textSEPARATO'. In this case we
-        #   can safely consume len(separator) - 1 bytes.
-        # * last byte of buffer is first byte of separator, i.e.
-        #   buffer='abcdefghijklmnopqrS'. We may safely consume
-        #   everything except that last byte, but this require to
-        #   analyze bytes of buffer that match partial separator.
-        #   This is slow and/or require FSM. For this case our
-        #   implementation is not optimal, since require rescanning
-        #   of data that is known to not belong to separator. In
-        #   real world, separator will not be so long to notice
-        #   performance problems. Even when reading MIME-encoded
-        #   messages :)
-
-        # `offset` is the number of bytes from the beginning of the buffer
-        # where there is no occurrence of `separator`.
-        offset = 0
-
-        # Loop until we find `separator` in the buffer, exceed the buffer size,
-        # or an EOF has happened.
-        while True:
-            buflen = len(self._buffer)
-
-            # Check if we now have enough data in the buffer for `separator` to
-            # fit.
-            if buflen - offset >= seplen:
-                isep = self._buffer.find(separator, offset)
-
-                if isep != -1:
-                    # `separator` is in the buffer. `isep` will be used later
-                    # to retrieve the data.
-                    break
-
-                # see upper comment for explanation.
-                offset = buflen + 1 - seplen
-                if offset > self._limit:
-                    raise exceptions.LimitOverrunError(
-                        'Separator is not found, and chunk exceed the limit',
-                        offset)
-
-            # Complete message (with full separator) may be present in buffer
-            # even when EOF flag is set. This may happen when the last chunk
-            # adds data which makes separator be found. That's why we check for
-            # EOF *ater* inspecting the buffer.
-            if self._eof:
-                chunk = bytes(self._buffer)
-                self._buffer.clear()
-                raise exceptions.IncompleteReadError(chunk, None)
-
-            # _wait_for_data() will resume reading if stream was paused.
-            await self._wait_for_data('readuntil')
-
-        if isep > self._limit:
-            raise exceptions.LimitOverrunError(
-                'Separator is found, but chunk is longer than limit', isep)
-
-        chunk = self._buffer[:isep + seplen]
-        del self._buffer[:isep + seplen]
-        self._maybe_resume_transport()
-        return bytes(chunk)
-
-    async def read(self, n=-1):
-        """Read up to `n` bytes from the stream.
-
-        If n is not provided, or set to -1, read until EOF and return all read
-        bytes. If the EOF was received and the internal buffer is empty, return
-        an empty bytes object.
-
-        If n is zero, return empty bytes object immediately.
-
-        If n is positive, this function try to read `n` bytes, and may return
-        less or equal bytes than requested, but at least one byte. If EOF was
-        received before any byte is read, this function returns empty byte
-        object.
-
-        Returned value is not limited with limit, configured at stream
-        creation.
-
-        If stream was paused, this function will automatically resume it if
-        needed.
-        """
-        _ensure_can_read(self._mode)
-
-        if self._exception is not None:
-            raise self._exception
-
-        if n == 0:
-            return b''
-
-        if n < 0:
-            # This used to just loop creating a new waiter hoping to
-            # collect everything in self._buffer, but that would
-            # deadlock if the subprocess sends more than self.limit
-            # bytes.  So just call self.read(self._limit) until EOF.
-            blocks = []
-            while True:
-                block = await self.read(self._limit)
-                if not block:
-                    break
-                blocks.append(block)
-            return b''.join(blocks)
-
-        if not self._buffer and not self._eof:
-            await self._wait_for_data('read')
-
-        # This will work right even if buffer is less than n bytes
-        data = bytes(self._buffer[:n])
-        del self._buffer[:n]
-
-        self._maybe_resume_transport()
-        return data
-
-    async def readexactly(self, n):
-        """Read exactly `n` bytes.
-
-        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
-        read. The IncompleteReadError.partial attribute of the exception will
-        contain the partial read bytes.
-
-        if n is zero, return empty bytes object.
-
-        Returned value is not limited with limit, configured at stream
-        creation.
-
-        If stream was paused, this function will automatically resume it if
-        needed.
-        """
-        _ensure_can_read(self._mode)
-        if n < 0:
-            raise ValueError('readexactly size can not be less than zero')
-
-        if self._exception is not None:
-            raise self._exception
-
-        if n == 0:
-            return b''
-
-        while len(self._buffer) < n:
-            if self._eof:
-                incomplete = bytes(self._buffer)
-                self._buffer.clear()
-                raise exceptions.IncompleteReadError(incomplete, n)
-
-            await self._wait_for_data('readexactly')
-
-        if len(self._buffer) == n:
-            data = bytes(self._buffer)
-            self._buffer.clear()
-        else:
-            data = bytes(self._buffer[:n])
-            del self._buffer[:n]
-        self._maybe_resume_transport()
-        return data
-
-    def __aiter__(self):
-        _ensure_can_read(self._mode)
-        return self
-
-    async def __anext__(self):
-        val = await self.readline()
-        if val == b'':
-            raise StopAsyncIteration
-        return val
-
-    async def __aenter__(self):
-        return self
-
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        await self.close()
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index ce504b8b0cee..c9506b158302 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -19,16 +19,14 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
                                protocols.SubprocessProtocol):
     """Like StreamReaderProtocol, but for a subprocess."""
 
-    def __init__(self, limit, loop, *, _asyncio_internal=False):
-        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
+    def __init__(self, limit, loop):
+        super().__init__(loop=loop)
         self._limit = limit
         self.stdin = self.stdout = self.stderr = None
         self._transport = None
         self._process_exited = False
         self._pipe_fds = []
         self._stdin_closed = self._loop.create_future()
-        self._stdout_closed = self._loop.create_future()
-        self._stderr_closed = self._loop.create_future()
 
     def __repr__(self):
         info = [self.__class__.__name__]
@@ -42,35 +40,27 @@ def __repr__(self):
 
     def connection_made(self, transport):
         self._transport = transport
+
         stdout_transport = transport.get_pipe_transport(1)
         if stdout_transport is not None:
-            self.stdout = streams.Stream(mode=streams.StreamMode.READ,
-                                         transport=stdout_transport,
-                                         protocol=self,
-                                         limit=self._limit,
-                                         loop=self._loop,
-                                         _asyncio_internal=True)
-            self.stdout._set_transport(stdout_transport)
+            self.stdout = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+            self.stdout.set_transport(stdout_transport)
             self._pipe_fds.append(1)
 
         stderr_transport = transport.get_pipe_transport(2)
         if stderr_transport is not None:
-            self.stderr = streams.Stream(mode=streams.StreamMode.READ,
-                                         transport=stderr_transport,
-                                         protocol=self,
-                                         limit=self._limit,
-                                         loop=self._loop,
-                                         _asyncio_internal=True)
-            self.stderr._set_transport(stderr_transport)
+            self.stderr = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+            self.stderr.set_transport(stderr_transport)
             self._pipe_fds.append(2)
 
         stdin_transport = transport.get_pipe_transport(0)
         if stdin_transport is not None:
-            self.stdin = streams.Stream(mode=streams.StreamMode.WRITE,
-                                        transport=stdin_transport,
-                                        protocol=self,
-                                        loop=self._loop,
-                                        _asyncio_internal=True)
+            self.stdin = streams.StreamWriter(stdin_transport,
+                                              protocol=self,
+                                              reader=None,
+                                              loop=self._loop)
 
     def pipe_data_received(self, fd, data):
         if fd == 1:
@@ -80,7 +70,7 @@ def pipe_data_received(self, fd, data):
         else:
             reader = None
         if reader is not None:
-            reader._feed_data(data)
+            reader.feed_data(data)
 
     def pipe_connection_lost(self, fd, exc):
         if fd == 0:
@@ -101,9 +91,9 @@ def pipe_connection_lost(self, fd, exc):
             reader = None
         if reader is not None:
             if exc is None:
-                reader._feed_eof()
+                reader.feed_eof()
             else:
-                reader._set_exception(exc)
+                reader.set_exception(exc)
 
         if fd in self._pipe_fds:
             self._pipe_fds.remove(fd)
@@ -121,20 +111,10 @@ def _maybe_close_transport(self):
     def _get_close_waiter(self, stream):
         if stream is self.stdin:
             return self._stdin_closed
-        elif stream is self.stdout:
-            return self._stdout_closed
-        elif stream is self.stderr:
-            return self._stderr_closed
 
 
 class Process:
-    def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
-        if not _asyncio_internal:
-            warnings.warn(f"{self.__class__} should be instaniated "
-                          "by asyncio internals only, "
-                          "please avoid its creation from user code",
-                          DeprecationWarning)
-
+    def __init__(self, transport, protocol, loop):
         self._transport = transport
         self._protocol = protocol
         self._loop = loop
@@ -232,13 +212,12 @@ def kill(self):
         )
 
     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
-                                                        loop=loop,
-                                                        _asyncio_internal=True)
+                                                        loop=loop)
     transport, protocol = await loop.subprocess_shell(
         protocol_factory,
         cmd, stdin=stdin, stdout=stdout,
         stderr=stderr, **kwds)
-    return Process(transport, protocol, loop, _asyncio_internal=True)
+    return Process(transport, protocol, loop)
 
 
 async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
@@ -253,11 +232,10 @@ def kill(self):
                       stacklevel=2
         )
     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
-                                                        loop=loop,
-                                                        _asyncio_internal=True)
+                                                        loop=loop)
     transport, protocol = await loop.subprocess_exec(
         protocol_factory,
         program, *args,
         stdin=stdin, stdout=stdout,
         stderr=stderr, **kwds)
-    return Process(transport, protocol, loop, _asyncio_internal=True)
+    return Process(transport, protocol, loop)
diff --git a/Lib/test/test_asyncio/test_buffered_proto.py b/Lib/test/test_asyncio/test_buffered_proto.py
index b1531fb9343f..f24e363ebfcf 100644
--- a/Lib/test/test_asyncio/test_buffered_proto.py
+++ b/Lib/test/test_asyncio/test_buffered_proto.py
@@ -58,10 +58,9 @@ def on_buf(buf):
             writer.close()
             await writer.wait_closed()
 
-        with self.assertWarns(DeprecationWarning):
-            srv = self.loop.run_until_complete(
-                asyncio.start_server(
-                    on_server_client, '127.0.0.1', 0))
+        srv = self.loop.run_until_complete(
+            asyncio.start_server(
+                on_server_client, '127.0.0.1', 0))
 
         addr = srv.sockets[0].getsockname()
         self.loop.run_until_complete(
diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py
index 58a609444285..a1f27dd5721c 100644
--- a/Lib/test/test_asyncio/test_pep492.py
+++ b/Lib/test/test_asyncio/test_pep492.py
@@ -95,11 +95,9 @@ class StreamReaderTests(BaseTest):
     def test_readline(self):
         DATA = b'line1\nline2\nline3'
 
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(DATA)
-        stream._feed_eof()
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(DATA)
+        stream.feed_eof()
 
         async def reader():
             data = []
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 6325ee3983cc..b9413ab35fc5 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -1,8 +1,6 @@
 """Tests for streams.py."""
 
-import contextlib
 import gc
-import io
 import os
 import queue
 import pickle
@@ -18,7 +16,6 @@
     ssl = None
 
 import asyncio
-from asyncio.streams import _StreamProtocol, _ensure_can_read, _ensure_can_write
 from test.test_asyncio import utils as test_utils
 
 
@@ -26,24 +23,6 @@ def tearDownModule():
     asyncio.set_event_loop_policy(None)
 
 
-class StreamModeTests(unittest.TestCase):
-    def test__ensure_can_read_ok(self):
-        self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READ))
-        self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READWRITE))
-
-    def test__ensure_can_read_fail(self):
-        with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
-            _ensure_can_read(asyncio.StreamMode.WRITE)
-
-    def test__ensure_can_write_ok(self):
-        self.assertIsNone(_ensure_can_write(asyncio.StreamMode.WRITE))
-        self.assertIsNone(_ensure_can_write(asyncio.StreamMode.READWRITE))
-
-    def test__ensure_can_write_fail(self):
-        with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
-            _ensure_can_write(asyncio.StreamMode.READ)
-
-
 class StreamTests(test_utils.TestCase):
 
     DATA = b'line1\nline2\nline3\n'
@@ -63,8 +42,7 @@ def tearDown(self):
 
     @mock.patch('asyncio.streams.events')
     def test_ctor_global_loop(self, m_events):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader()
         self.assertIs(stream._loop, m_events.get_event_loop.return_value)
 
     def _basetest_open_connection(self, open_connection_fut):
@@ -100,8 +78,7 @@ def _basetest_open_connection_no_loop_ssl(self, open_connection_fut):
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
         try:
             with self.assertWarns(DeprecationWarning):
-                reader, writer = self.loop.run_until_complete(
-                    open_connection_fut)
+                reader, writer = self.loop.run_until_complete(open_connection_fut)
         finally:
             asyncio.set_event_loop(None)
         writer.write(b'GET / HTTP/1.0\r\n\r\n')
@@ -161,27 +138,21 @@ def test_open_unix_connection_error(self):
             self._basetest_open_connection_error(conn_fut)
 
     def test_feed_empty_data(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
 
-        stream._feed_data(b'')
+        stream.feed_data(b'')
         self.assertEqual(b'', stream._buffer)
 
     def test_feed_nonempty_data(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
 
-        stream._feed_data(self.DATA)
+        stream.feed_data(self.DATA)
         self.assertEqual(self.DATA, stream._buffer)
 
     def test_read_zero(self):
         # Read zero bytes.
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(self.DATA)
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(self.DATA)
 
         data = self.loop.run_until_complete(stream.read(0))
         self.assertEqual(b'', data)
@@ -189,13 +160,11 @@ def test_read_zero(self):
 
     def test_read(self):
         # Read bytes.
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         read_task = self.loop.create_task(stream.read(30))
 
         def cb():
-            stream._feed_data(self.DATA)
+            stream.feed_data(self.DATA)
         self.loop.call_soon(cb)
 
         data = self.loop.run_until_complete(read_task)
@@ -204,11 +173,9 @@ def cb():
 
     def test_read_line_breaks(self):
         # Read bytes without line breaks.
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'line1')
-        stream._feed_data(b'line2')
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'line1')
+        stream.feed_data(b'line2')
 
         data = self.loop.run_until_complete(stream.read(5))
 
@@ -217,13 +184,11 @@ def test_read_line_breaks(self):
 
     def test_read_eof(self):
         # Read bytes, stop at eof.
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         read_task = self.loop.create_task(stream.read(1024))
 
         def cb():
-            stream._feed_eof()
+            stream.feed_eof()
         self.loop.call_soon(cb)
 
         data = self.loop.run_until_complete(read_task)
@@ -232,15 +197,13 @@ def cb():
 
     def test_read_until_eof(self):
         # Read all bytes until eof.
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         read_task = self.loop.create_task(stream.read(-1))
 
         def cb():
-            stream._feed_data(b'chunk1\n')
-            stream._feed_data(b'chunk2')
-            stream._feed_eof()
+            stream.feed_data(b'chunk1\n')
+            stream.feed_data(b'chunk2')
+            stream.feed_eof()
         self.loop.call_soon(cb)
 
         data = self.loop.run_until_complete(read_task)
@@ -249,34 +212,26 @@ def cb():
         self.assertEqual(b'', stream._buffer)
 
     def test_read_exception(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'line\n')
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'line\n')
 
         data = self.loop.run_until_complete(stream.read(2))
         self.assertEqual(b'li', data)
 
-        stream._set_exception(ValueError())
+        stream.set_exception(ValueError())
         self.assertRaises(
             ValueError, self.loop.run_until_complete, stream.read(2))
 
     def test_invalid_limit(self):
         with self.assertRaisesRegex(ValueError, 'imit'):
-            asyncio.Stream(mode=asyncio.StreamMode.READ,
-                           limit=0, loop=self.loop,
-                           _asyncio_internal=True)
+            asyncio.StreamReader(limit=0, loop=self.loop)
 
         with self.assertRaisesRegex(ValueError, 'imit'):
-            asyncio.Stream(mode=asyncio.StreamMode.READ,
-                           limit=-1, loop=self.loop,
-                           _asyncio_internal=True)
+            asyncio.StreamReader(limit=-1, loop=self.loop)
 
     def test_read_limit(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                limit=3, loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'chunk')
+        stream = asyncio.StreamReader(limit=3, loop=self.loop)
+        stream.feed_data(b'chunk')
         data = self.loop.run_until_complete(stream.read(5))
         self.assertEqual(b'chunk', data)
         self.assertEqual(b'', stream._buffer)
@@ -284,16 +239,14 @@ def test_read_limit(self):
     def test_readline(self):
         # Read one line. 'readline' will need to wait for the data
         # to come from 'cb'
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'chunk1 ')
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'chunk1 ')
         read_task = self.loop.create_task(stream.readline())
 
         def cb():
-            stream._feed_data(b'chunk2 ')
-            stream._feed_data(b'chunk3 ')
-            stream._feed_data(b'\n chunk4')
+            stream.feed_data(b'chunk2 ')
+            stream.feed_data(b'chunk3 ')
+            stream.feed_data(b'\n chunk4')
         self.loop.call_soon(cb)
 
         line = self.loop.run_until_complete(read_task)
@@ -301,26 +254,22 @@ def cb():
         self.assertEqual(b' chunk4', stream._buffer)
 
     def test_readline_limit_with_existing_data(self):
-        # Read one line. The data is in Stream's buffer
+        # Read one line. The data is in StreamReader's buffer
         # before the event loop is run.
 
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                limit=3, loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'li')
-        stream._feed_data(b'ne1\nline2\n')
+        stream = asyncio.StreamReader(limit=3, loop=self.loop)
+        stream.feed_data(b'li')
+        stream.feed_data(b'ne1\nline2\n')
 
         self.assertRaises(
             ValueError, self.loop.run_until_complete, stream.readline())
         # The buffer should contain the remaining data after exception
         self.assertEqual(b'line2\n', stream._buffer)
 
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                limit=3, loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'li')
-        stream._feed_data(b'ne1')
-        stream._feed_data(b'li')
+        stream = asyncio.StreamReader(limit=3, loop=self.loop)
+        stream.feed_data(b'li')
+        stream.feed_data(b'ne1')
+        stream.feed_data(b'li')
 
         self.assertRaises(
             ValueError, self.loop.run_until_complete, stream.readline())
@@ -332,34 +281,30 @@ def test_readline_limit_with_existing_data(self):
         self.assertEqual(b'', stream._buffer)
 
     def test_at_eof(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         self.assertFalse(stream.at_eof())
 
-        stream._feed_data(b'some data\n')
+        stream.feed_data(b'some data\n')
         self.assertFalse(stream.at_eof())
 
         self.loop.run_until_complete(stream.readline())
         self.assertFalse(stream.at_eof())
 
-        stream._feed_data(b'some data\n')
-        stream._feed_eof()
+        stream.feed_data(b'some data\n')
+        stream.feed_eof()
         self.loop.run_until_complete(stream.readline())
         self.assertTrue(stream.at_eof())
 
     def test_readline_limit(self):
-        # Read one line. Streams are fed with data after
+        # Read one line. StreamReaders are fed with data after
         # their 'readline' methods are called.
 
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                limit=7, loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(limit=7, loop=self.loop)
         def cb():
-            stream._feed_data(b'chunk1')
-            stream._feed_data(b'chunk2')
-            stream._feed_data(b'chunk3\n')
-            stream._feed_eof()
+            stream.feed_data(b'chunk1')
+            stream.feed_data(b'chunk2')
+            stream.feed_data(b'chunk3\n')
+            stream.feed_eof()
         self.loop.call_soon(cb)
 
         self.assertRaises(
@@ -368,14 +313,12 @@ def cb():
         # a ValueError it should be empty.
         self.assertEqual(b'', stream._buffer)
 
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                limit=7, loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(limit=7, loop=self.loop)
         def cb():
-            stream._feed_data(b'chunk1')
-            stream._feed_data(b'chunk2\n')
-            stream._feed_data(b'chunk3\n')
-            stream._feed_eof()
+            stream.feed_data(b'chunk1')
+            stream.feed_data(b'chunk2\n')
+            stream.feed_data(b'chunk3\n')
+            stream.feed_eof()
         self.loop.call_soon(cb)
 
         self.assertRaises(
@@ -383,20 +326,18 @@ def cb():
         self.assertEqual(b'chunk3\n', stream._buffer)
 
         # check strictness of the limit
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                limit=7, loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'1234567\n')
+        stream = asyncio.StreamReader(limit=7, loop=self.loop)
+        stream.feed_data(b'1234567\n')
         line = self.loop.run_until_complete(stream.readline())
         self.assertEqual(b'1234567\n', line)
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'12345678\n')
+        stream.feed_data(b'12345678\n')
         with self.assertRaises(ValueError) as cm:
             self.loop.run_until_complete(stream.readline())
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'12345678')
+        stream.feed_data(b'12345678')
         with self.assertRaises(ValueError) as cm:
             self.loop.run_until_complete(stream.readline())
         self.assertEqual(b'', stream._buffer)
@@ -404,11 +345,9 @@ def cb():
     def test_readline_nolimit_nowait(self):
         # All needed data for the first 'readline' call will be
         # in the buffer.
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(self.DATA[:6])
-        stream._feed_data(self.DATA[6:])
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(self.DATA[:6])
+        stream.feed_data(self.DATA[6:])
 
         line = self.loop.run_until_complete(stream.readline())
 
@@ -416,29 +355,23 @@ def test_readline_nolimit_nowait(self):
         self.assertEqual(b'line2\nline3\n', stream._buffer)
 
     def test_readline_eof(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'some data')
-        stream._feed_eof()
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'some data')
+        stream.feed_eof()
 
         line = self.loop.run_until_complete(stream.readline())
         self.assertEqual(b'some data', line)
 
     def test_readline_empty_eof(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_eof()
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_eof()
 
         line = self.loop.run_until_complete(stream.readline())
         self.assertEqual(b'', line)
 
     def test_readline_read_byte_count(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(self.DATA)
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(self.DATA)
 
         self.loop.run_until_complete(stream.readline())
 
@@ -448,89 +381,79 @@ def test_readline_read_byte_count(self):
         self.assertEqual(b'ine3\n', stream._buffer)
 
     def test_readline_exception(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'line\n')
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'line\n')
 
         data = self.loop.run_until_complete(stream.readline())
         self.assertEqual(b'line\n', data)
 
-        stream._set_exception(ValueError())
+        stream.set_exception(ValueError())
         self.assertRaises(
             ValueError, self.loop.run_until_complete, stream.readline())
         self.assertEqual(b'', stream._buffer)
 
     def test_readuntil_separator(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         with self.assertRaisesRegex(ValueError, 'Separator should be'):
             self.loop.run_until_complete(stream.readuntil(separator=b''))
 
     def test_readuntil_multi_chunks(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
 
-        stream._feed_data(b'lineAAA')
+        stream.feed_data(b'lineAAA')
         data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA'))
         self.assertEqual(b'lineAAA', data)
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'lineAAA')
+        stream.feed_data(b'lineAAA')
         data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
         self.assertEqual(b'lineAAA', data)
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'lineAAAxxx')
+        stream.feed_data(b'lineAAAxxx')
         data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
         self.assertEqual(b'lineAAA', data)
         self.assertEqual(b'xxx', stream._buffer)
 
     def test_readuntil_multi_chunks_1(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
 
-        stream._feed_data(b'QWEaa')
-        stream._feed_data(b'XYaa')
-        stream._feed_data(b'a')
+        stream.feed_data(b'QWEaa')
+        stream.feed_data(b'XYaa')
+        stream.feed_data(b'a')
         data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
         self.assertEqual(b'QWEaaXYaaa', data)
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'QWEaa')
-        stream._feed_data(b'XYa')
-        stream._feed_data(b'aa')
+        stream.feed_data(b'QWEaa')
+        stream.feed_data(b'XYa')
+        stream.feed_data(b'aa')
         data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
         self.assertEqual(b'QWEaaXYaaa', data)
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'aaa')
+        stream.feed_data(b'aaa')
         data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
         self.assertEqual(b'aaa', data)
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'Xaaa')
+        stream.feed_data(b'Xaaa')
         data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
         self.assertEqual(b'Xaaa', data)
         self.assertEqual(b'', stream._buffer)
 
-        stream._feed_data(b'XXX')
-        stream._feed_data(b'a')
-        stream._feed_data(b'a')
-        stream._feed_data(b'a')
+        stream.feed_data(b'XXX')
+        stream.feed_data(b'a')
+        stream.feed_data(b'a')
+        stream.feed_data(b'a')
         data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
         self.assertEqual(b'XXXaaa', data)
         self.assertEqual(b'', stream._buffer)
 
     def test_readuntil_eof(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'some dataAA')
-        stream._feed_eof()
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'some dataAA')
+        stream.feed_eof()
 
         with self.assertRaises(asyncio.IncompleteReadError) as cm:
             self.loop.run_until_complete(stream.readuntil(b'AAA'))
@@ -539,18 +462,15 @@ def test_readuntil_eof(self):
         self.assertEqual(b'', stream._buffer)
 
     def test_readuntil_limit_found_sep(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop, limit=3,
-                                _asyncio_internal=True)
-        stream._feed_data(b'some dataAA')
-
+        stream = asyncio.StreamReader(loop=self.loop, limit=3)
+        stream.feed_data(b'some dataAA')
         with self.assertRaisesRegex(asyncio.LimitOverrunError,
                                     'not found') as cm:
             self.loop.run_until_complete(stream.readuntil(b'AAA'))
 
         self.assertEqual(b'some dataAA', stream._buffer)
 
-        stream._feed_data(b'A')
+        stream.feed_data(b'A')
         with self.assertRaisesRegex(asyncio.LimitOverrunError,
                                     'is found') as cm:
             self.loop.run_until_complete(stream.readuntil(b'AAA'))
@@ -559,10 +479,8 @@ def test_readuntil_limit_found_sep(self):
 
     def test_readexactly_zero_or_less(self):
         # Read exact number of bytes (zero or less).
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(self.DATA)
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(self.DATA)
 
         data = self.loop.run_until_complete(stream.readexactly(0))
         self.assertEqual(b'', data)
@@ -574,17 +492,15 @@ def test_readexactly_zero_or_less(self):
 
     def test_readexactly(self):
         # Read exact number of bytes.
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
 
         n = 2 * len(self.DATA)
         read_task = self.loop.create_task(stream.readexactly(n))
 
         def cb():
-            stream._feed_data(self.DATA)
-            stream._feed_data(self.DATA)
-            stream._feed_data(self.DATA)
+            stream.feed_data(self.DATA)
+            stream.feed_data(self.DATA)
+            stream.feed_data(self.DATA)
         self.loop.call_soon(cb)
 
         data = self.loop.run_until_complete(read_task)
@@ -592,25 +508,21 @@ def cb():
         self.assertEqual(self.DATA, stream._buffer)
 
     def test_readexactly_limit(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                limit=3, loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'chunk')
+        stream = asyncio.StreamReader(limit=3, loop=self.loop)
+        stream.feed_data(b'chunk')
         data = self.loop.run_until_complete(stream.readexactly(5))
         self.assertEqual(b'chunk', data)
         self.assertEqual(b'', stream._buffer)
 
     def test_readexactly_eof(self):
         # Read exact number of bytes (eof).
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         n = 2 * len(self.DATA)
         read_task = self.loop.create_task(stream.readexactly(n))
 
         def cb():
-            stream._feed_data(self.DATA)
-            stream._feed_eof()
+            stream.feed_data(self.DATA)
+            stream.feed_eof()
         self.loop.call_soon(cb)
 
         with self.assertRaises(asyncio.IncompleteReadError) as cm:
@@ -622,35 +534,29 @@ def cb():
         self.assertEqual(b'', stream._buffer)
 
     def test_readexactly_exception(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'line\n')
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'line\n')
 
         data = self.loop.run_until_complete(stream.readexactly(2))
         self.assertEqual(b'li', data)
 
-        stream._set_exception(ValueError())
+        stream.set_exception(ValueError())
         self.assertRaises(
             ValueError, self.loop.run_until_complete, stream.readexactly(2))
 
     def test_exception(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         self.assertIsNone(stream.exception())
 
         exc = ValueError()
-        stream._set_exception(exc)
+        stream.set_exception(exc)
         self.assertIs(stream.exception(), exc)
 
     def test_exception_waiter(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
 
         async def set_err():
-            stream._set_exception(ValueError())
+            stream.set_exception(ValueError())
 
         t1 = self.loop.create_task(stream.readline())
         t2 = self.loop.create_task(set_err())
@@ -660,16 +566,14 @@ def test_exception_waiter(self):
         self.assertRaises(ValueError, t1.result)
 
     def test_exception_cancel(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
 
         t = self.loop.create_task(stream.readline())
         test_utils.run_briefly(self.loop)
         t.cancel()
         test_utils.run_briefly(self.loop)
         # The following line fails if set_exception() isn't careful.
-        stream._set_exception(RuntimeError('message'))
+        stream.set_exception(RuntimeError('message'))
         test_utils.run_briefly(self.loop)
         self.assertIs(stream._waiter, None)
 
@@ -829,7 +733,7 @@ def stop(self):
     def test_read_all_from_pipe_reader(self):
         # See asyncio issue 168.  This test is derived from the example
         # subprocess_attach_read_pipe.py, but we configure the
-        # Stream's limit so that twice it is less than the size
+        # StreamReader's limit so that twice it is less than the size
         # of the data writter.  Also we must explicitly attach a child
         # watcher to the event loop.
 
@@ -843,11 +747,8 @@ def test_read_all_from_pipe_reader(self):
         args = [sys.executable, '-c', code, str(wfd)]
 
         pipe = open(rfd, 'rb', 0)
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop, limit=1,
-                                _asyncio_internal=True)
-        protocol = _StreamProtocol(stream, loop=self.loop,
-                                   _asyncio_internal=True)
+        reader = asyncio.StreamReader(loop=self.loop, limit=1)
+        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
         transport, _ = self.loop.run_until_complete(
             self.loop.connect_read_pipe(lambda: protocol, pipe))
 
@@ -865,30 +766,29 @@ def test_read_all_from_pipe_reader(self):
             asyncio.set_child_watcher(None)
 
         os.close(wfd)
-        data = self.loop.run_until_complete(stream.read(-1))
+        data = self.loop.run_until_complete(reader.read(-1))
         self.assertEqual(data, b'data')
 
     def test_streamreader_constructor(self):
         self.addCleanup(asyncio.set_event_loop, None)
         asyncio.set_event_loop(self.loop)
 
-        # asyncio issue #184: Ensure that _StreamProtocol constructor
+        # asyncio issue #184: Ensure that StreamReaderProtocol constructor
         # retrieves the current loop if the loop parameter is not set
-        reader = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                _asyncio_internal=True)
+        reader = asyncio.StreamReader()
         self.assertIs(reader._loop, self.loop)
 
     def test_streamreaderprotocol_constructor(self):
         self.addCleanup(asyncio.set_event_loop, None)
         asyncio.set_event_loop(self.loop)
 
-        # asyncio issue #184: Ensure that _StreamProtocol constructor
+        # asyncio issue #184: Ensure that StreamReaderProtocol constructor
         # retrieves the current loop if the loop parameter is not set
-        stream = mock.Mock()
-        protocol = _StreamProtocol(stream, _asyncio_internal=True)
+        reader = mock.Mock()
+        protocol = asyncio.StreamReaderProtocol(reader)
         self.assertIs(protocol._loop, self.loop)
 
-    def test_drain_raises_deprecated(self):
+    def test_drain_raises(self):
         # See http://bugs.python.org/issue25441
 
         # This test should not use asyncio for the mock server; the
@@ -902,7 +802,7 @@ def test_drain_raises_deprecated(self):
 
         def server():
             # Runs in a separate thread.
-            with socket.create_server(('127.0.0.1', 0)) as sock:
+            with socket.create_server(('localhost', 0)) as sock:
                 addr = sock.getsockname()
                 q.put(addr)
                 clt, _ = sock.accept()
@@ -933,106 +833,48 @@ def server():
         thread.join()
         self.assertEqual([], messages)
 
-    def test_drain_raises(self):
-        # See http://bugs.python.org/issue25441
-
-        # This test should not use asyncio for the mock server; the
-        # whole point of the test is to test for a bug in drain()
-        # where it never gives up the event loop but the socket is
-        # closed on the  server side.
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        q = queue.Queue()
-
-        def server():
-            # Runs in a separate thread.
-            with socket.create_server(('localhost', 0)) as sock:
-                addr = sock.getsockname()
-                q.put(addr)
-                clt, _ = sock.accept()
-                clt.close()
-
-        async def client(host, port):
-            stream = await asyncio.connect(host, port)
-
-            while True:
-                stream.write(b"foo\n")
-                await stream.drain()
-
-        # Start the server thread and wait for it to be listening.
-        thread = threading.Thread(target=server)
-        thread.setDaemon(True)
-        thread.start()
-        addr = q.get()
-
-        # Should not be stuck in an infinite loop.
-        with self.assertRaises((ConnectionResetError, ConnectionAbortedError,
-                                BrokenPipeError)):
-            self.loop.run_until_complete(client(*addr))
-
-        # Clean up the thread.  (Only on success; on failure, it may
-        # be stuck in accept().)
-        thread.join()
-        self.assertEqual([], messages)
-
     def test___repr__(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream))
+        stream = asyncio.StreamReader(loop=self.loop)
+        self.assertEqual("<StreamReader>", repr(stream))
 
     def test___repr__nondefault_limit(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop, limit=123,
-                                _asyncio_internal=True)
-        self.assertEqual("<Stream mode=StreamMode.READ limit=123>", repr(stream))
+        stream = asyncio.StreamReader(loop=self.loop, limit=123)
+        self.assertEqual("<StreamReader limit=123>", repr(stream))
 
     def test___repr__eof(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_eof()
-        self.assertEqual("<Stream mode=StreamMode.READ eof>", repr(stream))
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_eof()
+        self.assertEqual("<StreamReader eof>", repr(stream))
 
     def test___repr__data(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._feed_data(b'data')
-        self.assertEqual("<Stream mode=StreamMode.READ 4 bytes>", repr(stream))
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream.feed_data(b'data')
+        self.assertEqual("<StreamReader 4 bytes>", repr(stream))
 
     def test___repr__exception(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         exc = RuntimeError()
-        stream._set_exception(exc)
-        self.assertEqual("<Stream mode=StreamMode.READ exception=RuntimeError()>",
+        stream.set_exception(exc)
+        self.assertEqual("<StreamReader exception=RuntimeError()>",
                          repr(stream))
 
     def test___repr__waiter(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
-        stream._waiter = self.loop.create_future()
+        stream = asyncio.StreamReader(loop=self.loop)
+        stream._waiter = asyncio.Future(loop=self.loop)
         self.assertRegex(
             repr(stream),
-            r"<Stream .+ waiter=<Future pending[\S ]*>>")
+            r"<StreamReader waiter=<Future pending[\S ]*>>")
         stream._waiter.set_result(None)
         self.loop.run_until_complete(stream._waiter)
         stream._waiter = None
-        self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream))
+        self.assertEqual("<StreamReader>", repr(stream))
 
     def test___repr__transport(self):
-        stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                loop=self.loop,
-                                _asyncio_internal=True)
+        stream = asyncio.StreamReader(loop=self.loop)
         stream._transport = mock.Mock()
         stream._transport.__repr__ = mock.Mock()
         stream._transport.__repr__.return_value = "<Transport>"
-        self.assertEqual("<Stream mode=StreamMode.READ transport=<Transport>>",
-                         repr(stream))
+        self.assertEqual("<StreamReader transport=<Transport>>", repr(stream))
 
     def test_IncompleteReadError_pickleable(self):
         e = asyncio.IncompleteReadError(b'abc', 10)
@@ -1051,7 +893,7 @@ def test_LimitOverrunError_pickleable(self):
                 self.assertEqual(str(e), str(e2))
                 self.assertEqual(e.consumed, e2.consumed)
 
-    def test_wait_closed_on_close_deprecated(self):
+    def test_wait_closed_on_close(self):
         with test_utils.run_test_server() as httpd:
             with self.assertWarns(DeprecationWarning):
                 rd, wr = self.loop.run_until_complete(
@@ -1069,24 +911,7 @@ def test_wait_closed_on_close_deprecated(self):
             self.assertTrue(wr.is_closing())
             self.loop.run_until_complete(wr.wait_closed())
 
-    def test_wait_closed_on_close(self):
-        with test_utils.run_test_server() as httpd:
-            stream = self.loop.run_until_complete(
-                asyncio.connect(*httpd.address))
-
-            stream.write(b'GET / HTTP/1.0\r\n\r\n')
-            f = stream.readline()
-            data = self.loop.run_until_complete(f)
-            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-            f = stream.read()
-            data = self.loop.run_until_complete(f)
-            self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-            self.assertFalse(stream.is_closing())
-            stream.close()
-            self.assertTrue(stream.is_closing())
-            self.loop.run_until_complete(stream.wait_closed())
-
-    def test_wait_closed_on_close_with_unread_data_deprecated(self):
+    def test_wait_closed_on_close_with_unread_data(self):
         with test_utils.run_test_server() as httpd:
             with self.assertWarns(DeprecationWarning):
                 rd, wr = self.loop.run_until_complete(
@@ -1099,44 +924,33 @@ def test_wait_closed_on_close_with_unread_data_deprecated(self):
             wr.close()
             self.loop.run_until_complete(wr.wait_closed())
 
-    def test_wait_closed_on_close_with_unread_data(self):
-        with test_utils.run_test_server() as httpd:
-            stream = self.loop.run_until_complete(
-                asyncio.connect(*httpd.address))
-
-            stream.write(b'GET / HTTP/1.0\r\n\r\n')
-            f = stream.readline()
-            data = self.loop.run_until_complete(f)
-            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-            stream.close()
-            self.loop.run_until_complete(stream.wait_closed())
-
     def test_del_stream_before_sock_closing(self):
         messages = []
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
 
-        async def test():
-
-            with test_utils.run_test_server() as httpd:
-                stream = await asyncio.connect(*httpd.address)
-                sock = stream.get_extra_info('socket')
-                self.assertNotEqual(sock.fileno(), -1)
+        with test_utils.run_test_server() as httpd:
+            with self.assertWarns(DeprecationWarning):
+                rd, wr = self.loop.run_until_complete(
+                    asyncio.open_connection(*httpd.address, loop=self.loop))
+            sock = wr.get_extra_info('socket')
+            self.assertNotEqual(sock.fileno(), -1)
 
-                await stream.write(b'GET / HTTP/1.0\r\n\r\n')
-                data = await stream.readline()
-                self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
+            wr.write(b'GET / HTTP/1.0\r\n\r\n')
+            f = rd.readline()
+            data = self.loop.run_until_complete(f)
+            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
 
-                # drop refs to reader/writer
-                del stream
-                gc.collect()
-                # make a chance to close the socket
-                await asyncio.sleep(0)
+            # drop refs to reader/writer
+            del rd
+            del wr
+            gc.collect()
+            # make a chance to close the socket
+            test_utils.run_briefly(self.loop)
 
-                self.assertEqual(1, len(messages), messages)
-                self.assertEqual(sock.fileno(), -1)
+            self.assertEqual(1, len(messages))
+            self.assertEqual(sock.fileno(), -1)
 
-        self.loop.run_until_complete(test())
-        self.assertEqual(1, len(messages), messages)
+        self.assertEqual(1, len(messages))
         self.assertEqual('An open stream object is being garbage '
                          'collected; call "stream.close()" explicitly.',
                          messages[0]['message'])
@@ -1146,12 +960,9 @@ def test_del_stream_before_connection_made(self):
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
 
         with test_utils.run_test_server() as httpd:
-            stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                    loop=self.loop,
-                                    _asyncio_internal=True)
-            pr = _StreamProtocol(stream, loop=self.loop,
-                                 _asyncio_internal=True)
-            del stream
+            rd = asyncio.StreamReader(loop=self.loop)
+            pr = asyncio.StreamReaderProtocol(rd, loop=self.loop)
+            del rd
             gc.collect()
             tr, _ = self.loop.run_until_complete(
                 self.loop.create_connection(
@@ -1168,14 +979,15 @@ def test_del_stream_before_connection_made(self):
 
     def test_async_writer_api(self):
         async def inner(httpd):
-            stream = await asyncio.connect(*httpd.address)
+            rd, wr = await asyncio.open_connection(*httpd.address)
 
-            await stream.write(b'GET / HTTP/1.0\r\n\r\n')
-            data = await stream.readline()
+            wr.write(b'GET / HTTP/1.0\r\n\r\n')
+            data = await rd.readline()
             self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-            data = await stream.read()
+            data = await rd.read()
             self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-            await stream.close()
+            wr.close()
+            await wr.wait_closed()
 
         messages = []
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
@@ -1187,16 +999,17 @@ def test_async_writer_api(self):
 
     def test_async_writer_api_exception_after_close(self):
         async def inner(httpd):
-            stream = await asyncio.connect(*httpd.address)
+            rd, wr = await asyncio.open_connection(*httpd.address)
 
-            await stream.write(b'GET / HTTP/1.0\r\n\r\n')
-            data = await stream.readline()
+            wr.write(b'GET / HTTP/1.0\r\n\r\n')
+            data = await rd.readline()
             self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-            data = await stream.read()
+            data = await rd.read()
             self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-            stream.close()
+            wr.close()
             with self.assertRaises(ConnectionResetError):
-                await stream.write(b'data')
+                wr.write(b'data')
+                await wr.drain()
 
         messages = []
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
@@ -1227,587 +1040,6 @@ def test_eof_feed_when_closing_writer(self):
 
         self.assertEqual(messages, [])
 
-    def test_stream_reader_create_warning(self):
-        with contextlib.suppress(AttributeError):
-            del asyncio.StreamReader
-        with self.assertWarns(DeprecationWarning):
-            asyncio.StreamReader
-
-    def test_stream_writer_create_warning(self):
-        with contextlib.suppress(AttributeError):
-            del asyncio.StreamWriter
-        with self.assertWarns(DeprecationWarning):
-            asyncio.StreamWriter
-
-    def test_stream_reader_forbidden_ops(self):
-        async def inner():
-            stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                    _asyncio_internal=True)
-            with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
-                await stream.write(b'data')
-            with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
-                await stream.writelines([b'data', b'other'])
-            with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
-                stream.write_eof()
-            with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
-                await stream.drain()
-
-        self.loop.run_until_complete(inner())
-
-    def test_stream_writer_forbidden_ops(self):
-        async def inner():
-            stream = asyncio.Stream(mode=asyncio.StreamMode.WRITE,
-                                    _asyncio_internal=True)
-            with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
-                stream._feed_data(b'data')
-            with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
-                await stream.readline()
-            with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
-                await stream.readuntil()
-            with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
-                await stream.read()
-            with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
-                await stream.readexactly(10)
-            with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
-                async for chunk in stream:
-                    pass
-
-        self.loop.run_until_complete(inner())
-
-    def _basetest_connect(self, stream):
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-
-        stream.write(b'GET / HTTP/1.0\r\n\r\n')
-        f = stream.readline()
-        data = self.loop.run_until_complete(f)
-        self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-        f = stream.read()
-        data = self.loop.run_until_complete(f)
-        self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-        stream.close()
-        self.loop.run_until_complete(stream.wait_closed())
-
-        self.assertEqual([], messages)
-
-    def test_connect(self):
-        with test_utils.run_test_server() as httpd:
-            stream = self.loop.run_until_complete(
-                asyncio.connect(*httpd.address))
-            self.assertFalse(stream.is_server_side())
-            self._basetest_connect(stream)
-
-    @support.skip_unless_bind_unix_socket
-    def test_connect_unix(self):
-        with test_utils.run_test_unix_server() as httpd:
-            stream = self.loop.run_until_complete(
-                asyncio.connect_unix(httpd.address))
-            self._basetest_connect(stream)
-
-    def test_stream_async_context_manager(self):
-        async def test(httpd):
-            stream = await asyncio.connect(*httpd.address)
-            async with stream:
-                await stream.write(b'GET / HTTP/1.0\r\n\r\n')
-                data = await stream.readline()
-                self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-                data = await stream.read()
-                self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-            self.assertTrue(stream.is_closing())
-
-        with test_utils.run_test_server() as httpd:
-            self.loop.run_until_complete(test(httpd))
-
-    def test_connect_async_context_manager(self):
-        async def test(httpd):
-            async with asyncio.connect(*httpd.address) as stream:
-                await stream.write(b'GET / HTTP/1.0\r\n\r\n')
-                data = await stream.readline()
-                self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-                data = await stream.read()
-                self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-            self.assertTrue(stream.is_closing())
-
-        with test_utils.run_test_server() as httpd:
-            self.loop.run_until_complete(test(httpd))
-
-    @support.skip_unless_bind_unix_socket
-    def test_connect_unix_async_context_manager(self):
-        async def test(httpd):
-            async with asyncio.connect_unix(httpd.address) as stream:
-                await stream.write(b'GET / HTTP/1.0\r\n\r\n')
-                data = await stream.readline()
-                self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-                data = await stream.read()
-                self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-            self.assertTrue(stream.is_closing())
-
-        with test_utils.run_test_unix_server() as httpd:
-            self.loop.run_until_complete(test(httpd))
-
-    def test_stream_server(self):
-
-        async def handle_client(stream):
-            self.assertTrue(stream.is_server_side())
-            data = await stream.readline()
-            await stream.write(data)
-            await stream.close()
-
-        async def client(srv):
-            addr = srv.sockets[0].getsockname()
-            stream = await asyncio.connect(*addr)
-            # send a line
-            await stream.write(b"hello world!\n")
-            # read it back
-            msgback = await stream.readline()
-            await stream.close()
-            self.assertEqual(msgback, b"hello world!\n")
-            await srv.close()
-
-        async def test():
-            async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server:
-                await server.start_serving()
-                task = asyncio.create_task(client(server))
-                with contextlib.suppress(asyncio.CancelledError):
-                    await server.serve_forever()
-                await task
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(test())
-        self.assertEqual(messages, [])
-
-    @support.skip_unless_bind_unix_socket
-    def test_unix_stream_server(self):
-
-        async def handle_client(stream):
-            data = await stream.readline()
-            await stream.write(data)
-            await stream.close()
-
-        async def client(srv):
-            addr = srv.sockets[0].getsockname()
-            stream = await asyncio.connect_unix(addr)
-            # send a line
-            await stream.write(b"hello world!\n")
-            # read it back
-            msgback = await stream.readline()
-            await stream.close()
-            self.assertEqual(msgback, b"hello world!\n")
-            await srv.close()
-
-        async def test():
-            with test_utils.unix_socket_path() as path:
-                async with asyncio.UnixStreamServer(handle_client, path) as server:
-                    await server.start_serving()
-                    task = asyncio.create_task(client(server))
-                    with contextlib.suppress(asyncio.CancelledError):
-                        await server.serve_forever()
-                    await task
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(test())
-        self.assertEqual(messages, [])
-
-    def test_stream_server_inheritance_forbidden(self):
-        with self.assertRaises(TypeError):
-            class MyServer(asyncio.StreamServer):
-                pass
-
-    @support.skip_unless_bind_unix_socket
-    def test_unix_stream_server_inheritance_forbidden(self):
-        with self.assertRaises(TypeError):
-            class MyServer(asyncio.UnixStreamServer):
-                pass
-
-    def test_stream_server_bind(self):
-        async def handle_client(stream):
-            await stream.close()
-
-        async def test():
-            srv = asyncio.StreamServer(handle_client, '127.0.0.1', 0)
-            self.assertFalse(srv.is_bound())
-            self.assertEqual(0, len(srv.sockets))
-            await srv.bind()
-            self.assertTrue(srv.is_bound())
-            self.assertEqual(1, len(srv.sockets))
-            await srv.close()
-            self.assertFalse(srv.is_bound())
-            self.assertEqual(0, len(srv.sockets))
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(test())
-        self.assertEqual(messages, [])
-
-    def test_stream_server_bind_async_with(self):
-        async def handle_client(stream):
-            await stream.close()
-
-        async def test():
-            async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv:
-                self.assertTrue(srv.is_bound())
-                self.assertEqual(1, len(srv.sockets))
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(test())
-        self.assertEqual(messages, [])
-
-    def test_stream_server_start_serving(self):
-        async def handle_client(stream):
-            await stream.close()
-
-        async def test():
-            async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv:
-                self.assertFalse(srv.is_serving())
-                await srv.start_serving()
-                self.assertTrue(srv.is_serving())
-                await srv.close()
-                self.assertFalse(srv.is_serving())
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(test())
-        self.assertEqual(messages, [])
-
-    def test_stream_server_close(self):
-        server_stream_aborted = False
-        fut1 = self.loop.create_future()
-        fut2 = self.loop.create_future()
-
-        async def handle_client(stream):
-            data = await stream.readexactly(4)
-            self.assertEqual(b'data', data)
-            fut1.set_result(None)
-            await fut2
-            self.assertEqual(b'', await stream.readline())
-            nonlocal server_stream_aborted
-            server_stream_aborted = True
-
-        async def client(srv):
-            addr = srv.sockets[0].getsockname()
-            stream = await asyncio.connect(*addr)
-            await stream.write(b'data')
-            await fut2
-            self.assertEqual(b'', await stream.readline())
-            await stream.close()
-
-        async def test():
-            async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server:
-                await server.start_serving()
-                task = asyncio.create_task(client(server))
-                await fut1
-                fut2.set_result(None)
-                await server.close()
-                await task
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
-        self.assertEqual(messages, [])
-        self.assertTrue(fut1.done())
-        self.assertTrue(fut2.done())
-        self.assertTrue(server_stream_aborted)
-
-    def test_stream_server_abort(self):
-        server_stream_aborted = False
-        fut1 = self.loop.create_future()
-        fut2 = self.loop.create_future()
-
-        async def handle_client(stream):
-            data = await stream.readexactly(4)
-            self.assertEqual(b'data', data)
-            fut1.set_result(None)
-            await fut2
-            self.assertEqual(b'', await stream.readline())
-            nonlocal server_stream_aborted
-            server_stream_aborted = True
-
-        async def client(srv):
-            addr = srv.sockets[0].getsockname()
-            stream = await asyncio.connect(*addr)
-            await stream.write(b'data')
-            await fut2
-            self.assertEqual(b'', await stream.readline())
-            await stream.close()
-
-        async def test():
-            async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server:
-                await server.start_serving()
-                task = asyncio.create_task(client(server))
-                await fut1
-                fut2.set_result(None)
-                await server.abort()
-                await task
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
-        self.assertEqual(messages, [])
-        self.assertTrue(fut1.done())
-        self.assertTrue(fut2.done())
-        self.assertTrue(server_stream_aborted)
-
-    def test_stream_shutdown_hung_task(self):
-        fut1 = self.loop.create_future()
-        fut2 = self.loop.create_future()
-        cancelled = self.loop.create_future()
-
-        async def handle_client(stream):
-            data = await stream.readexactly(4)
-            self.assertEqual(b'data', data)
-            fut1.set_result(None)
-            await fut2
-            try:
-                while True:
-                    await asyncio.sleep(0.01)
-            except asyncio.CancelledError:
-                cancelled.set_result(None)
-                raise
-
-        async def client(srv):
-            addr = srv.sockets[0].getsockname()
-            stream = await asyncio.connect(*addr)
-            await stream.write(b'data')
-            await fut2
-            self.assertEqual(b'', await stream.readline())
-            await stream.close()
-
-        async def test():
-            async with asyncio.StreamServer(handle_client,
-                                            '127.0.0.1',
-                                            0,
-                                            shutdown_timeout=0.3) as server:
-                await server.start_serving()
-                task = asyncio.create_task(client(server))
-                await fut1
-                fut2.set_result(None)
-                await server.close()
-                await task
-                await cancelled
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
-        self.assertEqual(messages, [])
-        self.assertTrue(fut1.done())
-        self.assertTrue(fut2.done())
-        self.assertTrue(cancelled.done())
-
-    def test_stream_shutdown_hung_task_prevents_cancellation(self):
-        fut1 = self.loop.create_future()
-        fut2 = self.loop.create_future()
-        cancelled = self.loop.create_future()
-        do_handle_client = True
-
-        async def handle_client(stream):
-            data = await stream.readexactly(4)
-            self.assertEqual(b'data', data)
-            fut1.set_result(None)
-            await fut2
-            while do_handle_client:
-                with contextlib.suppress(asyncio.CancelledError):
-                    await asyncio.sleep(0.01)
-            cancelled.set_result(None)
-
-        async def client(srv):
-            addr = srv.sockets[0].getsockname()
-            stream = await asyncio.connect(*addr)
-            await stream.write(b'data')
-            await fut2
-            self.assertEqual(b'', await stream.readline())
-            await stream.close()
-
-        async def test():
-            async with asyncio.StreamServer(handle_client,
-                                            '127.0.0.1',
-                                            0,
-                                            shutdown_timeout=0.3) as server:
-                await server.start_serving()
-                task = asyncio.create_task(client(server))
-                await fut1
-                fut2.set_result(None)
-                await server.close()
-                nonlocal do_handle_client
-                do_handle_client = False
-                await task
-                await cancelled
-
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-        self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
-        self.assertEqual(1, len(messages))
-        self.assertRegex(messages[0]['message'],
-                         "<Task pending .+ ignored cancellation request")
-        self.assertTrue(fut1.done())
-        self.assertTrue(fut2.done())
-        self.assertTrue(cancelled.done())
-
-    def test_sendfile(self):
-        messages = []
-        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-
-        with open(support.TESTFN, 'wb') as fp:
-            fp.write(b'data\n')
-        self.addCleanup(support.unlink, support.TESTFN)
-
-        async def serve_callback(stream):
-            data = await stream.readline()
-            await stream.write(b'ack-' + data)
-            data = await stream.readline()
-            await stream.write(b'ack-' + data)
-            data = await stream.readline()
-            await stream.write(b'ack-' + data)
-            await stream.close()
-
-        async def do_connect(host, port):
-            stream = await asyncio.connect(host, port)
-            await stream.write(b'begin\n')
-            data = await stream.readline()
-            self.assertEqual(b'ack-begin\n', data)
-            with open(support.TESTFN, 'rb') as fp:
-                await stream.sendfile(fp)
-            data = await stream.readline()
-            self.assertEqual(b'ack-data\n', data)
-            await stream.write(b'end\n')
-            data = await stream.readline()
-            self.assertEqual(data, b'ack-end\n')
-            await stream.close()
-
-        async def test():
-            async with asyncio.StreamServer(serve_callback, '127.0.0.1', 0) as srv:
-                await srv.start_serving()
-                await do_connect(*srv.sockets[0].getsockname())
-
-        self.loop.run_until_complete(test())
-
-        self.assertEqual([], messages)
-
-
-    @unittest.skipIf(ssl is None, 'No ssl module')
-    def test_connect_start_tls(self):
-        with test_utils.run_test_server(use_ssl=True) as httpd:
-            # connect without SSL but upgrade to TLS just after
-            # connection is established
-            stream = self.loop.run_until_complete(
-                asyncio.connect(*httpd.address))
-
-            self.loop.run_until_complete(
-                stream.start_tls(
-                    sslcontext=test_utils.dummy_ssl_context()))
-            self._basetest_connect(stream)
-
-    def test_repr_unbound(self):
-        async def serve(stream):
-            pass
-
-        async def test():
-            srv = asyncio.StreamServer(serve)
-            self.assertEqual('<StreamServer>', repr(srv))
-            await srv.close()
-
-        self.loop.run_until_complete(test())
-
-    def test_repr_bound(self):
-        async def serve(stream):
-            pass
-
-        async def test():
-            srv = asyncio.StreamServer(serve, '127.0.0.1', 0)
-            await srv.bind()
-            self.assertRegex(repr(srv), r'<StreamServer sockets=\(.+\)>')
-            await srv.close()
-
-        self.loop.run_until_complete(test())
-
-    def test_repr_serving(self):
-        async def serve(stream):
-            pass
-
-        async def test():
-            srv = asyncio.StreamServer(serve, '127.0.0.1', 0)
-            await srv.start_serving()
-            self.assertRegex(repr(srv), r'<StreamServer serving sockets=\(.+\)>')
-            await srv.close()
-
-        self.loop.run_until_complete(test())
-
-
-    @unittest.skipUnless(sys.platform != 'win32',
-                         "Don't support pipes for Windows")
-    def test_read_pipe(self):
-        async def test():
-            rpipe, wpipe = os.pipe()
-            pipeobj = io.open(rpipe, 'rb', 1024)
-
-            async with asyncio.connect_read_pipe(pipeobj) as stream:
-                self.assertEqual(stream.mode, asyncio.StreamMode.READ)
-
-                os.write(wpipe, b'1')
-                data = await stream.readexactly(1)
-                self.assertEqual(data, b'1')
-
-                os.write(wpipe, b'2345')
-                data = await stream.readexactly(4)
-                self.assertEqual(data, b'2345')
-                os.close(wpipe)
-
-        self.loop.run_until_complete(test())
-
-    @unittest.skipUnless(sys.platform != 'win32',
-                         "Don't support pipes for Windows")
-    def test_write_pipe(self):
-        async def test():
-            rpipe, wpipe = os.pipe()
-            pipeobj = io.open(wpipe, 'wb', 1024)
-
-            async with asyncio.connect_write_pipe(pipeobj) as stream:
-                self.assertEqual(stream.mode, asyncio.StreamMode.WRITE)
-
-                await stream.write(b'1')
-                data = os.read(rpipe, 1024)
-                self.assertEqual(data, b'1')
-
-                await stream.write(b'2345')
-                data = os.read(rpipe, 1024)
-                self.assertEqual(data, b'2345')
-
-                os.close(rpipe)
-
-        self.loop.run_until_complete(test())
-
-    def test_stream_ctor_forbidden(self):
-        with self.assertRaisesRegex(RuntimeError,
-                                    "should be instantiated "
-                                    "by asyncio internals only"):
-            asyncio.Stream(asyncio.StreamMode.READWRITE)
-
-    def test_deprecated_methods(self):
-        async def f():
-            return asyncio.Stream(mode=asyncio.StreamMode.READWRITE,
-                                  _asyncio_internal=True)
-
-        stream = self.loop.run_until_complete(f())
-
-        tr = mock.Mock()
-
-        with self.assertWarns(DeprecationWarning):
-            stream.set_transport(tr)
-
-        with self.assertWarns(DeprecationWarning):
-            stream.transport is tr
-
-        with self.assertWarns(DeprecationWarning):
-            stream.feed_data(b'data')
-
-        with self.assertWarns(DeprecationWarning):
-            stream.feed_eof()
-
-        with self.assertWarns(DeprecationWarning):
-            stream.set_exception(ConnectionResetError("test"))
-
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index 3ad18e5c5166..fe8cfa61b1b2 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -582,18 +582,6 @@ def test_read_stdout_after_process_exit(self):
 
         self.loop.run_until_complete(execute())
 
-    def test_subprocess_protocol_create_warning(self):
-        with self.assertWarns(DeprecationWarning):
-            subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop)
-
-    def test_process_create_warning(self):
-        proto = subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop,
-                                                    _asyncio_internal=True)
-        transp = mock.Mock()
-
-        with self.assertWarns(DeprecationWarning):
-            subprocess.Process(transp, proto, loop=self.loop)
-
     def test_create_subprocess_exec_text_mode_fails(self):
         async def execute():
             with self.assertRaises(ValueError):
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
index d0ba19391fa0..9ed10fc20f81 100644
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -15,7 +15,6 @@
 
 import asyncio
 from asyncio import windows_events
-from asyncio.streams import _StreamProtocol
 from test.test_asyncio import utils as test_utils
 
 
@@ -118,16 +117,14 @@ def test_pipe(self):
 
         clients = []
         for i in range(5):
-            stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
-                                    loop=self.loop, _asyncio_internal=True)
-            protocol = _StreamProtocol(stream,
-                                       loop=self.loop,
-                                       _asyncio_internal=True)
+            stream_reader = asyncio.StreamReader(loop=self.loop)
+            protocol = asyncio.StreamReaderProtocol(stream_reader,
+                                                    loop=self.loop)
             trans, proto = await self.loop.create_pipe_connection(
                 lambda: protocol, ADDRESS)
             self.assertIsInstance(trans, asyncio.Transport)
             self.assertEqual(protocol, proto)
-            clients.append((stream, trans))
+            clients.append((stream_reader, trans))
 
         for i, (r, w) in enumerate(clients):
             w.write('lower-{}\n'.format(i).encode())
@@ -136,7 +133,6 @@ def test_pipe(self):
             response = await r.readline()
             self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
             w.close()
-            await r.close()
 
         server.close()
 
diff --git a/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst b/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst
new file mode 100644
index 000000000000..be9da891b866
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst
@@ -0,0 +1 @@
+Revert the new asyncio Streams API



More information about the Python-checkins mailing list