[Python-checkins] cpython (3.4): asyncio: sync with Tulip, add a new asyncio.coroutines module

victor.stinner python-checkins at python.org
Sun Jun 29 00:53:10 CEST 2014


http://hg.python.org/cpython/rev/8734e881c400
changeset:   91461:8734e881c400
branch:      3.4
parent:      91459:6889fb276d87
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Sun Jun 29 00:46:45 2014 +0200
summary:
  asyncio: sync with Tulip, add a new asyncio.coroutines module

files:
  Lib/asyncio/__init__.py             |    4 +-
  Lib/asyncio/base_events.py          |   28 +-
  Lib/asyncio/base_subprocess.py      |    4 +-
  Lib/asyncio/tasks.py                |  633 +---------------
  Lib/asyncio/locks.py                |   12 +-
  Lib/asyncio/streams.py              |   18 +-
  Lib/asyncio/subprocess.py           |   15 +-
  Lib/asyncio/tasks.py                |  143 +---
  Lib/asyncio/test_utils.py           |    3 +-
  Lib/asyncio/unix_events.py          |    8 +-
  Lib/asyncio/windows_events.py       |   11 +-
  Lib/test/test_asyncio/test_tasks.py |   34 +-
  12 files changed, 83 insertions(+), 830 deletions(-)


diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -18,6 +18,7 @@
         import _overlapped  # Will also be exported.
 
 # This relies on each of the submodules having an __all__ variable.
+from .coroutines import *
 from .events import *
 from .futures import *
 from .locks import *
@@ -34,7 +35,8 @@
     from .unix_events import *  # pragma: no cover
 
 
