[Python-checkins] cpython: asyncio: Refactor drain logic in streams.py to be reusable.

guido.van.rossum python-checkins at python.org
Wed Jan 29 23:41:56 CET 2014


http://hg.python.org/cpython/rev/472376125c8f
changeset:   88823:472376125c8f
user:        Guido van Rossum <guido at python.org>
date:        Wed Jan 29 14:24:45 2014 -0800
summary:
  asyncio: Refactor drain logic in streams.py to be reusable.

files:
  Lib/asyncio/streams.py |  97 ++++++++++++++++++-----------
  1 files changed, 61 insertions(+), 36 deletions(-)


diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -94,8 +94,63 @@
     return (yield from loop.create_server(factory, host, port, **kwds))
 
 
-class StreamReaderProtocol(protocols.Protocol):
-    """Trivial helper class to adapt between Protocol and StreamReader.
+class FlowControlMixin(protocols.Protocol):
+    """Reusable flow control logic for StreamWriter.drain().
+
+    This implements the protocol methods pause_writing(),
+    resume_reading() and connection_lost().  If the subclass overrides
+    these it must call the super methods.
+
+    StreamWriter.drain() must check for error conditions and then call
+    _make_drain_waiter(), which will return either () or a Future
+    depending on the paused state.
+    """
+
+    def __init__(self, loop=None):
+        self._loop = loop  # May be None; we may never need it.
+        self._paused = False
+        self._drain_waiter = None
+
+    def pause_writing(self):
+        assert not self._paused
+        self._paused = True
+
+    def resume_writing(self):
+        assert self._paused
+        self._paused = False
+        waiter = self._drain_waiter
+        if waiter is not None:
+            self._drain_waiter = None
+            if not waiter.done():
+                waiter.set_result(None)
+
+    def connection_lost(self, exc):
+        # Wake up the writer if currently paused.
+        if not self._paused:
+            return
+        waiter = self._drain_waiter
+        if waiter is None:
+            return
+        self._drain_waiter = None
+        if waiter.done():
+            return
+        if exc is None:
+            waiter.set_result(None)
+        else:
+            waiter.set_exception(exc)
+
+    def _make_drain_waiter(self):
+        if not self._paused:
+            return ()
+        waiter = self._drain_waiter
+        assert waiter is None or waiter.cancelled()
+        waiter = futures.Future(loop=self._loop)
+        self._drain_waiter = waiter
+        return waiter
+
+
+class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
+    """Helper class to adapt between Protocol and StreamReader.
 
     (This is a helper class instead of making StreamReader itself a
     Protocol subclass, because the StreamReader has other potential
@@ -104,12 +159,10 @@
     """
 
     def __init__(self, stream_reader, client_connected_cb=None, loop=None):
+        super().__init__(loop=loop)
         self._stream_reader = stream_reader
         self._stream_writer = None
-        self._drain_waiter = None
-        self._paused = False
         self._client_connected_cb = client_connected_cb
-        self._loop = loop  # May be None; we may never need it.
 
     def connection_made(self, transport):
         self._stream_reader.set_transport(transport)
@@ -127,16 +180,7 @@
             self._stream_reader.feed_eof()
         else:
             self._stream_reader.set_exception(exc)
-        # Also wake up the writing side.
-        if self._paused:
-            waiter = self._drain_waiter
-            if waiter is not None:
-                self._drain_waiter = None
-                if not waiter.done():
-                    if exc is None:
-                        waiter.set_result(None)
-                    else:
-                        waiter.set_exception(exc)
+        super().connection_lost(exc)
 
     def data_received(self, data):
         self._stream_reader.feed_data(data)
@@ -144,19 +188,6 @@
     def eof_received(self):
         self._stream_reader.feed_eof()
 
-    def pause_writing(self):
-        assert not self._paused
-        self._paused = True
-
-    def resume_writing(self):
-        assert self._paused
-        self._paused = False
-        waiter = self._drain_waiter
-        if waiter is not None:
-            self._drain_waiter = None
-            if not waiter.done():
-                waiter.set_result(None)
-
 
 class StreamWriter:
     """Wraps a Transport.
@@ -211,17 +242,11 @@
         completed, which will happen when the buffer is (partially)
         drained and the protocol is resumed.
         """
-        if self._reader._exception is not None:
+        if self._reader is not None and self._reader._exception is not None:
             raise self._reader._exception
         if self._transport._conn_lost:  # Uses private variable.
             raise ConnectionResetError('Connection lost')
-        if not self._protocol._paused:
-            return ()
-        waiter = self._protocol._drain_waiter
-        assert waiter is None or waiter.cancelled()
-        waiter = futures.Future(loop=self._loop)
-        self._protocol._drain_waiter = waiter
-        return waiter
+        return self._protocol._make_drain_waiter()
 
 
 class StreamReader:

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list