[Python-Dev] Trial balloon: microthreads library in stdlib

Phillip J. Eby pje at telecommunity.com
Wed Feb 14 01:27:38 CET 2007


At 08:41 PM 2/13/2007 +0000, glyph at divmod.com wrote:
>  and the microthreading features being discussed here are a trivial hack 
> somewhere in its mainloop machinery, not an entirely new subsystem that 
> it should be implemented in terms of.

It doesn't even require hacking the mainloop; it can be done entirely in 
userspace.  Here's some code I wrote a few months ago but never got around 
to using.  (Or testing, for that matter, so it may not work, or even 
compile!)

It requires Python 2.5 and the "simplegeneric" package from the Cheeseshop, 
and it allows you to "spawn" generators to create independently-running 
pseudothreads.  These pseudothreads can yield to Deferreds, "returning" the 
value or failure thereof.  (Failures cause the error to be raised such that 
it appears to happen at the yield point.)

There are also a few other yield targets like "Return" (to return a value 
to a calling co-routine), "Pause" (to delay resumption), and "with_timeout" 
(to wrap some other yieldable in a timeout that raises TimeoutError).  Enjoy!


import sys
import types
from functools import partial

from simplegeneric import generic
from twisted.internet import reactor
from twisted.internet.defer import Deferred, TimeoutError
from twisted.python import failure

# Public names of this module, grouped by intended audience.
__all__ = [
     # user APIs:
     'spawn',
     'Pause',
     'Return',
     'with_timeout',

     # extension APIs:
     'schedule',
     'resume',
     'throw',
     'current_task',
     'yield_to',
     'yieldable',
]


def spawn(geniter, delay=0):
     """Start running a generator-iterator as a new task

     Usage::

         spawn(someGeneratorFunc(args))

     A task is a list used as a stack of generator-iterators; the new task
     starts out holding only `geniter`.  It is passed to ``schedule()`` so
     that it is first resumed after `delay` seconds (0 by default), and is
     then returned.  The returned task object can be passed to the
     ``schedule()``, ``throw()``, and ``yield_to()`` APIs.
     """
     new_task = []
     new_task.append(geniter)
     schedule(new_task, delay)
     return new_task


def schedule(task, delay=0, value=None):
     """Arrange for `task` to be resumed with `value` after `delay` seconds

     For a non-empty task this returns whatever ``reactor.callLater()``
     returns.  An empty task is not scheduled and None is returned.
     """
     if not task:
         # XXX warn if non-None value sent to empty task?
         return None
     return reactor.callLater(delay, resume, task, value)


def resume(task, value=None):
     """Send `value` to the generator on top of `task`, right now

     `value` becomes the result of the ``yield`` expression at which that
     generator is suspended.  Nothing happens if `task` is empty.
     """
     if not task:
         # XXX warn if non-None value sent to empty task?
         return
     innermost = task[-1]
     _invoke(task, partial(innermost.send, value))


def throw(task, exc):
     """Raise `exc` tuple in `task` and immediately resume its execution"""
     if not task:
         # Propagate exception out of a failed task
         raise exc[0], exc[1], exc[2]
     _invoke(task, partial(task[-1].throw, *exc))


def _invoke(task, method):
     """Invoke method() in context of `task`, yielding to the return value

     `method` is a zero-argument callable; ``resume()`` and ``throw()`` pass
     the bound ``send``/``throw`` of the generator on top of `task`.

     * If ``method()`` returns, the result is the value the generator
       yielded, and it is handed to ``yield_to()``.
     * If it raises ``StopIteration``, the generator exited without
       returning a value: it is popped and the task is resumed with None.
     * If it raises anything else, the generator is popped and the
       exception is passed to ``throw()`` for the remainder of the task.

     Exceptions raised by ``yield_to()`` itself are not caught here.
     """
     try:
         value = method()
     except StopIteration:
         task.pop()      # it's an exit with no return value
         resume(task)    # so send None up the stack
     except:
         # Bare ``except`` on purpose: every kind of exception must travel
         # up the task's stack.  Grab the exception info before doing
         # anything else so that it cannot be disturbed.
         exc_info = sys.exc_info()
         task.pop()      # Propagate exception up the stack
         throw(task, exc_info)
     else:
         # Handle a yielded value
         yield_to(value, task)


@generic
def yield_to(value, task):
     """Handle a value yielded by `task`

     This is the fallback used when no handler matches `value`; it always
     raises ``TypeError``.

     To register special handlers, use ``@yield_to.when_type()`` or
     ``@yield_to.when_object()``.  (See the ``simplegeneric`` docs for
     details.)
     """
     message = (
         "Unrecognized yield value (maybe missing Return()?): %r" % (value,)
     )
     raise TypeError(message)

@yield_to.when_type(Deferred)
def _yield_to_deferred(value, task):
     """Return a deferred value immediately or pause until fired

     The same function is registered as both callback and errback on the
     yielded Deferred `value`.  A normal result is sent to `task` via
     ``resume()``.  A ``Failure`` has its wrapped exception thrown into
     `task` via ``throw()``, so the error appears at the yield point.
     """

     def _resume(result):
         if isinstance(result, failure.Failure):
             # A Failure is not itself an exception and cannot be thrown
             # into a generator; throw the exception it wraps instead.
             throw(task, (result.type, result.value, result.tb))
         else:
             resume(task, result)
         return result    # don't alter the deferred's value

     # This will either fire immediately, or delay until the appropriate time
     value.addCallbacks(_resume, _resume)