-__all__ = (events.__all__ +
+__all__ = (coroutines.__all__ +
+           events.__all__ +
            futures.__all__ +
            locks.__all__ +
            protocols.__all__ +
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -26,9 +26,11 @@
 import os
 import sys
 
+from . import coroutines
 from . import events
 from . import futures
 from . import tasks
+from .coroutines import coroutine
 from .log import logger
 
 
@@ -118,7 +120,7 @@
             if not waiter.done():
                 waiter.set_result(waiter)
 
-    @tasks.coroutine
+    @coroutine
     def wait_closed(self):
         if self.sockets is None or self.waiters is None:
             return
@@ -175,7 +177,7 @@
         """Create write pipe transport."""
         raise NotImplementedError
 
-    @tasks.coroutine
+    @coroutine
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
@@ -298,7 +300,7 @@
 
     def call_at(self, when, callback, *args):
         """Like call_later(), but uses an absolute time."""
-        if tasks.iscoroutinefunction(callback):
+        if coroutines.iscoroutinefunction(callback):
             raise TypeError("coroutines cannot be used with call_at()")
         if self._debug:
             self._assert_is_current_event_loop()
@@ -324,7 +326,7 @@
         return handle
 
     def _call_soon(self, callback, args, check_loop):
-        if tasks.iscoroutinefunction(callback):
+        if coroutines.iscoroutinefunction(callback):
             raise TypeError("coroutines cannot be used with call_soon()")
         if self._debug and check_loop:
             self._assert_is_current_event_loop()
@@ -361,7 +363,7 @@
         return handle
 
     def run_in_executor(self, executor, callback, *args):
-        if tasks.iscoroutinefunction(callback):
+        if coroutines.iscoroutinefunction(callback):
             raise TypeError("coroutines cannot be used with run_in_executor()")
         if isinstance(callback, events.Handle):
             assert not args
@@ -389,7 +391,7 @@
     def getnameinfo(self, sockaddr, flags=0):
         return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
 
-    @tasks.coroutine
+    @coroutine
     def create_connection(self, protocol_factory, host=None, port=None, *,
                           ssl=None, family=0, proto=0, flags=0, sock=None,
                           local_addr=None, server_hostname=None):
@@ -505,7 +507,7 @@
             sock, protocol_factory, ssl, server_hostname)
         return transport, protocol
 
-    @tasks.coroutine
+    @coroutine
     def _create_connection_transport(self, sock, protocol_factory, ssl,
                                      server_hostname):
         protocol = protocol_factory()
@@ -521,7 +523,7 @@
         yield from waiter
         return transport, protocol
 
-    @tasks.coroutine
+    @coroutine
     def create_datagram_endpoint(self, protocol_factory,
                                  local_addr=None, remote_addr=None, *,
                                  family=0, proto=0, flags=0):
@@ -593,7 +595,7 @@
         transport = self._make_datagram_transport(sock, protocol, r_addr)
         return transport, protocol
 
-    @tasks.coroutine
+    @coroutine
     def create_server(self, protocol_factory, host=None, port=None,
                       *,
                       family=socket.AF_UNSPEC,
@@ -672,7 +674,7 @@
             self._start_serving(protocol_factory, sock, ssl, server)
         return server
 
-    @tasks.coroutine
+    @coroutine
     def connect_read_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
         waiter = futures.Future(loop=self)
@@ -680,7 +682,7 @@
         yield from waiter
         return transport, protocol
 
-    @tasks.coroutine
+    @coroutine
     def connect_write_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
         waiter = futures.Future(loop=self)
@@ -688,7 +690,7 @@
         yield from waiter
         return transport, protocol
 
-    @tasks.coroutine
+    @coroutine
     def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          universal_newlines=False, shell=True, bufsize=0,
@@ -706,7 +708,7 @@
             protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
         return transport, protocol
 
-    @tasks.coroutine
+    @coroutine
     def subprocess_exec(self, protocol_factory, program, *args,
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=False,
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -2,8 +2,8 @@
 import subprocess
 
 from . import protocols
-from . import tasks
 from . import transports
+from .coroutines import coroutine
 
 
 class BaseSubprocessTransport(transports.SubprocessTransport):
@@ -65,7 +65,7 @@
     def kill(self):
         self._proc.kill()
 
-    @tasks.coroutine
+    @coroutine
     def _post_init(self):
         proc = self._proc
         loop = self._loop
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/coroutines.py
copy from Lib/asyncio/tasks.py
copy to Lib/asyncio/coroutines.py
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/coroutines.py
@@ -1,20 +1,11 @@
-"""Support for tasks, coroutines and the scheduler."""
+__all__ = ['coroutine',
+           'iscoroutinefunction', 'iscoroutine']
 
-__all__ = ['coroutine', 'Task',
-           'iscoroutinefunction', 'iscoroutine',
-           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
-           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
-           'gather', 'shield',
-           ]
-
-import concurrent.futures
 import functools
 import inspect
-import linecache
 import os
 import sys
 import traceback
-import weakref
 
 from . import events
 from . import futures
@@ -32,10 +23,8 @@
 _DEBUG = (not sys.flags.ignore_environment
           and bool(os.environ.get('PYTHONASYNCIODEBUG')))
 
-_PY34 = (sys.version_info >= (3, 4))
 _PY35 = (sys.version_info >= (3, 5))
 
-
 class CoroWrapper:
     # Wrapper for coroutine in _DEBUG mode.
 
@@ -149,621 +138,3 @@
     else:
         lineno = coro.gi_code.co_firstlineno
         return '%s() done at %s:%s' % (coro_name, filename, lineno)
-
-
-class Task(futures.Future):
-    """A coroutine wrapped in a Future."""
-
-    # An important invariant maintained while a Task not done:
-    #
-    # - Either _fut_waiter is None, and _step() is scheduled;
-    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
-    #
-    # The only transition from the latter to the former is through
-    # _wakeup().  When _fut_waiter is not None, one of its callbacks
-    # must be _wakeup().
-
-    # Weak set containing all tasks alive.
-    _all_tasks = weakref.WeakSet()
-
-    # Dictionary containing tasks that are currently active in
-    # all running event loops.  {EventLoop: Task}
-    _current_tasks = {}
-
-    @classmethod
-    def current_task(cls, loop=None):
-        """Return the currently running task in an event loop or None.
-
-        By default the current task for the current event loop is returned.
-
-        None is returned when called not in the context of a Task.
-        """
-        if loop is None:
-            loop = events.get_event_loop()
-        return cls._current_tasks.get(loop)
-
-    @classmethod
-    def all_tasks(cls, loop=None):
-        """Return a set of all tasks for an event loop.
-
-        By default all tasks for the current event loop are returned.
-        """
-        if loop is None:
-            loop = events.get_event_loop()
-        return {t for t in cls._all_tasks if t._loop is loop}
-
-    def __init__(self, coro, *, loop=None):
-        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
-        super().__init__(loop=loop)
-        if self._source_traceback:
-            del self._source_traceback[-1]
-        self._coro = iter(coro)  # Use the iterator just in case.
-        self._fut_waiter = None
-        self._must_cancel = False
-        self._loop.call_soon(self._step)
-        self.__class__._all_tasks.add(self)
-
-    # On Python 3.3 or older, objects with a destructor part of a reference
-    # cycle are never destroyed. It's not more the case on Python 3.4 thanks to
-    # the PEP 442.
-    if _PY34:
-        def __del__(self):
-            if self._state == futures._PENDING:
-                context = {
-                    'task': self,
-                    'message': 'Task was destroyed but it is pending!',
-                }
-                if self._source_traceback:
-                    context['source_traceback'] = self._source_traceback
-                self._loop.call_exception_handler(context)
-            futures.Future.__del__(self)
-
-    def __repr__(self):
-        info = []
-        if self._must_cancel:
-            info.append('cancelling')
-        else:
-            info.append(self._state.lower())
-
-        info.append(_format_coroutine(self._coro))
-
-        if self._state == futures._FINISHED:
-            info.append(self._format_result())
-
-        if self._callbacks:
-            info.append(self._format_callbacks())
-
-        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
-
-    def get_stack(self, *, limit=None):
-        """Return the list of stack frames for this task's coroutine.
-
-        If the coroutine is active, this returns the stack where it is
-        suspended.  If the coroutine has completed successfully or was
-        cancelled, this returns an empty list.  If the coroutine was
-        terminated by an exception, this returns the list of traceback
-        frames.
-
-        The frames are always ordered from oldest to newest.
-
-        The optional limit gives the maximum number of frames to
-        return; by default all available frames are returned.  Its
-        meaning differs depending on whether a stack or a traceback is
-        returned: the newest frames of a stack are returned, but the
-        oldest frames of a traceback are returned.  (This matches the
-        behavior of the traceback module.)
-
-        For reasons beyond our control, only one stack frame is
-        returned for a suspended coroutine.
-        """
-        frames = []
-        f = self._coro.gi_frame
-        if f is not None:
-            while f is not None:
-                if limit is not None:
-                    if limit <= 0:
-                        break
-                    limit -= 1
-                frames.append(f)
-                f = f.f_back
-            frames.reverse()
-        elif self._exception is not None:
-            tb = self._exception.__traceback__
-            while tb is not None:
-                if limit is not None:
-                    if limit <= 0:
-                        break
-                    limit -= 1
-                frames.append(tb.tb_frame)
-                tb = tb.tb_next
-        return frames
-
-    def print_stack(self, *, limit=None, file=None):
-        """Print the stack or traceback for this task's coroutine.
-
-        This produces output similar to that of the traceback module,
-        for the frames retrieved by get_stack().  The limit argument
-        is passed to get_stack().  The file argument is an I/O stream
-        to which the output goes; by default it goes to sys.stderr.
-        """
-        extracted_list = []
-        checked = set()
-        for f in self.get_stack(limit=limit):
-            lineno = f.f_lineno
-            co = f.f_code
-            filename = co.co_filename
-            name = co.co_name
-            if filename not in checked:
-                checked.add(filename)
-                linecache.checkcache(filename)
-            line = linecache.getline(filename, lineno, f.f_globals)
-            extracted_list.append((filename, lineno, name, line))
-        exc = self._exception
-        if not extracted_list:
-            print('No stack for %r' % self, file=file)
-        elif exc is not None:
-            print('Traceback for %r (most recent call last):' % self,
-                  file=file)
-        else:
-            print('Stack for %r (most recent call last):' % self,
-                  file=file)
-        traceback.print_list(extracted_list, file=file)
-        if exc is not None:
-            for line in traceback.format_exception_only(exc.__class__, exc):
-                print(line, file=file, end='')
-
-    def cancel(self):
-        """Request this task to cancel itself.
-
-        This arranges for a CancelledError to be thrown into the
-        wrapped coroutine on the next cycle through the event loop.
-        The coroutine then has a chance to clean up or even deny
-        the request using try/except/finally.
-
-        Contrary to Future.cancel(), this does not guarantee that the
-        task will be cancelled: the exception might be caught and
-        acted upon, delaying cancellation of the task or preventing it
-        completely.  The task may also return a value or raise a
-        different exception.
-
-        Immediately after this method is called, Task.cancelled() will
-        not return True (unless the task was already cancelled).  A
-        task will be marked as cancelled when the wrapped coroutine
-        terminates with a CancelledError exception (even if cancel()
-        was not called).
-        """
-        if self.done():
-            return False
-        if self._fut_waiter is not None:
-            if self._fut_waiter.cancel():
-                # Leave self._fut_waiter; it may be a Task that
-                # catches and ignores the cancellation so we may have
-                # to cancel it again later.
-                return True
-        # It must be the case that self._step is already scheduled.
-        self._must_cancel = True
-        return True
-
-    def _step(self, value=None, exc=None):
-        assert not self.done(), \
-            '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
-        if self._must_cancel:
-            if not isinstance(exc, futures.CancelledError):
-                exc = futures.CancelledError()
-            self._must_cancel = False
-        coro = self._coro
-        self._fut_waiter = None
-
-        self.__class__._current_tasks[self._loop] = self
-        # Call either coro.throw(exc) or coro.send(value).
-        try:
-            if exc is not None:
-                result = coro.throw(exc)
-            elif value is not None:
-                result = coro.send(value)
-            else:
-                result = next(coro)
-        except StopIteration as exc:
-            self.set_result(exc.value)
-        except futures.CancelledError as exc:
-            super().cancel()  # I.e., Future.cancel(self).
-        except Exception as exc:
-            self.set_exception(exc)
-        except BaseException as exc:
-            self.set_exception(exc)
-            raise
-        else:
-            if isinstance(result, futures.Future):
-                # Yielded Future must come from Future.__iter__().
-                if result._blocking:
-                    result._blocking = False
-                    result.add_done_callback(self._wakeup)
-                    self._fut_waiter = result
-                    if self._must_cancel:
-                        if self._fut_waiter.cancel():
-                            self._must_cancel = False
-                else:
-                    self._loop.call_soon(
-                        self._step, None,
-                        RuntimeError(
-                            'yield was used instead of yield from '
-                            'in task {!r} with {!r}'.format(self, result)))
-            elif result is None:
-                # Bare yield relinquishes control for one event loop iteration.
-                self._loop.call_soon(self._step)
-            elif inspect.isgenerator(result):
-                # Yielding a generator is just wrong.
-                self._loop.call_soon(
-                    self._step, None,
-                    RuntimeError(
-                        'yield was used instead of yield from for '
-                        'generator in task {!r} with {}'.format(
-                            self, result)))
-            else:
-                # Yielding something else is an error.
-                self._loop.call_soon(
-                    self._step, None,
-                    RuntimeError(
-                        'Task got bad yield: {!r}'.format(result)))
-        finally:
-            self.__class__._current_tasks.pop(self._loop)
-            self = None  # Needed to break cycles when an exception occurs.
-
-    def _wakeup(self, future):
-        try:
-            value = future.result()
-        except Exception as exc:
-            # This may also be a cancellation.
-            self._step(None, exc)
-        else:
-            self._step(value, None)
-        self = None  # Needed to break cycles when an exception occurs.
-
-
-# wait() and as_completed() similar to those in PEP 3148.
-
-FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
-FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
-ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
-
-
- at coroutine
-def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
-    """Wait for the Futures and coroutines given by fs to complete.
-
-    The sequence futures must not be empty.
-
-    Coroutines will be wrapped in Tasks.
-
-    Returns two sets of Future: (done, pending).
-
-    Usage:
-
-        done, pending = yield from asyncio.wait(fs)
-
-    Note: This does not raise TimeoutError! Futures that aren't done
-    when the timeout occurs are returned in the second set.
-    """
-    if isinstance(fs, futures.Future) or iscoroutine(fs):
-        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
-    if not fs:
-        raise ValueError('Set of coroutines/Futures is empty.')
-
-    if loop is None:
-        loop = events.get_event_loop()
-
-    fs = {async(f, loop=loop) for f in set(fs)}
-
-    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
-        raise ValueError('Invalid return_when value: {}'.format(return_when))
-    return (yield from _wait(fs, timeout, return_when, loop))
-
-
-def _release_waiter(waiter, value=True, *args):
-    if not waiter.done():
-        waiter.set_result(value)
-
-
- at coroutine
-def wait_for(fut, timeout, *, loop=None):
-    """Wait for the single Future or coroutine to complete, with timeout.
-
-    Coroutine will be wrapped in Task.
-
-    Returns result of the Future or coroutine.  When a timeout occurs,
-    it cancels the task and raises TimeoutError.  To avoid the task
-    cancellation, wrap it in shield().
-
-    Usage:
-
-        result = yield from asyncio.wait_for(fut, 10.0)
-
-    """
-    if loop is None:
-        loop = events.get_event_loop()
-
-    if timeout is None:
-        return (yield from fut)
-
-    waiter = futures.Future(loop=loop)
-    timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
-    cb = functools.partial(_release_waiter, waiter, True)
-
-    fut = async(fut, loop=loop)
-    fut.add_done_callback(cb)
-
-    try:
-        if (yield from waiter):
-            return fut.result()
-        else:
-            fut.remove_done_callback(cb)
-            fut.cancel()
-            raise futures.TimeoutError()
-    finally:
-        timeout_handle.cancel()
-
-
- at coroutine
-def _wait(fs, timeout, return_when, loop):
-    """Internal helper for wait() and _wait_for().
-
-    The fs argument must be a collection of Futures.
-    """
-    assert fs, 'Set of Futures is empty.'
-    waiter = futures.Future(loop=loop)
-    timeout_handle = None
-    if timeout is not None:
-        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
-    counter = len(fs)
-
-    def _on_completion(f):
-        nonlocal counter
-        counter -= 1
-        if (counter <= 0 or
-            return_when == FIRST_COMPLETED or
-            return_when == FIRST_EXCEPTION and (not f.cancelled() and
-                                                f.exception() is not None)):
-            if timeout_handle is not None:
-                timeout_handle.cancel()
-            if not waiter.done():
-                waiter.set_result(False)
-
-    for f in fs:
-        f.add_done_callback(_on_completion)
-
-    try:
-        yield from waiter
-    finally:
-        if timeout_handle is not None:
-            timeout_handle.cancel()
-
-    done, pending = set(), set()
-    for f in fs:
-        f.remove_done_callback(_on_completion)
-        if f.done():
-            done.add(f)
-        else:
-            pending.add(f)
-    return done, pending
-
-
-# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
-def as_completed(fs, *, loop=None, timeout=None):
-    """Return an iterator whose values are coroutines.
-
-    When waiting for the yielded coroutines you'll get the results (or
-    exceptions!) of the original Futures (or coroutines), in the order
-    in which and as soon as they complete.
-
-    This differs from PEP 3148; the proper way to use this is:
-
-        for f in as_completed(fs):
-            result = yield from f  # The 'yield from' may raise.
-            # Use result.
-
-    If a timeout is specified, the 'yield from' will raise
-    TimeoutError when the timeout occurs before all Futures are done.
-
-    Note: The futures 'f' are not necessarily members of fs.
-    """
-    if isinstance(fs, futures.Future) or iscoroutine(fs):
-        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
-    loop = loop if loop is not None else events.get_event_loop()
-    todo = {async(f, loop=loop) for f in set(fs)}
-    from .queues import Queue  # Import here to avoid circular import problem.
-    done = Queue(loop=loop)
-    timeout_handle = None
-
-    def _on_timeout():
-        for f in todo:
-            f.remove_done_callback(_on_completion)
-            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
-        todo.clear()  # Can't do todo.remove(f) in the loop.
-
-    def _on_completion(f):
-        if not todo:
-            return  # _on_timeout() was here first.
-        todo.remove(f)
-        done.put_nowait(f)
-        if not todo and timeout_handle is not None:
-            timeout_handle.cancel()
-
-    @coroutine
-    def _wait_for_one():
-        f = yield from done.get()
-        if f is None:
-            # Dummy value from _on_timeout().
-            raise futures.TimeoutError
-        return f.result()  # May raise f.exception().
-
-    for f in todo:
-        f.add_done_callback(_on_completion)
-    if todo and timeout is not None:
-        timeout_handle = loop.call_later(timeout, _on_timeout)
-    for _ in range(len(todo)):
-        yield _wait_for_one()
-
-
- at coroutine
-def sleep(delay, result=None, *, loop=None):
-    """Coroutine that completes after a given time (in seconds)."""
-    future = futures.Future(loop=loop)
-    h = future._loop.call_later(delay, future.set_result, result)
-    try:
-        return (yield from future)
-    finally:
-        h.cancel()
-
-
-def async(coro_or_future, *, loop=None):
-    """Wrap a coroutine in a future.
-
-    If the argument is a Future, it is returned directly.
-    """
-    if isinstance(coro_or_future, futures.Future):
-        if loop is not None and loop is not coro_or_future._loop:
-            raise ValueError('loop argument must agree with Future')
-        return coro_or_future
-    elif iscoroutine(coro_or_future):
-        task = Task(coro_or_future, loop=loop)
-        if task._source_traceback:
-            del task._source_traceback[-1]
-        return task
-    else:
-        raise TypeError('A Future or coroutine is required')
-
-
-class _GatheringFuture(futures.Future):
-    """Helper for gather().
-
-    This overrides cancel() to cancel all the children and act more
-    like Task.cancel(), which doesn't immediately mark itself as
-    cancelled.
-    """
-
-    def __init__(self, children, *, loop=None):
-        super().__init__(loop=loop)
-        self._children = children
-
-    def cancel(self):
-        if self.done():
-            return False
-        for child in self._children:
-            child.cancel()
-        return True
-
-
-def gather(*coros_or_futures, loop=None, return_exceptions=False):
-    """Return a future aggregating results from the given coroutines
-    or futures.
-
-    All futures must share the same event loop.  If all the tasks are
-    done successfully, the returned future's result is the list of
-    results (in the order of the original sequence, not necessarily
-    the order of results arrival).  If *return_exceptions* is True,
-    exceptions in the tasks are treated the same as successful
-    results, and gathered in the result list; otherwise, the first
-    raised exception will be immediately propagated to the returned
-    future.
-
-    Cancellation: if the outer Future is cancelled, all children (that
-    have not completed yet) are also cancelled.  If any child is
-    cancelled, this is treated as if it raised CancelledError --
-    the outer Future is *not* cancelled in this case.  (This is to
-    prevent the cancellation of one child to cause other children to
-    be cancelled.)
-    """
-    arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)}
-    children = [arg_to_fut[arg] for arg in coros_or_futures]
-    n = len(children)
-    if n == 0:
-        outer = futures.Future(loop=loop)
-        outer.set_result([])
-        return outer
-    if loop is None:
-        loop = children[0]._loop
-    for fut in children:
-        if fut._loop is not loop:
-            raise ValueError("futures are tied to different event loops")
-    outer = _GatheringFuture(children, loop=loop)
-    nfinished = 0
-    results = [None] * n
-
-    def _done_callback(i, fut):
-        nonlocal nfinished
-        if outer._state != futures._PENDING:
-            if fut._exception is not None:
-                # Mark exception retrieved.
-                fut.exception()
-            return
-        if fut._state == futures._CANCELLED:
-            res = futures.CancelledError()
-            if not return_exceptions:
-                outer.set_exception(res)
-                return
-        elif fut._exception is not None:
-            res = fut.exception()  # Mark exception retrieved.
-            if not return_exceptions:
-                outer.set_exception(res)
-                return
-        else:
-            res = fut._result
-        results[i] = res
-        nfinished += 1
-        if nfinished == n:
-            outer.set_result(results)
-
-    for i, fut in enumerate(children):
-        fut.add_done_callback(functools.partial(_done_callback, i))
-    return outer
-
-
-def shield(arg, *, loop=None):
-    """Wait for a future, shielding it from cancellation.
-
-    The statement
-
-        res = yield from shield(something())
-
-    is exactly equivalent to the statement
-
-        res = yield from something()
-
-    *except* that if the coroutine containing it is cancelled, the
-    task running in something() is not cancelled.  From the POV of
-    something(), the cancellation did not happen.  But its caller is
-    still cancelled, so the yield-from expression still raises
-    CancelledError.  Note: If something() is cancelled by other means
-    this will still cancel shield().
-
-    If you want to completely ignore cancellation (not recommended)
-    you can combine shield() with a try/except clause, as follows:
-
-        try:
-            res = yield from shield(something())
-        except CancelledError:
-            res = None
-    """
-    inner = async(arg, loop=loop)
-    if inner.done():
-        # Shortcut.
-        return inner
-    loop = inner._loop
-    outer = futures.Future(loop=loop)
-
-    def _done_callback(inner):
-        if outer.cancelled():
-            # Mark inner's result as retrieved.
-            inner.cancelled() or inner.exception()
-            return
-        if inner.cancelled():
-            outer.cancel()
-        else:
-            exc = inner.exception()
-            if exc is not None:
-                outer.set_exception(exc)
-            else:
-                outer.set_result(inner.result())
-
-    inner.add_done_callback(_done_callback)
-    return outer
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -6,7 +6,7 @@
 
 from . import events
 from . import futures
-from . import tasks
+from .coroutines import coroutine
 
 
 class _ContextManager:
@@ -112,7 +112,7 @@
         """Return True if lock is acquired."""
         return self._locked
 
-    @tasks.coroutine
+    @coroutine
     def acquire(self):
         """Acquire a lock.
 
@@ -225,7 +225,7 @@
         to true again."""
         self._value = False
 
-    @tasks.coroutine
+    @coroutine
     def wait(self):
         """Block until the internal flag is true.
 
@@ -278,7 +278,7 @@
             extra = '{},waiters:{}'.format(extra, len(self._waiters))
         return '<{} [{}]>'.format(res[1:-1], extra)
 
-    @tasks.coroutine
+    @coroutine
     def wait(self):
         """Wait until notified.
 
@@ -306,7 +306,7 @@
         finally:
             yield from self.acquire()
 
-    @tasks.coroutine
+    @coroutine
     def wait_for(self, predicate):
         """Wait until a predicate becomes true.
 
@@ -402,7 +402,7 @@
         """Returns True if semaphore can not be acquired immediately."""
         return self._value == 0
 
-    @tasks.coroutine
+    @coroutine
     def acquire(self):
         """Acquire a semaphore.
 
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -10,10 +10,12 @@
 if hasattr(socket, 'AF_UNIX'):
     __all__.extend(['open_unix_connection', 'start_unix_server'])
 
+from . import coroutines
 from . import events
 from . import futures
 from . import protocols
 from . import tasks
+from .coroutines import coroutine
 
 
 _DEFAULT_LIMIT = 2**16
@@ -33,7 +35,7 @@
         self.expected = expected
 
 
- at tasks.coroutine
+ at coroutine
 def open_connection(host=None, port=None, *,
                     loop=None, limit=_DEFAULT_LIMIT, **kwds):
     """A wrapper for create_connection() returning a (reader, writer) pair.
@@ -63,7 +65,7 @@
     return reader, writer
 
 
- at tasks.coroutine
+ at coroutine
 def start_server(client_connected_cb, host=None, port=None, *,
                  loop=None, limit=_DEFAULT_LIMIT, **kwds):
     """Start a socket server, call back for each client connected.
@@ -102,7 +104,7 @@
 if hasattr(socket, 'AF_UNIX'):
     # UNIX Domain Sockets are supported on this platform
 
-    @tasks.coroutine
+    @coroutine
     def open_unix_connection(path=None, *,
                              loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `open_connection` but works with UNIX Domain Sockets."""
@@ -116,7 +118,7 @@
         return reader, writer
 
 
-    @tasks.coroutine
+    @coroutine
     def start_unix_server(client_connected_cb, path=None, *,
                           loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `start_server` but works with UNIX Domain Sockets."""
@@ -210,7 +212,7 @@
                                                self._loop)
             res = self._client_connected_cb(self._stream_reader,
                                             self._stream_writer)
-            if tasks.iscoroutine(res):
+            if coroutines.iscoroutine(res):
                 tasks.Task(res, loop=self._loop)
 
     def connection_lost(self, exc):
@@ -373,7 +375,7 @@
                                'already waiting for incoming data' % func_name)
         return futures.Future(loop=self._loop)
 
