[Python-Dev] Minimal async event loop and async utilities (Was: PEP 492: async/await in Python; version 4)

PJ Eby pje at telecommunity.com
Fri May 15 19:03:41 CEST 2015

On Mon, May 11, 2015 at 6:05 PM, Guido van Rossum <guido at python.org> wrote:
> OTOH you may look at micropython's uasyncio -- IIRC it doesn't have Futures
> and it definitely has I/O waiting.

Here's a sketch of an *extremely* minimal main loop that can do I/O
without Futures, and might be suitable as a PEP example.  (Certainly,
it would be hard to write a *simpler* example than this, since it
doesn't even use any *classes* or require any specially named methods,
works with present-day generators, and is (I think) both 2.x/3.x
compatible.)

    coroutines = []     # round-robin of currently "running" coroutines

    def schedule(coroutine, val=None, err=None):
        coroutines.insert(0, (coroutine, val, err))

    def runLoop():
        while coroutines:
            (coroutine, val, err) = coroutines.pop()
            try:
                if err is not None:
                    suspend = coroutine.throw(err)
                else:
                    suspend = coroutine.send(val)
            except StopIteration:
                # coroutine is finished, so don't reschedule it
                continue
            except Exception:
                # framework-specific detail  (i.e., log it, send it
                # to an error-handling coroutine, or just stop the program)
                # Here, we just ignore it and stop the coroutine
                continue
            else:
                if hasattr(suspend, '__call__') and suspend(coroutine):
                    continue    # suspended; the thunk will reschedule it
                # put it back on the round-robin list
                schedule(coroutine)

To use it, `schedule()` one or more coroutines, then call `runLoop()`,
which will run as long as there are things to do.  Each coroutine
scheduled must yield *thunks*: callable objects that take a coroutine
as a parameter, and return True if the coroutine should be suspended,
or False if it should continue to run.  If the thunk returns true,
that means the thunk has taken responsibility for arranging to
`schedule()` the coroutine with a value or error when it's time to
send it the result of the suspension.
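To make the protocol concrete, here's a self-contained toy run. The
loop is a condensed copy of the sketch above; the `pass_control` thunk
and `worker` coroutine are invented names for this illustration:

```python
# Self-contained demo of the thunk protocol.  The loop is a condensed
# copy of the schedule()/runLoop() sketch; `pass_control` and `worker`
# are invented names for this illustration.

coroutines = []     # round-robin of currently "running" coroutines

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            continue    # coroutine finished; don't reschedule
        if hasattr(suspend, '__call__') and suspend(coroutine):
            continue    # suspended; the thunk will reschedule it
        schedule(coroutine)     # still runnable; round-robin it

def pass_control():
    """Thunk that never suspends: just lets other coroutines run."""
    def suspend(coroutine):
        return False
    return suspend

trace = []

def worker(name, count):
    for i in range(count):
        trace.append((name, i))
        yield pass_control()

schedule(worker("a", 2))
schedule(worker("b", 2))
runLoop()
print(trace)    # [('a', 0), ('b', 0), ('a', 1), ('b', 1)]
```

The round-robin interleaving falls out of `schedule()` inserting at
the front of the list while `runLoop()` pops from the back.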

You might be asking, "wait, but where's the I/O?"  Why, in a
coroutine, of course...

    import time
    from heapq import heappush, heappop
    from select import select

    readers = {}
    writers = {}
    timers = []

    def readable(fileno):
        """yield readable(fileno) resumes when fileno is readable"""
        def suspend(coroutine):
            readers[fileno] = coroutine
            return True
        return suspend

    def writable(fileno):
        """yield writable(fileno) resumes when fileno is writable"""
        def suspend(coroutine):
            writers[fileno] = coroutine
            return True
        return suspend

    def sleepFor(seconds):
        """yield sleepFor(seconds) resumes after that much time"""
        return suspendUntil(time.time() + seconds)

    def suspendUntil(timestamp):
        """yield suspendUntil(timestamp) resumes when that time is reached"""
        def suspend(coroutine):
            heappush(timers, (timestamp, coroutine))
            return True
        return suspend

    def doIO():
        while coroutines or readers or writers or timers:

            # Resume scheduled tasks whose time has arrived
            while timers and timers[0][0] <= time.time():
                ts, coroutine = heappop(timers)
                schedule(coroutine)

            if readers or writers:
                if coroutines:
                    # Other tasks are running; use minimal timeout
                    timeout = 0.001
                elif timers:
                    timeout = max(timers[0][0] - time.time(), 0.001)
                else:
                    timeout = None  # take as long as necessary
                r, w, e = select(readers, writers, [], timeout)
                for rr in r: schedule(readers.pop(rr))
                for ww in w: schedule(writers.pop(ww))

            yield   # allow other coroutines to run

    schedule(doIO())  # run the I/O loop as a coroutine

(This is painfully incomplete for a real framework, but it's a rough
sketch of how one of peak.events' first drafts worked, circa '03/'04.)

Basically, you just need a coroutine whose job is to resume coroutines
whose scheduled time has arrived, or whose I/O is ready.  And of
course, some data structures to keep track of such things, and an API
to update the data structures and suspend the coroutines.  The I/O
loop exits once there are no more running tasks and nothing waiting on
I/O...  which will also exit the runLoop.  (A bit like a miniature
version of NodeJS for Python.)
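Here's a self-contained sketch exercising the timer half of that
arrangement. The condensed loop and the `sleeper` coroutine are
restated and invented locally; readers and writers are omitted, so
`select()` is never needed:

```python
# Timer-only demo of the I/O-coroutine idea: doIO() resumes sleeping
# coroutines in timestamp order.  The loop is a condensed copy of the
# runLoop() sketch; `sleeper` is an invented name for illustration.

import time
from heapq import heappush, heappop

coroutines, timers = [], []

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            continue
        if hasattr(suspend, '__call__') and suspend(coroutine):
            continue
        schedule(coroutine)

def suspendUntil(timestamp):
    def suspend(coroutine):
        heappush(timers, (timestamp, coroutine))
        return True
    return suspend

def sleepFor(seconds):
    return suspendUntil(time.time() + seconds)

def doIO():
    # timer-only version: no readers/writers, so it busy-waits
    while coroutines or timers:
        while timers and timers[0][0] <= time.time():
            ts, coroutine = heappop(timers)
            schedule(coroutine)
        yield   # a plain yield isn't callable, so runLoop reschedules us

wakeups = []

def sleeper(name, seconds):
    yield sleepFor(seconds)
    wakeups.append(name)

schedule(sleeper("slow", 0.05))
schedule(sleeper("fast", 0.01))
schedule(doIO())
runLoop()
print(wakeups)  # ['fast', 'slow']
```

Both sleepers park themselves on the `timers` heap, and `doIO()`
wakes them strictly by timestamp, regardless of scheduling order.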

And, while you should preferably have only *one* such I/O coroutine
(to prevent busy-waiting), the I/O coroutine is completely
replaceable.  All that's required to implement one is that the core
runloop expose the count of active coroutines.  (Notice that, apart
from checking the length of `coroutines`, the I/O loop shown above
uses only the public `schedule()` API and the exposed thunk-suspension
protocol to do its thing.)

Also, note that you *can* indeed have multiple I/O coroutines running
at the same time, as long as you don't mind busy-waiting.  In fact,
you can refactor this to move the time-based scheduling inside the
runloop, and expose the "time until next task" and "number of running
non-I/O coroutines" to allow multiple I/O waiters to co-ordinate and
avoid busy-waiting.  (A later version of peak.events did this, though
it really wasn't to allow multiple I/O waiters, so much as to simplify
I/O waiters by providing a core time-scheduler, and to support
simulated time for running tests.)

So, there's definitely no requirement for I/O to be part of a "core"
runloop system.  The overall approach is *extremely* open to
extension, hardcodes next to nothing, and is super-easy to write new
yieldables for, since they need only have a method (or function) that
returns a suspend function.
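For instance (a hypothetical illustration, not from the post), the
yieldable doesn't even have to be a closure; any object callable with
the coroutine works as a thunk:

```python
# Hypothetical class-based yieldable: the loop only requires that the
# yielded object be callable with the coroutine, so an instance with
# __call__ is as good as a closure.  `Pause` is an invented name.

class Pause:
    """Suspend the yielding coroutine onto a shared wait queue."""

    def __init__(self, queue):
        self.queue = queue  # list of parked coroutines

    def __call__(self, coroutine):
        # act as the thunk: park the coroutine and report "suspended"
        self.queue.append(coroutine)
        return True
```

A matching "release" helper would then simply `schedule()` each parked
coroutine when the event of interest occurs.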

At the time I *first* implemented this approach in '03/'04, I hadn't
thought of using plain functions as suspend targets; I used objects
with a `shouldSuspend()` method.  But in fairness, I was working with
Python 2.2 and closures were still a pretty new feature back then.

Since then, though, I've seen this approach implemented elsewhere
using closures in almost exactly this way.  For example, the `co`
library for JavaScript implements almost exactly the above sketch's
approach, in not much more code.  It just uses the built-in JavaScript
event loop facilities, and supports yielding other things besides
thunks.  (Its thunks also don't return a value, and take a callback
rather than a coroutine.  But these are superficial differences.)

This approach is super-flexible in practice, as there are a ton of
add-on libraries for `co` that implement their control flow using
these thunks.  You can indeed fully generalize control flow in such
terms, without the need for futures or similar objects.  For example,
if you want to provide sugar for yielding to futures or other types of
objects, you just write a thunk-returning function or method, e.g.:

    def await_future(future):
        def suspend(coroutine):
            def resume(future):
                err = future.exception()
                if err is not None:
                    schedule(coroutine, None, err)
                else:
                    schedule(coroutine, future.result())
            future.add_done_callback(resume)
            return True
        return suspend

So `yield await_future(someFuture)` will arrange for suspension until
the future is ready.  Libraries or frameworks can also be written that
wrap a generator with one that provides automatic translation to
thunks for a variety of types or protocols.  Similarly, you can write
functions that take multiple awaitables, or that provide cancellation,
etc. on top of thunks.
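To see it in action (again restating the condensed loop; `consumer` is
an invented name), note that `concurrent.futures.Future.add_done_callback()`
fires immediately when the future is already finished, so an
already-completed future resumes its awaiter on the very next loop pass:

```python
# Demo of await_future() with a concurrent.futures.Future.  The loop
# is a condensed copy of the runLoop() sketch; `consumer` is invented.

from concurrent.futures import Future

coroutines = []

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            continue
        if hasattr(suspend, '__call__') and suspend(coroutine):
            continue
        schedule(coroutine)

def await_future(future):
    def suspend(coroutine):
        def resume(future):
            err = future.exception()
            if err is not None:
                schedule(coroutine, None, err)
            else:
                schedule(coroutine, future.result())
        future.add_done_callback(resume)
        return True
    return suspend

got = []

def consumer(future):
    value = yield await_future(future)
    got.append(value)

f = Future()
f.set_result(42)        # complete the future up front for simplicity
schedule(consumer(f))
runLoop()
print(got)  # [42]
```

For a future completed later (say, from a thread pool), something like
the `doIO()` coroutine would keep the loop alive until `resume` gets
called.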
