[Python-checkins] cpython (3.4): Issue #23198: Refactor asyncio.StreamReader

victor.stinner python-checkins at python.org
Wed Jan 14 00:54:45 CET 2015


https://hg.python.org/cpython/rev/94a6f9a3580e
changeset:   94131:94a6f9a3580e
branch:      3.4
parent:      94128:432b817611f2
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Wed Jan 14 00:53:37 2015 +0100
summary:
  Issue #23198: Refactor asyncio.StreamReader

- Add a new _wakeup_waiter() method
- Replace _create_waiter() method with a _wait_for_data() coroutine function
- Use the value None instead of True or False to wake up the waiter

files:
  Lib/asyncio/streams.py |  47 ++++++++++++++---------------
  1 files changed, 22 insertions(+), 25 deletions(-)


diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -313,8 +313,8 @@
         else:
             self._loop = loop
         self._buffer = bytearray()
-        self._eof = False  # Whether we're done.
-        self._waiter = None  # A future.
+        self._eof = False    # Whether we're done.
+        self._waiter = None  # A future used by _wait_for_data()
         self._exception = None
         self._transport = None
         self._paused = False
@@ -331,6 +331,14 @@
             if not waiter.cancelled():
                 waiter.set_exception(exc)
 
+    def _wakeup_waiter(self):
+        """Wakeup read() or readline() function waiting for data or EOF."""
+        waiter = self._waiter
+        if waiter is not None:
+            self._waiter = None
+            if not waiter.cancelled():
+                waiter.set_result(None)
+
     def set_transport(self, transport):
         assert self._transport is None, 'Transport already set'
         self._transport = transport
@@ -342,11 +350,7 @@
 
     def feed_eof(self):
         self._eof = True
-        waiter = self._waiter
-        if waiter is not None:
-            self._waiter = None
-            if not waiter.cancelled():
-                waiter.set_result(True)
+        self._wakeup_waiter()
 
     def at_eof(self):
         """Return True if the buffer is empty and 'feed_eof' was called."""
@@ -359,12 +363,7 @@
             return
 
         self._buffer.extend(data)
-
-        waiter = self._waiter
-        if waiter is not None:
-            self._waiter = None
-            if not waiter.cancelled():
-                waiter.set_result(False)
+        self._wakeup_waiter()
 
         if (self._transport is not None and
             not self._paused and
@@ -379,7 +378,8 @@
             else:
                 self._paused = True
 
-    def _create_waiter(self, func_name):
+    def _wait_for_data(self, func_name):
+        """Wait until feed_data() or feed_eof() is called."""
         # StreamReader uses a future to link the protocol feed_data() method
         # to a read coroutine. Running two read coroutines at the same time
         # would have an unexpected behaviour. It would not possible to know
@@ -387,7 +387,12 @@
         if self._waiter is not None:
             raise RuntimeError('%s() called while another coroutine is '
                                'already waiting for incoming data' % func_name)
-        return futures.Future(loop=self._loop)
+
+        self._waiter = futures.Future(loop=self._loop)
+        try:
+            yield from self._waiter
+        finally:
+            self._waiter = None
 
     @coroutine
     def readline(self):
@@ -417,11 +422,7 @@
                 break
 
             if not_enough:
-                self._waiter = self._create_waiter('readline')
-                try:
-                    yield from self._waiter
-                finally:
-                    self._waiter = None
+                yield from self._wait_for_data('readline')
 
         self._maybe_resume_transport()
         return bytes(line)
@@ -448,11 +449,7 @@
             return b''.join(blocks)
         else:
             if not self._buffer and not self._eof:
-                self._waiter = self._create_waiter('read')
-                try:
-                    yield from self._waiter
-                finally:
-                    self._waiter = None
+                yield from self._wait_for_data('read')
 
         if n < 0 or len(self._buffer) <= n:
             data = bytes(self._buffer)

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list