-    @tasks.coroutine
+    @coroutine
     def readline(self):
         if self._exception is not None:
             raise self._exception
@@ -410,7 +412,7 @@
         self._maybe_resume_transport()
         return bytes(line)
 
-    @tasks.coroutine
+    @coroutine
     def read(self, n=-1):
         if self._exception is not None:
             raise self._exception
@@ -449,7 +451,7 @@
         self._maybe_resume_transport()
         return data
 
-    @tasks.coroutine
+    @coroutine
     def readexactly(self, n):
         if self._exception is not None:
             raise self._exception
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -8,6 +8,7 @@
 from . import protocols
 from . import streams
 from . import tasks
+from .coroutines import coroutine
 
 
 PIPE = subprocess.PIPE
@@ -94,7 +95,7 @@
     def returncode(self):
         return self._transport.get_returncode()
 
-    @tasks.coroutine
+    @coroutine
     def wait(self):
         """Wait until the process exit and return the process return code."""
         returncode = self._transport.get_returncode()
@@ -122,17 +123,17 @@
         self._check_alive()
         self._transport.kill()
 
-    @tasks.coroutine
+    @coroutine
     def _feed_stdin(self, input):
         self.stdin.write(input)
         yield from self.stdin.drain()
         self.stdin.close()
 
-    @tasks.coroutine
+    @coroutine
     def _noop(self):
         return None
 