@yield_to.when_type(types.GeneratorType)
def _yield_to_subgenerator(value, task):
     """Push a yielded generator onto `task`'s stack and schedule the task

     The sub-generator `value` becomes the top of the stack, so it is the
     one that runs when the task is next resumed.
     """
     task.append(value)
     schedule(task, delay=0)


def yieldable(f):
     """Mark function `f` as something a task may yield

     Usage::

         @yieldable
         def do_something(task):
             # code that will be invoked upon (yield do_something)

         (yield do_something)

     The function is tagged with a true ``__yieldable__`` attribute and
     returned unchanged.

     The function's return value will be ignored, and errors in it are NOT
     caught!  It must instead use the ``resume()``, ``schedule()``,
     ``throw()``, or ``yield_to()`` APIs to communicate with the yielding task
     (which is provided as the sole argument.)
     """
     setattr(f, '__yieldable__', True)
     return f


@yield_to.when_type(types.FunctionType)
def _yield_to_function(value, task):
     """Call a yielded function with `task`, if it is marked yieldable

     Functions lacking a true ``__yieldable__`` attribute are rejected with
     ``TypeError``.
     """
     if not getattr(value, '__yieldable__', None):
         raise TypeError(
             "Function is not marked yieldable (maybe missing Return()?): %r" %
             (value,)
         )
     # function is marked as yieldable
     return value(task)


@yieldable
def current_task(task):
     """Yield this function itself (do not call it) to get the running task

     Usage: ``task = (yield current_task)``

     The task is sent straight back to the yielding coroutine via
     ``resume()``; no task switching will occur.
     """
     resume(task, value=task)


class Pause(object):
     """Yield target that suspends the current task for a while

     Usage::
         yield Pause     # allow other tasks to run, then resume ASAP
         yield Pause(5)  # don't run for at least five seconds
     """

     # Class-level default, so the bare class also has a ``seconds`` of 0
     seconds = 0

     def __repr__(self):
         return 'Pause(%s)' % self.seconds

     def __init__(self, seconds=0):
         self.seconds = seconds

@yield_to.when_type(Pause)
@yield_to.when_object(Pause)
def _yield_to_pause(value, task):
     """Let other tasks run by rescheduling this task

     Registered for both ``Pause`` instances and the ``Pause`` class itself;
     the task is scheduled to resume after ``value.seconds`` seconds.
     """
     delay = value.seconds
     schedule(task, delay)


class Return(object):
     """Return a value to your caller

     Usage::

         yield Return(42)      # returns 42 to calling coroutine
         yield Return          # returns None to calling coroutine

     ``Return()`` with no argument is also accepted and carries None.
     """

     # Class-level default, so the bare class also has a ``value`` of None
     value = None

     def __init__(self, value=None):
         self.value = value

     def __repr__(self):
         # Wrap in a 1-tuple so that tuple values are formatted as a whole
         # instead of being unpacked as multiple ``%`` arguments.
         return 'Return(%s)' % (self.value,)

@yield_to.when_type(Return)
@yield_to.when_object(Return)
def _yield_to_return(value, task):
     """Finish the current generator and pass ``value.value`` to its caller

     Registered for both ``Return`` instances and the ``Return`` class
     itself.
     """
     finished = task.pop()
     finished.close()    # ensure any ``finally`` clauses are run
     resume(task, value.value)


def with_timeout(seconds, value):
     """Raise TimeoutError if `value` doesn't resume before `seconds` elapse

     Example::

         result = yield with_timeout(30, something(whatever))

     This is equivalent to ``result = yield something(whatever)``, except that
     if the current task isn't resumed in 30 seconds or less, a
     ``defer.TimeoutError`` will be raised in the blocked task.  (Note: the
     pending timeout is cancelled when this wrapper exits, so it cannot fire
     afterwards.)

     The `value` passed to this routine may be any yieldable value, although it
     makes no sense to yield a value that will just be returned to a parent
     coroutine.

     Note that the ``something(whatever)`` subtask may trap the
     ``TimeoutError`` and misinterpret its significance, so generally speaking
     you should use this only for relatively low-level operations, and expose
     a timeout parameter to your callers, to allow them to simply specify the
     timeout rather than using this wrapper.

     In fact, this is what most Twisted APIs do, providing a ``timeout``
     keyword argument that you should use instead of this wrapper, as they will
     then raise ``TimeoutError`` automatically after the specified time
     expires.  So, if you're calling such an API, don't use this wrapper.
     """
     # Ask the scheduler which task we are running in
     this_task = yield current_task

     # raise a timeout error in this task after seconds
     timeout_exc = (TimeoutError, TimeoutError(), None)
     timer = reactor.callLater(seconds, throw, this_task, timeout_exc)

     try:
         result = yield value    # call the subtask ...
         yield Return(result)    # ... and return its result
     finally:
         # Ensure timeout call doesn't fire after we've exited
         if timer.active():
             timer.cancel()



More information about the Python-Dev mailing list