[Python-checkins] cpython: asyncio.transports: Make _ProactorBasePipeTransport use _FlowControlMixin

yury.selivanov python-checkins at python.org
Wed Feb 19 00:42:00 CET 2014


http://hg.python.org/cpython/rev/72dba0d11235
changeset:   89263:72dba0d11235
user:        Yury Selivanov <yselivanov at sprymix.com>
date:        Tue Feb 18 18:41:13 2014 -0500
summary:
  asyncio.transports: Make _ProactorBasePipeTransport use _FlowControlMixin

files:
  Lib/asyncio/proactor_events.py |  55 +---------------
  Lib/asyncio/selector_events.py |  73 +---------------------
  Lib/asyncio/transports.py      |  70 +++++++++++++++++++++
  Lib/asyncio/unix_events.py     |   2 +-
  4 files changed, 75 insertions(+), 125 deletions(-)


diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -15,7 +15,8 @@
 from .log import logger
 
 
-class _ProactorBasePipeTransport(transports.BaseTransport):
+class _ProactorBasePipeTransport(transports._FlowControlMixin,
+                                 transports.BaseTransport):
     """Base class for pipe and socket transports."""
 
     def __init__(self, loop, sock, protocol, waiter=None,
@@ -33,8 +34,6 @@
         self._conn_lost = 0
         self._closing = False  # Set when close() called.
         self._eof_written = False
-        self._protocol_paused = False
-        self.set_write_buffer_limits()
         if self._server is not None:
             self._server.attach(self)
         self._loop.call_soon(self._protocol.connection_made, self)
@@ -94,56 +93,6 @@
                 server.detach(self)
                 self._server = None
 
-    # XXX The next four methods are nearly identical to corresponding
-    # ones in _SelectorTransport.  Maybe refactor buffer management to
-    # share the implementations?  (Also these are really only needed
-    # by _ProactorWritePipeTransport but since _buffer is defined on
-    # the base class I am putting it here for now.)
-
-    def _maybe_pause_protocol(self):
-        size = self.get_write_buffer_size()
-        if size <= self._high_water:
-            return
-        if not self._protocol_paused:
-            self._protocol_paused = True
-            try:
-                self._protocol.pause_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.pause_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def _maybe_resume_protocol(self):
-        if (self._protocol_paused and
-            self.get_write_buffer_size() <= self._low_water):
-            self._protocol_paused = False
-            try:
-                self._protocol.resume_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.resume_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def set_write_buffer_limits(self, high=None, low=None):
-        if high is None:
-            if low is None:
-                high = 64*1024
-            else:
-                high = 4*low
-        if low is None:
-            low = high // 4
-        if not high >= low >= 0:
-            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
-                             (high, low))
-        self._high_water = high
-        self._low_water = low
-
     def get_write_buffer_size(self):
         size = self._pending_write
         if self._buffer is not None:
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -338,77 +338,8 @@
         sock.close()
 
 
-class _FlowControlMixin(transports.Transport):
-    """All the logic for (write) flow control in a mix-in base class.
-
-    The subclass must implement get_write_buffer_size().  It must call
-    _maybe_pause_protocol() whenever the write buffer size increases,
-    and _maybe_resume_protocol() whenever it decreases.  It may also
-    override set_write_buffer_limits() (e.g. to specify different
-    defaults).
-
-    The subclass constructor must call super().__init__(extra).  This
-    will call set_write_buffer_limits().
-
-    The user may call set_write_buffer_limits() and
-    get_write_buffer_size(), and their protocol's pause_writing() and
-    resume_writing() may be called.
-    """
-
-    def __init__(self, extra=None):
-        super().__init__(extra)
-        self._protocol_paused = False
-        self.set_write_buffer_limits()
-
-    def _maybe_pause_protocol(self):
-        size = self.get_write_buffer_size()
-        if size <= self._high_water:
-            return
-        if not self._protocol_paused:
-            self._protocol_paused = True
-            try:
-                self._protocol.pause_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.pause_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def _maybe_resume_protocol(self):
-        if (self._protocol_paused and
-            self.get_write_buffer_size() <= self._low_water):
-            self._protocol_paused = False
-            try:
-                self._protocol.resume_writing()
-            except Exception as exc:
-                self._loop.call_exception_handler({
-                    'message': 'protocol.resume_writing() failed',
-                    'exception': exc,
-                    'transport': self,
-                    'protocol': self._protocol,
-                })
-
-    def set_write_buffer_limits(self, high=None, low=None):
-        if high is None:
-            if low is None:
-                high = 64*1024
-            else:
-                high = 4*low
-        if low is None:
-            low = high // 4
-        if not high >= low >= 0:
-            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
-                             (high, low))
-        self._high_water = high
-        self._low_water = low
-
-    def get_write_buffer_size(self):
-        raise NotImplementedError
-
-
-class _SelectorTransport(_FlowControlMixin, transports.Transport):
+class _SelectorTransport(transports._FlowControlMixin,
+                         transports.Transport):
 
     max_size = 256 * 1024  # Buffer size passed to recv().
 
diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py
--- a/Lib/asyncio/transports.py
+++ b/Lib/asyncio/transports.py
@@ -219,3 +219,73 @@
         http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
         """
         raise NotImplementedError
+
+
+class _FlowControlMixin(Transport):
+    """All the logic for (write) flow control in a mix-in base class.
+
+    The subclass must implement get_write_buffer_size().  It must call
+    _maybe_pause_protocol() whenever the write buffer size increases,
+    and _maybe_resume_protocol() whenever it decreases.  It may also
+    override set_write_buffer_limits() (e.g. to specify different
+    defaults).
+
+    The subclass constructor must call super().__init__(extra).  This
+    will call set_write_buffer_limits().
+
+    The user may call set_write_buffer_limits() and
+    get_write_buffer_size(), and their protocol's pause_writing() and
+    resume_writing() may be called.
+    """
+
+    def __init__(self, extra=None):
+        super().__init__(extra)
+        self._protocol_paused = False
+        self.set_write_buffer_limits()
+
+    def _maybe_pause_protocol(self):
+        size = self.get_write_buffer_size()
+        if size <= self._high_water:
+            return
+        if not self._protocol_paused:
+            self._protocol_paused = True
+            try:
+                self._protocol.pause_writing()
+            except Exception as exc:
+                self._loop.call_exception_handler({
+                    'message': 'protocol.pause_writing() failed',
+                    'exception': exc,
+                    'transport': self,
+                    'protocol': self._protocol,
+                })
+
+    def _maybe_resume_protocol(self):
+        if (self._protocol_paused and
+            self.get_write_buffer_size() <= self._low_water):
+            self._protocol_paused = False
+            try:
+                self._protocol.resume_writing()
+            except Exception as exc:
+                self._loop.call_exception_handler({
+                    'message': 'protocol.resume_writing() failed',
+                    'exception': exc,
+                    'transport': self,
+                    'protocol': self._protocol,
+                })
+
+    def set_write_buffer_limits(self, high=None, low=None):
+        if high is None:
+            if low is None:
+                high = 64*1024
+            else:
+                high = 4*low
+        if low is None:
+            low = high // 4
+        if not high >= low >= 0:
+            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+                             (high, low))
+        self._high_water = high
+        self._low_water = low
+
+    def get_write_buffer_size(self):
+        raise NotImplementedError
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -317,7 +317,7 @@
             self._loop = None
 
 
-class _UnixWritePipeTransport(selector_events._FlowControlMixin,
+class _UnixWritePipeTransport(transports._FlowControlMixin,
                               transports.WriteTransport):
 
     def __init__(self, loop, pipe, protocol, waiter=None, extra=None):

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list