-    @tasks.coroutine
+    @coroutine
     def _read_stream(self, fd):
         transport = self._transport.get_pipe_transport(fd)
         if fd == 2:
@@ -144,7 +145,7 @@
         transport.close()
         return output
 
-    @tasks.coroutine
+    @coroutine
     def communicate(self, input=None):
         if input:
             stdin = self._feed_stdin(input)
@@ -164,7 +165,7 @@
         return (stdout, stderr)
 
 
- at tasks.coroutine
+ at coroutine
 def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                             loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
     if loop is None:
@@ -178,7 +179,7 @@
     yield from protocol.waiter
     return Process(transport, protocol, loop)
 
- at tasks.coroutine
+ at coroutine
 def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                            stderr=None, loop=None,
                            limit=streams._DEFAULT_LIMIT, **kwds):
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -1,7 +1,6 @@
 """Support for tasks, coroutines and the scheduler."""
 
-__all__ = ['coroutine', 'Task',
-           'iscoroutinefunction', 'iscoroutine',
+__all__ = ['Task',
            'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
            'wait', 'wait_for', 'as_completed', 'sleep', 'async',
            'gather', 'shield',
@@ -11,146 +10,20 @@
 import functools
 import inspect
 import linecache
-import os
 import sys
 import traceback
 import weakref
 
+from . import coroutines
 from . import events
 from . import futures
+from .coroutines import coroutine
 from .log import logger
 
-# If you set _DEBUG to true, @coroutine will wrap the resulting
-# generator objects in a CoroWrapper instance (defined below).  That
-# instance will log a message when the generator is never iterated
-# over, which may happen when you forget to use "yield from" with a
-# coroutine call.  Note that the value of the _DEBUG flag is taken
-# when the decorator is used, so to be of any use it must be set
-# before you define your coroutines.  A downside of using this feature
-# is that tracebacks show entries for the CoroWrapper.__next__ method
-# when _DEBUG is true.
-_DEBUG = (not sys.flags.ignore_environment
-          and bool(os.environ.get('PYTHONASYNCIODEBUG')))
-
 _PY34 = (sys.version_info >= (3, 4))
 _PY35 = (sys.version_info >= (3, 5))
 
 
-class CoroWrapper:
-    # Wrapper for coroutine in _DEBUG mode.
-
-    def __init__(self, gen, func):
-        assert inspect.isgenerator(gen), gen
-        self.gen = gen
-        self.func = func
-        self._source_traceback = traceback.extract_stack(sys._getframe(1))
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return next(self.gen)
-
-    def send(self, *value):
-        # We use `*value` because of a bug in CPythons prior
-        # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
-        # for details.  This workaround should be removed in 3.5.0.
-        if len(value) == 1:
-            value = value[0]
-        return self.gen.send(value)
-
-    def throw(self, exc):
-        return self.gen.throw(exc)
-
-    def close(self):
-        return self.gen.close()
-
-    @property
-    def gi_frame(self):
-        return self.gen.gi_frame
-
-    @property
-    def gi_running(self):
-        return self.gen.gi_running
-
-    @property
-    def gi_code(self):
-        return self.gen.gi_code
-
-    def __del__(self):
-        # Be careful accessing self.gen.frame -- self.gen might not exist.
-        gen = getattr(self, 'gen', None)
-        frame = getattr(gen, 'gi_frame', None)
-        if frame is not None and frame.f_lasti == -1:
-            func = events._format_callback(self.func, ())
-            tb = ''.join(traceback.format_list(self._source_traceback))
-            message = ('Coroutine %s was never yielded from\n'
-                       'Coroutine object created at (most recent call last):\n'
-                       '%s'
-                       % (func, tb.rstrip()))
-            logger.error(message)
-
-
-def coroutine(func):
-    """Decorator to mark coroutines.
-
-    If the coroutine is not yielded from before it is destroyed,
-    an error message is logged.
-    """
-    if inspect.isgeneratorfunction(func):
-        coro = func
-    else:
-        @functools.wraps(func)
-        def coro(*args, **kw):
-            res = func(*args, **kw)
-            if isinstance(res, futures.Future) or inspect.isgenerator(res):
-                res = yield from res
-            return res
-
-    if not _DEBUG:
-        wrapper = coro
-    else:
-        @functools.wraps(func)
-        def wrapper(*args, **kwds):
-            w = CoroWrapper(coro(*args, **kwds), func)
-            if w._source_traceback:
-                del w._source_traceback[-1]
-            w.__name__ = func.__name__
-            if _PY35:
-                w.__qualname__ = func.__qualname__
-            w.__doc__ = func.__doc__
-            return w
-
-    wrapper._is_coroutine = True  # For iscoroutinefunction().
-    return wrapper
-
-
-def iscoroutinefunction(func):
-    """Return True if func is a decorated coroutine function."""
-    return getattr(func, '_is_coroutine', False)
-
-
-def iscoroutine(obj):
-    """Return True if obj is a coroutine object."""
-    return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
-
-
-def _format_coroutine(coro):
-    assert iscoroutine(coro)
-    if _PY35:
-        coro_name = coro.__qualname__
-    else:
-        coro_name = coro.__name__
-
-    filename = coro.gi_code.co_filename
-    if coro.gi_frame is not None:
-        lineno = coro.gi_frame.f_lineno
-        return '%s() at %s:%s' % (coro_name, filename, lineno)
-    else:
-        lineno = coro.gi_code.co_firstlineno
-        return '%s() done at %s:%s' % (coro_name, filename, lineno)
-
-
 class Task(futures.Future):
     """A coroutine wrapped in a Future."""
 
@@ -193,7 +66,7 @@
         return {t for t in cls._all_tasks if t._loop is loop}
 
     def __init__(self, coro, *, loop=None):
-        assert iscoroutine(coro), repr(coro)  # Not a coroutine function!
+        assert coroutines.iscoroutine(coro), repr(coro)  # Not a coroutine function!
         super().__init__(loop=loop)
         if self._source_traceback:
             del self._source_traceback[-1]
@@ -225,7 +98,7 @@
         else:
             info.append(self._state.lower())
 
-        info.append(_format_coroutine(self._coro))
+        info.append(coroutines._format_coroutine(self._coro))
 
         if self._state == futures._FINISHED:
             info.append(self._format_result())
@@ -444,7 +317,7 @@
     Note: This does not raise TimeoutError! Futures that aren't done
     when the timeout occurs are returned in the second set.
     """
-    if isinstance(fs, futures.Future) or iscoroutine(fs):
+    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
         raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
     if not fs:
         raise ValueError('Set of coroutines/Futures is empty.')
@@ -566,7 +439,7 @@
 
     Note: The futures 'f' are not necessarily members of fs.
     """
-    if isinstance(fs, futures.Future) or iscoroutine(fs):
+    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
         raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
     loop = loop if loop is not None else events.get_event_loop()
     todo = {async(f, loop=loop) for f in set(fs)}
@@ -624,7 +497,7 @@
         if loop is not None and loop is not coro_or_future._loop:
             raise ValueError('loop argument must agree with Future')
         return coro_or_future
-    elif iscoroutine(coro_or_future):
+    elif coroutines.iscoroutine(coro_or_future):
         task = Task(coro_or_future, loop=loop)
         if task._source_traceback:
             del task._source_traceback[-1]
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -27,6 +27,7 @@
 from . import futures
 from . import selectors
 from . import tasks
+from .coroutines import coroutine
 
 
 if sys.platform == 'win32':  # pragma: no cover
@@ -43,7 +44,7 @@
 
 
 def run_briefly(loop):
-    @tasks.coroutine
+    @coroutine
     def once():
         pass
     gen = once()
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,8 +16,8 @@
 from . import constants
 from . import events
 from . import selector_events
-from . import tasks
 from . import transports
+from .coroutines import coroutine
 from .log import logger
 
 
@@ -147,7 +147,7 @@
                                    extra=None):
         return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
 
-    @tasks.coroutine
+    @coroutine
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
@@ -164,7 +164,7 @@
     def _child_watcher_callback(self, pid, returncode, transp):
         self.call_soon_threadsafe(transp._process_exited, returncode)
 
-    @tasks.coroutine
+    @coroutine
     def create_unix_connection(self, protocol_factory, path, *,
                                ssl=None, sock=None,
                                server_hostname=None):
@@ -199,7 +199,7 @@
             sock, protocol_factory, ssl, server_hostname)
         return transport, protocol
 
