[Python-checkins] cpython (merge 3.4 -> 3.5): merge 3.4

andrew.svetlov python-checkins at python.org
Mon Jan 11 02:13:20 EST 2016


https://hg.python.org/cpython/rev/b2fac2ff7b01
changeset:   99832:b2fac2ff7b01
branch:      3.5
parent:      99829:51cc7adac78d
parent:      99831:f4fe55dd5659
user:        Andrew Svetlov <andrew.svetlov at gmail.com>
date:        Mon Jan 11 09:09:10 2016 +0200
summary:
  merge 3.4

files:
  Lib/asyncio/coroutines.py           |   17 +-
  Lib/asyncio/tasks.py                |   51 ++++
  Lib/test/test_asyncio/test_tasks.py |  169 ++++++++++++++++
  3 files changed, 230 insertions(+), 7 deletions(-)


diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
--- a/Lib/asyncio/coroutines.py
+++ b/Lib/asyncio/coroutines.py
@@ -27,8 +27,8 @@
 # before you define your coroutines.  A downside of using this feature
 # is that tracebacks show entries for the CoroWrapper.__next__ method
 # when _DEBUG is true.
-_DEBUG = (not sys.flags.ignore_environment
-          and bool(os.environ.get('PYTHONASYNCIODEBUG')))
+_DEBUG = (not sys.flags.ignore_environment and
+          bool(os.environ.get('PYTHONASYNCIODEBUG')))
 
 
 try:
@@ -86,7 +86,7 @@
     def __init__(self, gen, func=None):
         assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
         self.gen = gen
-        self.func = func # Used to unwrap @coroutine decorator
+        self.func = func  # Used to unwrap @coroutine decorator
         self._source_traceback = traceback.extract_stack(sys._getframe(1))
         self.__name__ = getattr(gen, '__name__', None)
         self.__qualname__ = getattr(gen, '__qualname__', None)
@@ -283,10 +283,13 @@
         coro_frame = coro.cr_frame
 
     filename = coro_code.co_filename
-    if (isinstance(coro, CoroWrapper)
-    and not inspect.isgeneratorfunction(coro.func)
-    and coro.func is not None):
-        filename, lineno = events._get_function_source(coro.func)
+    lineno = 0
+    if (isinstance(coro, CoroWrapper) and
+            not inspect.isgeneratorfunction(coro.func) and
+            coro.func is not None):
+        source = events._get_function_source(coro.func)
+        if source is not None:
+            filename, lineno = source
         if coro_frame is None:
             coro_repr = ('%s done, defined at %s:%s'
                          % (coro_name, filename, lineno))
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -4,6 +4,7 @@
            'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
            'wait', 'wait_for', 'as_completed', 'sleep', 'async',
            'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
+           'timeout',
            ]
 
 import concurrent.futures
@@ -732,3 +733,53 @@
 
     loop.call_soon_threadsafe(callback)
     return future
+
+
+def timeout(timeout, *, loop=None):
+    """A factory which produce a context manager with timeout.
+
+    Useful in cases when you want to apply timeout logic around block
+    of code or in cases when asyncio.wait_for is not suitable.
+
+    For example:
+
+    >>> with asyncio.timeout(0.001):
+    >>>     yield from coro()
+
+
+    timeout: timeout value in seconds
+    loop: asyncio compatible event loop
+    """
+    if loop is None:
+        loop = events.get_event_loop()
+    return _Timeout(timeout, loop=loop)
+
+
+class _Timeout:
+    def __init__(self, timeout, *, loop):
+        self._timeout = timeout
+        self._loop = loop
+        self._task = None
+        self._cancelled = False
+        self._cancel_handler = None
+
+    def __enter__(self):
+        self._task = Task.current_task(loop=self._loop)
+        if self._task is None:
+            raise RuntimeError('Timeout context manager should be used '
+                               'inside a task')
+        self._cancel_handler = self._loop.call_later(
+            self._timeout, self._cancel_task)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is futures.CancelledError and self._cancelled:
+            self._cancel_handler = None
+            self._task = None
+            raise futures.TimeoutError
+        self._cancel_handler.cancel()
+        self._cancel_handler = None
+        self._task = None
+
+    def _cancel_task(self):
+        self._cancelled = self._task.cancel()
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -6,6 +6,7 @@
 import os
 import re
 import sys
+import time
 import types
 import unittest
 import weakref
@@ -2235,5 +2236,173 @@
         self.assertEqual(result, 11)
 
 
