[Python-checkins] cpython (3.4): asyncio, tulip issue 202: Add unit test of pause/resume writing for proactor

victor.stinner python-checkins at python.org
Thu Dec 11 22:27:11 CET 2014


https://hg.python.org/cpython/rev/8f46f0af54cb
changeset:   93833:8f46f0af54cb
branch:      3.4
parent:      93831:daec40891d43
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Thu Dec 11 22:23:19 2014 +0100
summary:
  asyncio, tulip issue 202: Add unit test of pause/resume writing for proactor
socket transport

files:
  Lib/asyncio/proactor_events.py                |   4 -
  Lib/test/test_asyncio/test_proactor_events.py |  82 ++++++++++
  2 files changed, 82 insertions(+), 4 deletions(-)


diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -230,10 +230,6 @@
             assert self._buffer is None
             # Pass a copy, except if it's already immutable.
             self._loop_writing(data=bytes(data))
-            # XXX Should we pause the protocol at this point
-            # if len(data) > self._high_water?  (That would
-            # require keeping track of the number of bytes passed
-            # to a send() that hasn't finished yet.)
         elif not self._buffer:  # WRITING -> BACKED UP
             # Make a mutable copy which we can extend.
             self._buffer = bytearray(data)
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -343,6 +343,88 @@
         tr.close()
 
 
+    def pause_writing_transport(self, high):
+        tr = _ProactorSocketTransport(
+            self.loop, self.sock, self.protocol)
+        self.addCleanup(tr.close)
+
+        tr.set_write_buffer_limits(high=high)
+
+        self.assertEqual(tr.get_write_buffer_size(), 0)
+        self.assertFalse(self.protocol.pause_writing.called)
+        self.assertFalse(self.protocol.resume_writing.called)
+        return tr
+
+    def test_pause_resume_writing(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # write a large chunk, must pause writing
+        fut = asyncio.Future(loop=self.loop)
+        self.loop._proactor.send.return_value = fut
+        tr.write(b'large data')
+        self.loop._run_once()
+        self.assertTrue(self.protocol.pause_writing.called)
+
+        # flush the buffer
+        fut.set_result(None)
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 0)
+        self.assertTrue(self.protocol.resume_writing.called)
+
+    def test_pause_writing_2write(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # first short write, the buffer is not full (3 <= 4)
+        fut1 = asyncio.Future(loop=self.loop)
+        self.loop._proactor.send.return_value = fut1
+        tr.write(b'123')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 3)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+        # fill the buffer, must pause writing (6 > 4)
+        tr.write(b'abc')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 6)
+        self.assertTrue(self.protocol.pause_writing.called)
+
+    def test_pause_writing_3write(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # first short write, the buffer is not full (1 <= 4)
+        fut = asyncio.Future(loop=self.loop)
+        self.loop._proactor.send.return_value = fut
+        tr.write(b'1')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 1)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+        # second short write, the buffer is not full (3 <= 4)
+        tr.write(b'23')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 3)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+        # fill the buffer, must pause writing (6 > 4)
+        tr.write(b'abc')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 6)
+        self.assertTrue(self.protocol.pause_writing.called)
+
+    def test_dont_pause_writing(self):
+        tr = self.pause_writing_transport(high=4)
+
+        # write a large chunk which completes immedialty,
+        # it should not pause writing
+        fut = asyncio.Future(loop=self.loop)
+        fut.set_result(None)
+        self.loop._proactor.send.return_value = fut
+        tr.write(b'very large data')
+        self.loop._run_once()
+        self.assertEqual(tr.get_write_buffer_size(), 0)
+        self.assertFalse(self.protocol.pause_writing.called)
+
+
 class BaseProactorEventLoopTests(test_utils.TestCase):
 
     def setUp(self):

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list