-    @tasks.coroutine
+    @coroutine
     def create_unix_server(self, protocol_factory, path=None, *,
                            sock=None, backlog=100, ssl=None):
         if isinstance(ssl, bool):
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -14,8 +14,9 @@
 from . import selector_events
 from . import tasks
 from . import windows_utils
+from . import _overlapped
+from .coroutines import coroutine
 from .log import logger
-from . import _overlapped
 
 
 __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
@@ -129,7 +130,7 @@
     def _socketpair(self):
         return windows_utils.socketpair()
 
-    @tasks.coroutine
+    @coroutine
     def create_pipe_connection(self, protocol_factory, address):
         f = self._proactor.connect_pipe(address)
         pipe = yield from f
@@ -138,7 +139,7 @@
                                                  extra={'addr': address})
         return trans, protocol
 
-    @tasks.coroutine
+    @coroutine
     def start_serving_pipe(self, protocol_factory, address):
         server = PipeServer(address)
 
@@ -172,7 +173,7 @@
         self.call_soon(loop)
         return [server]
 
-    @tasks.coroutine
+    @coroutine
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
@@ -258,7 +259,7 @@
             conn.settimeout(listener.gettimeout())
             return conn, conn.getpeername()
 
-        @tasks.coroutine
+        @coroutine
         def accept_coro(future, conn):
             # Coroutine closing the accept socket if the future is cancelled
             try:
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -11,7 +11,7 @@
 from unittest import mock
 
 import asyncio