+class TimeoutTests(test_utils.TestCase):
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(None)
+
+    def tearDown(self):
+        self.loop.close()
+        self.loop = None
+
+    def test_timeout(self):
+        canceled_raised = [False]
+
+        @asyncio.coroutine
+        def long_running_task():
+            try:
+                yield from asyncio.sleep(10, loop=self.loop)
+            except asyncio.CancelledError:
+                canceled_raised[0] = True
+                raise
+
+        @asyncio.coroutine
+        def go():
+            with self.assertRaises(asyncio.TimeoutError):
+                with asyncio.timeout(0.01, loop=self.loop) as t:
+                    yield from long_running_task()
+                    self.assertIs(t._loop, self.loop)
+
+        self.loop.run_until_complete(go())
+        self.assertTrue(canceled_raised[0], 'CancelledError was not raised')
+
+    def test_timeout_finish_in_time(self):
+        @asyncio.coroutine
+        def long_running_task():
+            yield from asyncio.sleep(0.01, loop=self.loop)
+            return 'done'
+
+        @asyncio.coroutine
+        def go():
+            with asyncio.timeout(0.1, loop=self.loop):
+                resp = yield from long_running_task()
+            self.assertEqual(resp, 'done')
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_gloabal_loop(self):
+        asyncio.set_event_loop(self.loop)
+
+        @asyncio.coroutine
+        def run():
+            with asyncio.timeout(0.1) as t:
+                yield from asyncio.sleep(0.01)
+                self.assertIs(t._loop, self.loop)
+
+        self.loop.run_until_complete(run())
+
+    def test_timeout_not_relevant_exception(self):
+        @asyncio.coroutine
+        def go():
+            yield from asyncio.sleep(0, loop=self.loop)
+            with self.assertRaises(KeyError):
+                with asyncio.timeout(0.1, loop=self.loop):
+                    raise KeyError
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_canceled_error_is_converted_to_timeout(self):
+        @asyncio.coroutine
+        def go():
+            yield from asyncio.sleep(0, loop=self.loop)
+            with self.assertRaises(asyncio.CancelledError):
+                with asyncio.timeout(0.001, loop=self.loop):
+                    raise asyncio.CancelledError
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_blocking_loop(self):
+        @asyncio.coroutine
+        def long_running_task():
+            time.sleep(0.05)
+            return 'done'
+
+        @asyncio.coroutine
+        def go():
+            with asyncio.timeout(0.01, loop=self.loop):
+                result = yield from long_running_task()
+            self.assertEqual(result, 'done')
+
+        self.loop.run_until_complete(go())
+
+    def test_for_race_conditions(self):
+        fut = asyncio.Future(loop=self.loop)
+        self.loop.call_later(0.1, fut.set_result('done'))
+
+        @asyncio.coroutine
+        def go():
+            with asyncio.timeout(0.2, loop=self.loop):
+                resp = yield from fut
+            self.assertEqual(resp, 'done')
+
+        self.loop.run_until_complete(go())
+
+    def test_timeout_time(self):
+        @asyncio.coroutine
+        def go():
+            foo_running = None
+
+            start = self.loop.time()
+            with self.assertRaises(asyncio.TimeoutError):
+                with asyncio.timeout(0.1, loop=self.loop):
+                    foo_running = True
+                    try:
+                        yield from asyncio.sleep(0.2, loop=self.loop)
+                    finally:
+                        foo_running = False
+
+            dt = self.loop.time() - start
+            self.assertTrue(0.09 < dt < 0.11, dt)
+            self.assertFalse(foo_running)
+
+        self.loop.run_until_complete(go())
+
+    def test_raise_runtimeerror_if_no_task(self):
+        with self.assertRaises(RuntimeError):
+            with asyncio.timeout(0.1, loop=self.loop):
+                pass
+
+    def test_outer_coro_is_not_cancelled(self):
+
+        has_timeout = [False]
+
+        @asyncio.coroutine
+        def outer():
+            try:
+                with asyncio.timeout(0.001, loop=self.loop):
+                    yield from asyncio.sleep(1, loop=self.loop)
+            except asyncio.TimeoutError:
+                has_timeout[0] = True
+
+        @asyncio.coroutine
+        def go():
+            task = asyncio.ensure_future(outer(), loop=self.loop)
+            yield from task
+            self.assertTrue(has_timeout[0])
+            self.assertFalse(task.cancelled())
+            self.assertTrue(task.done())
+
+        self.loop.run_until_complete(go())
+
+    def test_cancel_outer_coro(self):
+        fut = asyncio.Future(loop=self.loop)
+
+        @asyncio.coroutine
+        def outer():
+            fut.set_result(None)
+            yield from asyncio.sleep(1, loop=self.loop)
+
+        @asyncio.coroutine
+        def go():
+            task = asyncio.ensure_future(outer(), loop=self.loop)
+            yield from fut
+            task.cancel()
+            with self.assertRaises(asyncio.CancelledError):
+                yield from task
+            self.assertTrue(task.cancelled())
+            self.assertTrue(task.done())
+
+        self.loop.run_until_complete(go())
+
 if __name__ == '__main__':
     unittest.main()

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list