[Python-checkins] cpython (3.5): asyncio: Sync with the upstream

yury.selivanov python-checkins at python.org
Thu Sep 15 13:27:47 EDT 2016


https://hg.python.org/cpython/rev/bf07ff6d072e
changeset:   103814:bf07ff6d072e
branch:      3.5
parent:      103800:423ad3b14ee1
user:        Yury Selivanov <yury at magic.io>
date:        Thu Sep 15 13:10:51 2016 -0400
summary:
  asyncio: Sync with the upstream

files:
  Lib/asyncio/base_events.py |  60 ++++++++++++++++++++++++++
  Lib/asyncio/events.py      |   4 +
  2 files changed, 64 insertions(+), 0 deletions(-)


diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -28,6 +28,7 @@
 import traceback
 import sys
 import warnings
+import weakref
 
 from . import compat
 from . import coroutines
@@ -242,6 +243,17 @@
         self._task_factory = None
         self._coroutine_wrapper_set = False
 
+        if hasattr(sys, 'get_asyncgen_hooks'):
+            # Python >= 3.6
+            # A weak set of all asynchronous generators that are
+            # being iterated by the loop.
+            self._asyncgens = weakref.WeakSet()
+        else:
+            self._asyncgens = None
+
+        # Set to True when `loop.shutdown_asyncgens` is called.
+        self._asyncgens_shutdown_called = False
+
     def __repr__(self):
         return ('<%s running=%s closed=%s debug=%s>'
                 % (self.__class__.__name__, self.is_running(),
@@ -333,6 +345,48 @@
         if self._closed:
             raise RuntimeError('Event loop is closed')
 
+    def _asyncgen_finalizer_hook(self, agen):
+        self._asyncgens.discard(agen)
+        if not self.is_closed():
+            self.create_task(agen.aclose())
+
+    def _asyncgen_firstiter_hook(self, agen):
+        if self._asyncgens_shutdown_called:
+            warnings.warn(
+                "asynchronous generator {!r} was scheduled after "
+                "loop.shutdown_asyncgens() call".format(agen),
+                ResourceWarning, source=self)
+
+        self._asyncgens.add(agen)
+
+    @coroutine
+    def shutdown_asyncgens(self):
+        """Shutdown all active asynchronous generators."""
+        self._asyncgens_shutdown_called = True
+
+        if self._asyncgens is None or not len(self._asyncgens):
+            # If Python version is <3.6 or we don't have any asynchronous
+            # generators alive.
+            return
+
+        closing_agens = list(self._asyncgens)
+        self._asyncgens.clear()
+
+        shutdown_coro = tasks.gather(
+            *[ag.aclose() for ag in closing_agens],
+            return_exceptions=True,
+            loop=self)
+
+        results = yield from shutdown_coro
+        for result, agen in zip(results, closing_agens):
+            if isinstance(result, Exception):
+                self.call_exception_handler({
+                    'message': 'an error occurred during closing of '
+                               'asynchronous generator {!r}'.format(agen),
+                    'exception': result,
+                    'asyncgen': agen
+                })
+
     def run_forever(self):
         """Run until stop() is called."""
         self._check_closed()
@@ -340,6 +394,10 @@
             raise RuntimeError('Event loop is running.')
         self._set_coroutine_wrapper(self._debug)
         self._thread_id = threading.get_ident()
+        if self._asyncgens is not None:
+            old_agen_hooks = sys.get_asyncgen_hooks()
+            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+                                   finalizer=self._asyncgen_finalizer_hook)
         try:
             while True:
                 self._run_once()
@@ -349,6 +407,8 @@
             self._stopping = False
             self._thread_id = None
             self._set_coroutine_wrapper(False)
+            if self._asyncgens is not None:
+                sys.set_asyncgen_hooks(*old_agen_hooks)
 
     def run_until_complete(self, future):
         """Run until the Future is done.
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -248,6 +248,10 @@
         """
         raise NotImplementedError
 
+    def shutdown_asyncgens(self):
+        """Shutdown all active asynchronous generators."""
+        raise NotImplementedError
+
     # Methods scheduling callbacks.  All these return Handles.
 
     def _timer_handle_cancelled(self, handle):

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list