-from asyncio import tasks
+from asyncio import coroutines
 from asyncio import test_utils
 
 
@@ -193,7 +193,7 @@
             # attribute).
             coro_name = 'notmuch'
             coro_qualname = 'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch'
-        elif tasks._DEBUG:
+        elif coroutines._DEBUG:
             # In debug mode, @coroutine decorator uses CoroWrapper which gets
             # its name (__name__ attribute) from the wrapped coroutine
             # function.
@@ -1475,23 +1475,23 @@
             self.assertIsNone(gen.gi_frame)
 
         # Save debug flag.
-        old_debug = asyncio.tasks._DEBUG
+        old_debug = asyncio.coroutines._DEBUG
         try:
             # Test with debug flag cleared.
-            asyncio.tasks._DEBUG = False
+            asyncio.coroutines._DEBUG = False
             check()
 
             # Test with debug flag set.
-            asyncio.tasks._DEBUG = True
+            asyncio.coroutines._DEBUG = True
             check()
 
         finally:
             # Restore original debug flag.
-            asyncio.tasks._DEBUG = old_debug
+            asyncio.coroutines._DEBUG = old_debug
 
     def test_yield_from_corowrapper(self):
-        old_debug = asyncio.tasks._DEBUG
-        asyncio.tasks._DEBUG = True
+        old_debug = asyncio.coroutines._DEBUG
+        asyncio.coroutines._DEBUG = True
         try:
             @asyncio.coroutine
             def t1():
