cpython (merge 3.4 -> default): (Merge 3.4) asyncio: sync with Tulip, add a new asyncio.coroutines module
![](https://secure.gravatar.com/avatar/8ac615df352a970211b0e3d94a307c6d.jpg?s=120&d=mm&r=g)
http://hg.python.org/cpython/rev/4f3a8829c069 changeset: 91462:4f3a8829c069 parent: 91460:54f94e753269 parent: 91461:8734e881c400 user: Victor Stinner <victor.stinner@gmail.com> date: Sun Jun 29 00:47:28 2014 +0200 summary: (Merge 3.4) asyncio: sync with Tulip, add a new asyncio.coroutines module files: Lib/asyncio/__init__.py | 4 +- Lib/asyncio/base_events.py | 28 +- Lib/asyncio/base_subprocess.py | 4 +- Lib/asyncio/tasks.py | 633 +--------------- Lib/asyncio/locks.py | 12 +- Lib/asyncio/streams.py | 18 +- Lib/asyncio/subprocess.py | 15 +- Lib/asyncio/tasks.py | 143 +--- Lib/asyncio/test_utils.py | 3 +- Lib/asyncio/unix_events.py | 8 +- Lib/asyncio/windows_events.py | 11 +- Lib/test/test_asyncio/test_tasks.py | 34 +- 12 files changed, 83 insertions(+), 830 deletions(-) diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -18,6 +18,7 @@ import _overlapped # Will also be exported. # This relies on each of the submodules having an __all__ variable. +from .coroutines import * from .events import * from .futures import * from .locks import * @@ -34,7 +35,8 @@ from .unix_events import * # pragma: no cover -__all__ = (events.__all__ + +__all__ = (coroutines.__all__ + + events.__all__ + futures.__all__ + locks.__all__ + protocols.__all__ + diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -26,9 +26,11 @@ import os import sys +from . import coroutines from . import events from . import futures from . import tasks +from .coroutines import coroutine from .log import logger @@ -118,7 +120,7 @@ if not waiter.done(): waiter.set_result(waiter) - @tasks.coroutine + @coroutine def wait_closed(self): if self.sockets is None or self.waiters is None: return @@ -175,7 +177,7 @@ """Create write pipe transport.""" raise NotImplementedError - @tasks.coroutine + @coroutine def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): @@ -298,7 +300,7 @@ def call_at(self, when, callback, *args): """Like call_later(), but uses an absolute time.""" - if tasks.iscoroutinefunction(callback): + if coroutines.iscoroutinefunction(callback): raise TypeError("coroutines cannot be used with call_at()") if self._debug: self._assert_is_current_event_loop() @@ -324,7 +326,7 @@ return handle def _call_soon(self, callback, args, check_loop): - if tasks.iscoroutinefunction(callback): + if coroutines.iscoroutinefunction(callback): raise TypeError("coroutines cannot be used with call_soon()") if self._debug and check_loop: self._assert_is_current_event_loop() @@ -361,7 +363,7 @@ return handle def run_in_executor(self, executor, callback, *args): - if tasks.iscoroutinefunction(callback): + if coroutines.iscoroutinefunction(callback): raise TypeError("coroutines cannot be used with run_in_executor()") if isinstance(callback, events.Handle): assert not args @@ -389,7 +391,7 @@ def getnameinfo(self, sockaddr, flags=0): return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags) - @tasks.coroutine + @coroutine def create_connection(self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None): @@ -505,7 +507,7 @@ sock, protocol_factory, ssl, server_hostname) return transport, protocol - @tasks.coroutine + @coroutine def _create_connection_transport(self, sock, protocol_factory, ssl, server_hostname): protocol = protocol_factory() @@ -521,7 +523,7 @@ yield from waiter return transport, protocol - @tasks.coroutine + @coroutine def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0): @@ -593,7 +595,7 @@ transport = self._make_datagram_transport(sock, protocol, r_addr) return transport, protocol - @tasks.coroutine + @coroutine def create_server(self, protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, @@ -672,7 +674,7 @@ self._start_serving(protocol_factory, sock, ssl, server) return server - @tasks.coroutine + @coroutine def connect_read_pipe(self, protocol_factory, pipe): protocol = protocol_factory() waiter = futures.Future(loop=self) @@ -680,7 +682,7 @@ yield from waiter return transport, protocol - @tasks.coroutine + @coroutine def connect_write_pipe(self, protocol_factory, pipe): protocol = protocol_factory() waiter = futures.Future(loop=self) @@ -688,7 +690,7 @@ yield from waiter return transport, protocol - @tasks.coroutine + @coroutine def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=False, shell=True, bufsize=0, @@ -706,7 +708,7 @@ protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) return transport, protocol - @tasks.coroutine + @coroutine def subprocess_exec(self, protocol_factory, program, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=False, diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -2,8 +2,8 @@ import subprocess from . import protocols -from . import tasks from . import transports +from .coroutines import coroutine class BaseSubprocessTransport(transports.SubprocessTransport): @@ -65,7 +65,7 @@ def kill(self): self._proc.kill() - @tasks.coroutine + @coroutine def _post_init(self): proc = self._proc loop = self._loop diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/coroutines.py copy from Lib/asyncio/tasks.py copy to Lib/asyncio/coroutines.py --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/coroutines.py @@ -1,20 +1,11 @@ -"""Support for tasks, coroutines and the scheduler.""" +__all__ = ['coroutine', + 'iscoroutinefunction', 'iscoroutine'] -__all__ = ['coroutine', 'Task', - 'iscoroutinefunction', 'iscoroutine', - 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', - 'wait', 'wait_for', 'as_completed', 'sleep', 'async', - 'gather', 'shield', - ] - -import concurrent.futures import functools import inspect -import linecache import os import sys import traceback -import weakref from . import events from . import futures @@ -32,10 +23,8 @@ _DEBUG = (not sys.flags.ignore_environment and bool(os.environ.get('PYTHONASYNCIODEBUG'))) -_PY34 = (sys.version_info >= (3, 4)) _PY35 = (sys.version_info >= (3, 5)) - class CoroWrapper: # Wrapper for coroutine in _DEBUG mode. @@ -149,621 +138,3 @@ else: lineno = coro.gi_code.co_firstlineno return '%s() done at %s:%s' % (coro_name, filename, lineno) - - -class Task(futures.Future): - """A coroutine wrapped in a Future.""" - - # An important invariant maintained while a Task not done: - # - # - Either _fut_waiter is None, and _step() is scheduled; - # - or _fut_waiter is some Future, and _step() is *not* scheduled. - # - # The only transition from the latter to the former is through - # _wakeup(). When _fut_waiter is not None, one of its callbacks - # must be _wakeup(). - - # Weak set containing all tasks alive. - _all_tasks = weakref.WeakSet() - - # Dictionary containing tasks that are currently active in - # all running event loops. {EventLoop: Task} - _current_tasks = {} - - @classmethod - def current_task(cls, loop=None): - """Return the currently running task in an event loop or None. - - By default the current task for the current event loop is returned. - - None is returned when called not in the context of a Task. - """ - if loop is None: - loop = events.get_event_loop() - return cls._current_tasks.get(loop) - - @classmethod - def all_tasks(cls, loop=None): - """Return a set of all tasks for an event loop. - - By default all tasks for the current event loop are returned. - """ - if loop is None: - loop = events.get_event_loop() - return {t for t in cls._all_tasks if t._loop is loop} - - def __init__(self, coro, *, loop=None): - assert iscoroutine(coro), repr(coro) # Not a coroutine function! - super().__init__(loop=loop) - if self._source_traceback: - del self._source_traceback[-1] - self._coro = iter(coro) # Use the iterator just in case. - self._fut_waiter = None - self._must_cancel = False - self._loop.call_soon(self._step) - self.__class__._all_tasks.add(self) - - # On Python 3.3 or older, objects with a destructor part of a reference - # cycle are never destroyed. It's not more the case on Python 3.4 thanks to - # the PEP 442. - if _PY34: - def __del__(self): - if self._state == futures._PENDING: - context = { - 'task': self, - 'message': 'Task was destroyed but it is pending!', - } - if self._source_traceback: - context['source_traceback'] = self._source_traceback - self._loop.call_exception_handler(context) - futures.Future.__del__(self) - - def __repr__(self): - info = [] - if self._must_cancel: - info.append('cancelling') - else: - info.append(self._state.lower()) - - info.append(_format_coroutine(self._coro)) - - if self._state == futures._FINISHED: - info.append(self._format_result()) - - if self._callbacks: - info.append(self._format_callbacks()) - - return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) - - def get_stack(self, *, limit=None): - """Return the list of stack frames for this task's coroutine. - - If the coroutine is active, this returns the stack where it is - suspended. If the coroutine has completed successfully or was - cancelled, this returns an empty list. If the coroutine was - terminated by an exception, this returns the list of traceback - frames. - - The frames are always ordered from oldest to newest. - - The optional limit gives the maximum number of frames to - return; by default all available frames are returned. Its - meaning differs depending on whether a stack or a traceback is - returned: the newest frames of a stack are returned, but the - oldest frames of a traceback are returned. (This matches the - behavior of the traceback module.) - - For reasons beyond our control, only one stack frame is - returned for a suspended coroutine. - """ - frames = [] - f = self._coro.gi_frame - if f is not None: - while f is not None: - if limit is not None: - if limit <= 0: - break - limit -= 1 - frames.append(f) - f = f.f_back - frames.reverse() - elif self._exception is not None: - tb = self._exception.__traceback__ - while tb is not None: - if limit is not None: - if limit <= 0: - break - limit -= 1 - frames.append(tb.tb_frame) - tb = tb.tb_next - return frames - - def print_stack(self, *, limit=None, file=None): - """Print the stack or traceback for this task's coroutine. - - This produces output similar to that of the traceback module, - for the frames retrieved by get_stack(). The limit argument - is passed to get_stack(). The file argument is an I/O stream - to which the output goes; by default it goes to sys.stderr. - """ - extracted_list = [] - checked = set() - for f in self.get_stack(limit=limit): - lineno = f.f_lineno - co = f.f_code - filename = co.co_filename - name = co.co_name - if filename not in checked: - checked.add(filename) - linecache.checkcache(filename) - line = linecache.getline(filename, lineno, f.f_globals) - extracted_list.append((filename, lineno, name, line)) - exc = self._exception - if not extracted_list: - print('No stack for %r' % self, file=file) - elif exc is not None: - print('Traceback for %r (most recent call last):' % self, - file=file) - else: - print('Stack for %r (most recent call last):' % self, - file=file) - traceback.print_list(extracted_list, file=file) - if exc is not None: - for line in traceback.format_exception_only(exc.__class__, exc): - print(line, file=file, end='') - - def cancel(self): - """Request this task to cancel itself. - - This arranges for a CancelledError to be thrown into the - wrapped coroutine on the next cycle through the event loop. - The coroutine then has a chance to clean up or even deny - the request using try/except/finally. - - Contrary to Future.cancel(), this does not guarantee that the - task will be cancelled: the exception might be caught and - acted upon, delaying cancellation of the task or preventing it - completely. The task may also return a value or raise a - different exception. - - Immediately after this method is called, Task.cancelled() will - not return True (unless the task was already cancelled). A - task will be marked as cancelled when the wrapped coroutine - terminates with a CancelledError exception (even if cancel() - was not called). - """ - if self.done(): - return False - if self._fut_waiter is not None: - if self._fut_waiter.cancel(): - # Leave self._fut_waiter; it may be a Task that - # catches and ignores the cancellation so we may have - # to cancel it again later. - return True - # It must be the case that self._step is already scheduled. - self._must_cancel = True - return True - - def _step(self, value=None, exc=None): - assert not self.done(), \ - '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc) - if self._must_cancel: - if not isinstance(exc, futures.CancelledError): - exc = futures.CancelledError() - self._must_cancel = False - coro = self._coro - self._fut_waiter = None - - self.__class__._current_tasks[self._loop] = self - # Call either coro.throw(exc) or coro.send(value). - try: - if exc is not None: - result = coro.throw(exc) - elif value is not None: - result = coro.send(value) - else: - result = next(coro) - except StopIteration as exc: - self.set_result(exc.value) - except futures.CancelledError as exc: - super().cancel() # I.e., Future.cancel(self). - except Exception as exc: - self.set_exception(exc) - except BaseException as exc: - self.set_exception(exc) - raise - else: - if isinstance(result, futures.Future): - # Yielded Future must come from Future.__iter__(). - if result._blocking: - result._blocking = False - result.add_done_callback(self._wakeup) - self._fut_waiter = result - if self._must_cancel: - if self._fut_waiter.cancel(): - self._must_cancel = False - else: - self._loop.call_soon( - self._step, None, - RuntimeError( - 'yield was used instead of yield from ' - 'in task {!r} with {!r}'.format(self, result))) - elif result is None: - # Bare yield relinquishes control for one event loop iteration. - self._loop.call_soon(self._step) - elif inspect.isgenerator(result): - # Yielding a generator is just wrong. - self._loop.call_soon( - self._step, None, - RuntimeError( - 'yield was used instead of yield from for ' - 'generator in task {!r} with {}'.format( - self, result))) - else: - # Yielding something else is an error. - self._loop.call_soon( - self._step, None, - RuntimeError( - 'Task got bad yield: {!r}'.format(result))) - finally: - self.__class__._current_tasks.pop(self._loop) - self = None # Needed to break cycles when an exception occurs. - - def _wakeup(self, future): - try: - value = future.result() - except Exception as exc: - # This may also be a cancellation. - self._step(None, exc) - else: - self._step(value, None) - self = None # Needed to break cycles when an exception occurs. - - -# wait() and as_completed() similar to those in PEP 3148. - -FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED -FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION -ALL_COMPLETED = concurrent.futures.ALL_COMPLETED - - -@coroutine -def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): - """Wait for the Futures and coroutines given by fs to complete. - - The sequence futures must not be empty. - - Coroutines will be wrapped in Tasks. - - Returns two sets of Future: (done, pending). - - Usage: - - done, pending = yield from asyncio.wait(fs) - - Note: This does not raise TimeoutError! Futures that aren't done - when the timeout occurs are returned in the second set. - """ - if isinstance(fs, futures.Future) or iscoroutine(fs): - raise TypeError("expect a list of futures, not %s" % type(fs).__name__) - if not fs: - raise ValueError('Set of coroutines/Futures is empty.') - - if loop is None: - loop = events.get_event_loop() - - fs = {async(f, loop=loop) for f in set(fs)} - - if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): - raise ValueError('Invalid return_when value: {}'.format(return_when)) - return (yield from _wait(fs, timeout, return_when, loop)) - - -def _release_waiter(waiter, value=True, *args): - if not waiter.done(): - waiter.set_result(value) - - -@coroutine -def wait_for(fut, timeout, *, loop=None): - """Wait for the single Future or coroutine to complete, with timeout. - - Coroutine will be wrapped in Task. - - Returns result of the Future or coroutine. When a timeout occurs, - it cancels the task and raises TimeoutError. To avoid the task - cancellation, wrap it in shield(). - - Usage: - - result = yield from asyncio.wait_for(fut, 10.0) - - """ - if loop is None: - loop = events.get_event_loop() - - if timeout is None: - return (yield from fut) - - waiter = futures.Future(loop=loop) - timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False) - cb = functools.partial(_release_waiter, waiter, True) - - fut = async(fut, loop=loop) - fut.add_done_callback(cb) - - try: - if (yield from waiter): - return fut.result() - else: - fut.remove_done_callback(cb) - fut.cancel() - raise futures.TimeoutError() - finally: - timeout_handle.cancel() - - -@coroutine -def _wait(fs, timeout, return_when, loop): - """Internal helper for wait() and _wait_for(). - - The fs argument must be a collection of Futures. - """ - assert fs, 'Set of Futures is empty.' - waiter = futures.Future(loop=loop) - timeout_handle = None - if timeout is not None: - timeout_handle = loop.call_later(timeout, _release_waiter, waiter) - counter = len(fs) - - def _on_completion(f): - nonlocal counter - counter -= 1 - if (counter <= 0 or - return_when == FIRST_COMPLETED or - return_when == FIRST_EXCEPTION and (not f.cancelled() and - f.exception() is not None)): - if timeout_handle is not None: - timeout_handle.cancel() - if not waiter.done(): - waiter.set_result(False) - - for f in fs: - f.add_done_callback(_on_completion) - - try: - yield from waiter - finally: - if timeout_handle is not None: - timeout_handle.cancel() - - done, pending = set(), set() - for f in fs: - f.remove_done_callback(_on_completion) - if f.done(): - done.add(f) - else: - pending.add(f) - return done, pending - - -# This is *not* a @coroutine! It is just an iterator (yielding Futures). -def as_completed(fs, *, loop=None, timeout=None): - """Return an iterator whose values are coroutines. - - When waiting for the yielded coroutines you'll get the results (or - exceptions!) of the original Futures (or coroutines), in the order - in which and as soon as they complete. - - This differs from PEP 3148; the proper way to use this is: - - for f in as_completed(fs): - result = yield from f # The 'yield from' may raise. - # Use result. - - If a timeout is specified, the 'yield from' will raise - TimeoutError when the timeout occurs before all Futures are done. - - Note: The futures 'f' are not necessarily members of fs. - """ - if isinstance(fs, futures.Future) or iscoroutine(fs): - raise TypeError("expect a list of futures, not %s" % type(fs).__name__) - loop = loop if loop is not None else events.get_event_loop() - todo = {async(f, loop=loop) for f in set(fs)} - from .queues import Queue # Import here to avoid circular import problem. - done = Queue(loop=loop) - timeout_handle = None - - def _on_timeout(): - for f in todo: - f.remove_done_callback(_on_completion) - done.put_nowait(None) # Queue a dummy value for _wait_for_one(). - todo.clear() # Can't do todo.remove(f) in the loop. - - def _on_completion(f): - if not todo: - return # _on_timeout() was here first. - todo.remove(f) - done.put_nowait(f) - if not todo and timeout_handle is not None: - timeout_handle.cancel() - - @coroutine - def _wait_for_one(): - f = yield from done.get() - if f is None: - # Dummy value from _on_timeout(). - raise futures.TimeoutError - return f.result() # May raise f.exception(). - - for f in todo: - f.add_done_callback(_on_completion) - if todo and timeout is not None: - timeout_handle = loop.call_later(timeout, _on_timeout) - for _ in range(len(todo)): - yield _wait_for_one() - - -@coroutine -def sleep(delay, result=None, *, loop=None): - """Coroutine that completes after a given time (in seconds).""" - future = futures.Future(loop=loop) - h = future._loop.call_later(delay, future.set_result, result) - try: - return (yield from future) - finally: - h.cancel() - - -def async(coro_or_future, *, loop=None): - """Wrap a coroutine in a future. - - If the argument is a Future, it is returned directly. - """ - if isinstance(coro_or_future, futures.Future): - if loop is not None and loop is not coro_or_future._loop: - raise ValueError('loop argument must agree with Future') - return coro_or_future - elif iscoroutine(coro_or_future): - task = Task(coro_or_future, loop=loop) - if task._source_traceback: - del task._source_traceback[-1] - return task - else: - raise TypeError('A Future or coroutine is required') - - -class _GatheringFuture(futures.Future): - """Helper for gather(). - - This overrides cancel() to cancel all the children and act more - like Task.cancel(), which doesn't immediately mark itself as - cancelled. - """ - - def __init__(self, children, *, loop=None): - super().__init__(loop=loop) - self._children = children - - def cancel(self): - if self.done(): - return False - for child in self._children: - child.cancel() - return True - - -def gather(*coros_or_futures, loop=None, return_exceptions=False): - """Return a future aggregating results from the given coroutines - or futures. - - All futures must share the same event loop. If all the tasks are - done successfully, the returned future's result is the list of - results (in the order of the original sequence, not necessarily - the order of results arrival). If *return_exceptions* is True, - exceptions in the tasks are treated the same as successful - results, and gathered in the result list; otherwise, the first - raised exception will be immediately propagated to the returned - future. - - Cancellation: if the outer Future is cancelled, all children (that - have not completed yet) are also cancelled. If any child is - cancelled, this is treated as if it raised CancelledError -- - the outer Future is *not* cancelled in this case. (This is to - prevent the cancellation of one child to cause other children to - be cancelled.) - """ - arg_to_fut = {arg: async(arg, loop=loop) for arg in set(coros_or_futures)} - children = [arg_to_fut[arg] for arg in coros_or_futures] - n = len(children) - if n == 0: - outer = futures.Future(loop=loop) - outer.set_result([]) - return outer - if loop is None: - loop = children[0]._loop - for fut in children: - if fut._loop is not loop: - raise ValueError("futures are tied to different event loops") - outer = _GatheringFuture(children, loop=loop) - nfinished = 0 - results = [None] * n - - def _done_callback(i, fut): - nonlocal nfinished - if outer._state != futures._PENDING: - if fut._exception is not None: - # Mark exception retrieved. - fut.exception() - return - if fut._state == futures._CANCELLED: - res = futures.CancelledError() - if not return_exceptions: - outer.set_exception(res) - return - elif fut._exception is not None: - res = fut.exception() # Mark exception retrieved. - if not return_exceptions: - outer.set_exception(res) - return - else: - res = fut._result - results[i] = res - nfinished += 1 - if nfinished == n: - outer.set_result(results) - - for i, fut in enumerate(children): - fut.add_done_callback(functools.partial(_done_callback, i)) - return outer - - -def shield(arg, *, loop=None): - """Wait for a future, shielding it from cancellation. - - The statement - - res = yield from shield(something()) - - is exactly equivalent to the statement - - res = yield from something() - - *except* that if the coroutine containing it is cancelled, the - task running in something() is not cancelled. From the POV of - something(), the cancellation did not happen. But its caller is - still cancelled, so the yield-from expression still raises - CancelledError. Note: If something() is cancelled by other means - this will still cancel shield(). - - If you want to completely ignore cancellation (not recommended) - you can combine shield() with a try/except clause, as follows: - - try: - res = yield from shield(something()) - except CancelledError: - res = None - """ - inner = async(arg, loop=loop) - if inner.done(): - # Shortcut. - return inner - loop = inner._loop - outer = futures.Future(loop=loop) - - def _done_callback(inner): - if outer.cancelled(): - # Mark inner's result as retrieved. - inner.cancelled() or inner.exception() - return - if inner.cancelled(): - outer.cancel() - else: - exc = inner.exception() - if exc is not None: - outer.set_exception(exc) - else: - outer.set_result(inner.result()) - - inner.add_done_callback(_done_callback) - return outer diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py --- a/Lib/asyncio/locks.py +++ b/Lib/asyncio/locks.py @@ -6,7 +6,7 @@ from . import events from . import futures -from . import tasks +from .coroutines import coroutine class _ContextManager: @@ -112,7 +112,7 @@ """Return True if lock is acquired.""" return self._locked - @tasks.coroutine + @coroutine def acquire(self): """Acquire a lock. @@ -225,7 +225,7 @@ to true again.""" self._value = False - @tasks.coroutine + @coroutine def wait(self): """Block until the internal flag is true. @@ -278,7 +278,7 @@ extra = '{},waiters:{}'.format(extra, len(self._waiters)) return '<{} [{}]>'.format(res[1:-1], extra) - @tasks.coroutine + @coroutine def wait(self): """Wait until notified. @@ -306,7 +306,7 @@ finally: yield from self.acquire() - @tasks.coroutine + @coroutine def wait_for(self, predicate): """Wait until a predicate becomes true. @@ -402,7 +402,7 @@ """Returns True if semaphore can not be acquired immediately.""" return self._value == 0 - @tasks.coroutine + @coroutine def acquire(self): """Acquire a semaphore. diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -10,10 +10,12 @@ if hasattr(socket, 'AF_UNIX'): __all__.extend(['open_unix_connection', 'start_unix_server']) +from . import coroutines from . import events from . import futures from . import protocols from . import tasks +from .coroutines import coroutine _DEFAULT_LIMIT = 2**16 @@ -33,7 +35,7 @@ self.expected = expected -@tasks.coroutine +@coroutine def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. @@ -63,7 +65,7 @@ return reader, writer -@tasks.coroutine +@coroutine def start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """Start a socket server, call back for each client connected. @@ -102,7 +104,7 @@ if hasattr(socket, 'AF_UNIX'): # UNIX Domain Sockets are supported on this platform - @tasks.coroutine + @coroutine def open_unix_connection(path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `open_connection` but works with UNIX Domain Sockets.""" @@ -116,7 +118,7 @@ return reader, writer - @tasks.coroutine + @coroutine def start_unix_server(client_connected_cb, path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `start_server` but works with UNIX Domain Sockets.""" @@ -210,7 +212,7 @@ self._loop) res = self._client_connected_cb(self._stream_reader, self._stream_writer) - if tasks.iscoroutine(res): + if coroutines.iscoroutine(res): tasks.Task(res, loop=self._loop) def connection_lost(self, exc): @@ -373,7 +375,7 @@ 'already waiting for incoming data' % func_name) return futures.Future(loop=self._loop) - @tasks.coroutine + @coroutine def readline(self): if self._exception is not None: raise self._exception @@ -410,7 +412,7 @@ self._maybe_resume_transport() return bytes(line) - @tasks.coroutine + @coroutine def read(self, n=-1): if self._exception is not None: raise self._exception @@ -449,7 +451,7 @@ self._maybe_resume_transport() return data - @tasks.coroutine + @coroutine def readexactly(self, n): if self._exception is not None: raise self._exception diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -8,6 +8,7 @@ from . import protocols from . import streams from . import tasks +from .coroutines import coroutine PIPE = subprocess.PIPE @@ -94,7 +95,7 @@ def returncode(self): return self._transport.get_returncode() - @tasks.coroutine + @coroutine def wait(self): """Wait until the process exit and return the process return code.""" returncode = self._transport.get_returncode() @@ -122,17 +123,17 @@ self._check_alive() self._transport.kill() - @tasks.coroutine + @coroutine def _feed_stdin(self, input): self.stdin.write(input) yield from self.stdin.drain() self.stdin.close() - @tasks.coroutine + @coroutine def _noop(self): return None - @tasks.coroutine + @coroutine def _read_stream(self, fd): transport = self._transport.get_pipe_transport(fd) if fd == 2: @@ -144,7 +145,7 @@ transport.close() return output - @tasks.coroutine + @coroutine def communicate(self, input=None): if input: stdin = self._feed_stdin(input) @@ -164,7 +165,7 @@ return (stdout, stderr) -@tasks.coroutine +@coroutine def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, loop=None, limit=streams._DEFAULT_LIMIT, **kwds): if loop is None: @@ -178,7 +179,7 @@ yield from protocol.waiter return Process(transport, protocol, loop) -@tasks.coroutine +@coroutine def create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, loop=None, limit=streams._DEFAULT_LIMIT, **kwds): diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -1,7 +1,6 @@ """Support for tasks, coroutines and the scheduler.""" -__all__ = ['coroutine', 'Task', - 'iscoroutinefunction', 'iscoroutine', +__all__ = ['Task', 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'wait', 'wait_for', 'as_completed', 'sleep', 'async', 'gather', 'shield', @@ -11,146 +10,20 @@ import functools import inspect import linecache -import os import sys import traceback import weakref +from . import coroutines from . import events from . import futures +from .coroutines import coroutine from .log import logger -# If you set _DEBUG to true, @coroutine will wrap the resulting -# generator objects in a CoroWrapper instance (defined below). That -# instance will log a message when the generator is never iterated -# over, which may happen when you forget to use "yield from" with a -# coroutine call. Note that the value of the _DEBUG flag is taken -# when the decorator is used, so to be of any use it must be set -# before you define your coroutines. A downside of using this feature -# is that tracebacks show entries for the CoroWrapper.__next__ method -# when _DEBUG is true. -_DEBUG = (not sys.flags.ignore_environment - and bool(os.environ.get('PYTHONASYNCIODEBUG'))) - _PY34 = (sys.version_info >= (3, 4)) _PY35 = (sys.version_info >= (3, 5)) -class CoroWrapper: - # Wrapper for coroutine in _DEBUG mode. - - def __init__(self, gen, func): - assert inspect.isgenerator(gen), gen - self.gen = gen - self.func = func - self._source_traceback = traceback.extract_stack(sys._getframe(1)) - - def __iter__(self): - return self - - def __next__(self): - return next(self.gen) - - def send(self, *value): - # We use `*value` because of a bug in CPythons prior - # to 3.4.1. See issue #21209 and test_yield_from_corowrapper - # for details. This workaround should be removed in 3.5.0. - if len(value) == 1: - value = value[0] - return self.gen.send(value) - - def throw(self, exc): - return self.gen.throw(exc) - - def close(self): - return self.gen.close() - - @property - def gi_frame(self): - return self.gen.gi_frame - - @property - def gi_running(self): - return self.gen.gi_running - - @property - def gi_code(self): - return self.gen.gi_code - - def __del__(self): - # Be careful accessing self.gen.frame -- self.gen might not exist. - gen = getattr(self, 'gen', None) - frame = getattr(gen, 'gi_frame', None) - if frame is not None and frame.f_lasti == -1: - func = events._format_callback(self.func, ()) - tb = ''.join(traceback.format_list(self._source_traceback)) - message = ('Coroutine %s was never yielded from\n' - 'Coroutine object created at (most recent call last):\n' - '%s' - % (func, tb.rstrip())) - logger.error(message) - - -def coroutine(func): - """Decorator to mark coroutines. - - If the coroutine is not yielded from before it is destroyed, - an error message is logged. - """ - if inspect.isgeneratorfunction(func): - coro = func - else: - @functools.wraps(func) - def coro(*args, **kw): - res = func(*args, **kw) - if isinstance(res, futures.Future) or inspect.isgenerator(res): - res = yield from res - return res - - if not _DEBUG: - wrapper = coro - else: - @functools.wraps(func) - def wrapper(*args, **kwds): - w = CoroWrapper(coro(*args, **kwds), func) - if w._source_traceback: - del w._source_traceback[-1] - w.__name__ = func.__name__ - if _PY35: - w.__qualname__ = func.__qualname__ - w.__doc__ = func.__doc__ - return w - - wrapper._is_coroutine = True # For iscoroutinefunction(). - return wrapper - - -def iscoroutinefunction(func): - """Return True if func is a decorated coroutine function.""" - return getattr(func, '_is_coroutine', False) - - -def iscoroutine(obj): - """Return True if obj is a coroutine object.""" - return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj) - - -def _format_coroutine(coro): - assert iscoroutine(coro) - if _PY35: - coro_name = coro.__qualname__ - else: - coro_name = coro.__name__ - - filename = coro.gi_code.co_filename - if coro.gi_frame is not None: - lineno = coro.gi_frame.f_lineno - return '%s() at %s:%s' % (coro_name, filename, lineno) - else: - lineno = coro.gi_code.co_firstlineno - return '%s() done at %s:%s' % (coro_name, filename, lineno) - - class Task(futures.Future): """A coroutine wrapped in a Future.""" @@ -193,7 +66,7 @@ return {t for t in cls._all_tasks if t._loop is loop} def __init__(self, coro, *, loop=None): - assert iscoroutine(coro), repr(coro) # Not a coroutine function! + assert coroutines.iscoroutine(coro), repr(coro) # Not a coroutine function! super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -225,7 +98,7 @@ else: info.append(self._state.lower()) - info.append(_format_coroutine(self._coro)) + info.append(coroutines._format_coroutine(self._coro)) if self._state == futures._FINISHED: info.append(self._format_result()) @@ -444,7 +317,7 @@ Note: This does not raise TimeoutError! Futures that aren't done when the timeout occurs are returned in the second set. """ - if isinstance(fs, futures.Future) or iscoroutine(fs): + if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): raise TypeError("expect a list of futures, not %s" % type(fs).__name__) if not fs: raise ValueError('Set of coroutines/Futures is empty.') @@ -566,7 +439,7 @@ Note: The futures 'f' are not necessarily members of fs. """ - if isinstance(fs, futures.Future) or iscoroutine(fs): + if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): raise TypeError("expect a list of futures, not %s" % type(fs).__name__) loop = loop if loop is not None else events.get_event_loop() todo = {async(f, loop=loop) for f in set(fs)} @@ -624,7 +497,7 @@ if loop is not None and loop is not coro_or_future._loop: raise ValueError('loop argument must agree with Future') return coro_or_future - elif iscoroutine(coro_or_future): + elif coroutines.iscoroutine(coro_or_future): task = Task(coro_or_future, loop=loop) if task._source_traceback: del task._source_traceback[-1] diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py --- a/Lib/asyncio/test_utils.py +++ b/Lib/asyncio/test_utils.py @@ -27,6 +27,7 @@ from . import futures from . import selectors from . import tasks +from .coroutines import coroutine if sys.platform == 'win32': # pragma: no cover @@ -43,7 +44,7 @@ def run_briefly(loop): - @tasks.coroutine + @coroutine def once(): pass gen = once() diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -16,8 +16,8 @@ from . import constants from . import events from . import selector_events -from . import tasks from . import transports +from .coroutines import coroutine from .log import logger @@ -147,7 +147,7 @@ extra=None): return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) - @tasks.coroutine + @coroutine def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): @@ -164,7 +164,7 @@ def _child_watcher_callback(self, pid, returncode, transp): self.call_soon_threadsafe(transp._process_exited, returncode) - @tasks.coroutine + @coroutine def create_unix_connection(self, protocol_factory, path, *, ssl=None, sock=None, server_hostname=None): @@ -199,7 +199,7 @@ sock, protocol_factory, ssl, server_hostname) return transport, protocol - @tasks.coroutine + @coroutine def create_unix_server(self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None): if isinstance(ssl, bool): diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -14,8 +14,9 @@ from . import selector_events from . import tasks from . import windows_utils +from . import _overlapped +from .coroutines import coroutine from .log import logger -from . import _overlapped __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', @@ -129,7 +130,7 @@ def _socketpair(self): return windows_utils.socketpair() - @tasks.coroutine + @coroutine def create_pipe_connection(self, protocol_factory, address): f = self._proactor.connect_pipe(address) pipe = yield from f @@ -138,7 +139,7 @@ extra={'addr': address}) return trans, protocol - @tasks.coroutine + @coroutine def start_serving_pipe(self, protocol_factory, address): server = PipeServer(address) @@ -172,7 +173,7 @@ self.call_soon(loop) return [server] - @tasks.coroutine + @coroutine def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): @@ -258,7 +259,7 @@ conn.settimeout(listener.gettimeout()) return conn, conn.getpeername() - @tasks.coroutine + @coroutine def accept_coro(future, conn): # Coroutine closing the accept socket if the future is cancelled try: diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -11,7 +11,7 @@ from unittest import mock import asyncio -from asyncio import tasks +from asyncio import coroutines from asyncio import test_utils @@ -193,7 +193,7 @@ # attribute). coro_name = 'notmuch' coro_qualname = 'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch' - elif tasks._DEBUG: + elif coroutines._DEBUG: # In debug mode, @coroutine decorator uses CoroWrapper which gets # its name (__name__ attribute) from the wrapped coroutine # function. @@ -1475,23 +1475,23 @@ self.assertIsNone(gen.gi_frame) # Save debug flag. - old_debug = asyncio.tasks._DEBUG + old_debug = asyncio.coroutines._DEBUG try: # Test with debug flag cleared. - asyncio.tasks._DEBUG = False + asyncio.coroutines._DEBUG = False check() # Test with debug flag set. - asyncio.tasks._DEBUG = True + asyncio.coroutines._DEBUG = True check() finally: # Restore original debug flag. - asyncio.tasks._DEBUG = old_debug + asyncio.coroutines._DEBUG = old_debug def test_yield_from_corowrapper(self): - old_debug = asyncio.tasks._DEBUG - asyncio.tasks._DEBUG = True + old_debug = asyncio.coroutines._DEBUG + asyncio.coroutines._DEBUG = True try: @asyncio.coroutine def t1(): @@ -1511,7 +1511,7 @@ val = self.loop.run_until_complete(task) self.assertEqual(val, (1, 2, 3)) finally: - asyncio.tasks._DEBUG = old_debug + asyncio.coroutines._DEBUG = old_debug def test_yield_from_corowrapper_send(self): def foo(): @@ -1519,7 +1519,7 @@ return a def call(arg): - cw = asyncio.tasks.CoroWrapper(foo(), foo) + cw = asyncio.coroutines.CoroWrapper(foo(), foo) cw.send(None) try: cw.send(arg) @@ -1534,7 +1534,7 @@ def test_corowrapper_weakref(self): wd = weakref.WeakValueDictionary() def foo(): yield from [] - cw = asyncio.tasks.CoroWrapper(foo(), foo) + cw = asyncio.coroutines.CoroWrapper(foo(), foo) wd['cw'] = cw # Would fail without __weakref__ slot. cw.gen = None # Suppress warning from __del__. @@ -1580,16 +1580,16 @@ }) mock_handler.reset_mock() - @mock.patch('asyncio.tasks.logger') + @mock.patch('asyncio.coroutines.logger') def test_coroutine_never_yielded(self, m_log): - debug = asyncio.tasks._DEBUG + debug = asyncio.coroutines._DEBUG try: - asyncio.tasks._DEBUG = True + asyncio.coroutines._DEBUG = True @asyncio.coroutine def coro_noop(): pass finally: - asyncio.tasks._DEBUG = debug + asyncio.coroutines._DEBUG = debug tb_filename = __file__ tb_lineno = sys._getframe().f_lineno + 1 @@ -1695,8 +1695,8 @@ def test_env_var_debug(self): code = '\n'.join(( - 'import asyncio.tasks', - 'print(asyncio.tasks._DEBUG)')) + 'import asyncio.coroutines', + 'print(asyncio.coroutines._DEBUG)')) # Test with -E to not fail if the unit test was run with # PYTHONASYNCIODEBUG set to a non-empty string -- Repository URL: http://hg.python.org/cpython
participants (1)
-
victor.stinner