[Python-checkins] cpython (3.4): Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.

yury.selivanov python-checkins at python.org
Wed Aug 5 19:59:02 CEST 2015


https://hg.python.org/cpython/rev/7aa2d3e1c885
changeset:   97277:7aa2d3e1c885
branch:      3.4
parent:      97272:94e215a5e24b
user:        Yury Selivanov <yselivanov at sprymix.com>
date:        Wed Aug 05 13:52:33 2015 -0400
summary:
  Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.

Patch by Gustavo J. A. M. Carneiro.

files:
  Lib/asyncio/queues.py                |  47 +++++++++--
  Lib/test/test_asyncio/test_queues.py |  61 +++++++++++++++-
  Misc/NEWS                            |   3 +
  3 files changed, 101 insertions(+), 10 deletions(-)


diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -47,7 +47,7 @@
 
         # Futures.
         self._getters = collections.deque()
-        # Pairs of (item, Future).
+        # Futures
         self._putters = collections.deque()
         self._unfinished_tasks = 0
         self._finished = locks.Event(loop=self._loop)
@@ -98,7 +98,7 @@
 
     def _consume_done_putters(self):
         # Delete waiters at the head of the put() queue who've timed out.
-        while self._putters and self._putters[0][1].done():
+        while self._putters and self._putters[0].done():
             self._putters.popleft()
 
     def qsize(self):
@@ -148,8 +148,9 @@
         elif self._maxsize > 0 and self._maxsize <= self.qsize():
             waiter = futures.Future(loop=self._loop)
 
-            self._putters.append((item, waiter))
+            self._putters.append(waiter)
             yield from waiter
+            self._put(item)
 
         else:
             self.__put_internal(item)
@@ -186,8 +187,7 @@
         self._consume_done_putters()
         if self._putters:
             assert self.full(), 'queue not full, why are putters waiting?'
-            item, putter = self._putters.popleft()
-            self.__put_internal(item)
+            putter = self._putters.popleft()
 
             # When a getter runs and frees up a slot so this putter can
             # run, we need to defer the put for a tick to ensure that
@@ -201,9 +201,39 @@
             return self._get()
         else:
             waiter = futures.Future(loop=self._loop)
+            self._getters.append(waiter)
+            try:
+                return (yield from waiter)
+            except futures.CancelledError:
+                # if we get CancelledError, it means someone cancelled this
+                # get() coroutine.  But there is a chance that the waiter
+                # already is ready and contains an item that has just been
+                # removed from the queue.  In this case, we need to put the item
+                # back into the front of the queue.  This get() must either
+                # succeed without fault or, if it gets cancelled, it must be as
+                # if it never happened.
+                if waiter.done():
+                    self._put_it_back(waiter.result())
+                raise
 
-            self._getters.append(waiter)
-            return (yield from waiter)
+    def _put_it_back(self, item):
+        """
+        This is called when we have a waiter to get() an item and this waiter
+        gets cancelled.  In this case, we put the item back: wake up another
+        waiter or put it in the _queue.
+        """
+        self._consume_done_getters()
+        if self._getters:
+            assert not self._queue, (
+                'queue non-empty, why are getters waiting?')
+
+            getter = self._getters.popleft()
+            self._put_internal(item)
+
+            # getter cannot be cancelled, we just removed done getters
+            getter.set_result(item)
+        else:
+            self._queue.appendleft(item)
 
     def get_nowait(self):
         """Remove and return an item from the queue.
@@ -213,8 +243,7 @@
         self._consume_done_putters()
         if self._putters:
             assert self.full(), 'queue not full, why are putters waiting?'
-            item, putter = self._putters.popleft()
-            self.__put_internal(item)
+            putter = self._putters.popleft()
             # Wake putter on next tick.
 
             # getter cannot be cancelled, we just removed done putters
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py
--- a/Lib/test/test_asyncio/test_queues.py
+++ b/Lib/test/test_asyncio/test_queues.py
@@ -171,7 +171,7 @@
         q.put_nowait(1)
 
         waiter = asyncio.Future(loop=self.loop)
-        q._putters.append((2, waiter))
+        q._putters.append(waiter)
 
         res = self.loop.run_until_complete(q.get())
         self.assertEqual(1, res)
@@ -322,6 +322,64 @@
         q.put_nowait(1)
         self.assertEqual(1, q.get_nowait())
 
+    def test_get_cancel_drop(self):
+        def gen():
+            yield 0.01
+            yield 0.1
+
+        loop = self.new_test_loop(gen)
+
+        q = asyncio.Queue(loop=loop)
+
+        reader = loop.create_task(q.get())
+
+        loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+        q.put_nowait(1)
+        q.put_nowait(2)
+        reader.cancel()
+
+        try:
+            loop.run_until_complete(reader)
+        except asyncio.CancelledError:
+            # try again
+            reader = loop.create_task(q.get())
+            loop.run_until_complete(reader)
+
+        result = reader.result()
+        # if we get 2, it means 1 got dropped!
+        self.assertEqual(1, result)
+
+    def test_put_cancel_drop(self):
+
+        def gen():
+            yield 0.01
+            yield 0.1
+
+        loop = self.new_test_loop(gen)
+        q = asyncio.Queue(1, loop=loop)
+
+        q.put_nowait(1)
+
+        # putting a second item in the queue has to block (qsize=1)
+        writer = loop.create_task(q.put(2))
+        loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+        value1 = q.get_nowait()
+        self.assertEqual(value1, 1)
+
+        writer.cancel()
+        try:
+            loop.run_until_complete(writer)
+        except asyncio.CancelledError:
+            # try again
+            writer = loop.create_task(q.put(2))
+            loop.run_until_complete(writer)
+
+        value2 = q.get_nowait()
+        self.assertEqual(value2, 2)
+        self.assertEqual(q.qsize(), 0)
+
     def test_nonblocking_put_exception(self):
         q = asyncio.Queue(maxsize=1, loop=self.loop)
         q.put_nowait(1)
@@ -374,6 +432,7 @@
         test_utils.run_briefly(self.loop)
         self.assertTrue(put_c.done())
         self.assertEqual(q.get_nowait(), 'a')
+        test_utils.run_briefly(self.loop)
         self.assertEqual(q.get_nowait(), 'b')
 
         self.loop.run_until_complete(put_b)
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -63,6 +63,9 @@
 
 - Issue #21354: PyCFunction_New function is exposed by python DLL again.
 
+- Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.
+  Patch by Gustavo J. A. M. Carneiro.
+
 Library
 -------
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list