@@ -1511,7 +1511,7 @@
             val = self.loop.run_until_complete(task)
             self.assertEqual(val, (1, 2, 3))
         finally:
-            asyncio.tasks._DEBUG = old_debug
+            asyncio.coroutines._DEBUG = old_debug
 
     def test_yield_from_corowrapper_send(self):
         def foo():
@@ -1519,7 +1519,7 @@
             return a
 
         def call(arg):
-            cw = asyncio.tasks.CoroWrapper(foo(), foo)
+            cw = asyncio.coroutines.CoroWrapper(foo(), foo)
             cw.send(None)
             try:
                 cw.send(arg)
@@ -1534,7 +1534,7 @@
     def test_corowrapper_weakref(self):
         wd = weakref.WeakValueDictionary()
         def foo(): yield from []
-        cw = asyncio.tasks.CoroWrapper(foo(), foo)
+        cw = asyncio.coroutines.CoroWrapper(foo(), foo)
         wd['cw'] = cw  # Would fail without __weakref__ slot.
         cw.gen = None  # Suppress warning from __del__.
 
@@ -1580,16 +1580,16 @@
         })
         mock_handler.reset_mock()
 
-    @mock.patch('asyncio.tasks.logger')
+    @mock.patch('asyncio.coroutines.logger')
     def test_coroutine_never_yielded(self, m_log):
-        debug = asyncio.tasks._DEBUG
+        debug = asyncio.coroutines._DEBUG
         try:
-            asyncio.tasks._DEBUG = True
+            asyncio.coroutines._DEBUG = True
             @asyncio.coroutine
             def coro_noop():
                 pass
         finally:
-            asyncio.tasks._DEBUG = debug
+            asyncio.coroutines._DEBUG = debug
 
         tb_filename = __file__
         tb_lineno = sys._getframe().f_lineno + 1
@@ -1695,8 +1695,8 @@
 
     def test_env_var_debug(self):
         code = '\n'.join((
-            'import asyncio.tasks',
-            'print(asyncio.tasks._DEBUG)'))
+            'import asyncio.coroutines',
+            'print(asyncio.coroutines._DEBUG)'))
 
         # Test with -E to not fail if the unit test was run with
         # PYTHONASYNCIODEBUG set to a non-empty string

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list