[Python-Dev] Trial balloon: microthreads library in stdlib
Phillip J. Eby
pje at telecommunity.com
Wed Feb 14 01:27:38 CET 2007
At 08:41 PM 2/13/2007 +0000, glyph at divmod.com wrote:
> and the microthreading features being discussed here are a trivial hack
> somewhere in its mainloop machinery, not an entirely new subsystem that
> it should be implemented in terms of.
It doesn't even require hacking the mainloop; it can be done entirely in
userspace. Here's some code I wrote a few months ago but never got around
to using it. (Or testing it, for that matter, so it may not work, or even
compile!)
It requires Python 2.5 and the "simplegeneric" package from the Cheeseshop,
and it allows you to "spawn" generators to create independently-running
pseudothreads. These pseudothreads can yield to Deferreds, "returning" the
value or failure thereof. (Failures cause the error to be raised such that
it appears to happen at the yield point.)
There are also a few other yield targets like "Return" (to return a value
to a calling co-routine), "Pause" (to delay resumption), and "with_timeout"
(to wrap some other yieldable in a timeout that raises TimeoutError). Enjoy!
import sys
import types
from functools import partial

from simplegeneric import generic
from twisted.internet import reactor
from twisted.internet.defer import Deferred, TimeoutError
from twisted.python import failure
# Public names exported by ``from <module> import *``.
__all__ = [
    # user APIs:
    'spawn',
    'Pause',
    'Return',
    'with_timeout',
    # extension APIs:
    'schedule',
    'resume',
    'throw',
    'current_task',
    'yield_to',
    'yieldable',
]
def spawn(geniter, delay=0):
    """Start `geniter` as a new independently-running task.

    Usage::

        spawn(someGeneratorFunc(args))

    A task is a list used as a call stack of generator-iterators; the new
    one initially holds only `geniter`.  It is passed to ``schedule()`` so
    the Twisted reactor first runs it after `delay` seconds (default 0).
    The task list is returned so it can later be handed to ``schedule()``,
    ``throw()`` and ``yield_to()``.
    """
    new_task = list()
    new_task.append(geniter)
    schedule(new_task, delay)
    return new_task
def schedule(task, delay=0, value=None):
    """Arrange for `task` to be resumed with `value` after `delay` seconds.

    Returns the handle produced by ``reactor.callLater``.  When `task` is
    empty (it has already finished) nothing is scheduled and None is
    returned.
    """
    if not task:
        # XXX warn if non-None value sent to empty task?
        return None
    return reactor.callLater(delay, resume, task, value)
def resume(task, value=None):
    """Send `value` into the innermost generator of `task` right now.

    `value` becomes the result of the ``yield`` expression the task is
    currently suspended on.  An empty (finished) task is silently ignored.
    """
    if not task:
        # XXX warn if non-None value sent to empty task?
        return
    innermost = task[-1]
    _invoke(task, partial(innermost.send, value))
def throw(task, exc):
"""Raise `exc` tuple in `task` and immediately resume its execution"""
if not task:
# Propagate exception out of a failed task
raise exc[0], exc[1], exc[2]
_invoke(task, partial(task[-1].throw, *exc))
def _invoke(task, method):
    """Run ``method()`` on behalf of `task` and dispatch whatever it yields.

    `method` is a zero-argument callable that advances the innermost
    generator of `task` (its bound ``send`` or ``throw`` wrapped in
    ``partial``).  There are three possible outcomes:

    * the generator finishes (``StopIteration``): it is popped off the task
      stack and its caller is resumed with None;
    * the generator raises: it is popped and the exception is thrown into
      its caller (``throw()`` re-raises it here if no caller is left);
    * the generator yields: the yielded value is handed to ``yield_to()``.
    """
    try:
        value = method()
    except StopIteration:
        task.pop()      # it's an exit with no return value...
        resume(task)    # ...so send None up the stack
    except:
        # Deliberately bare: every exception is forwarded (not swallowed) to
        # the calling coroutine.  sys.exc_info() is read inside the handler
        # so the active exception and its traceback are captured.  (``sys``
        # was previously used here without being imported -> NameError.)
        task.pop()
        throw(task, sys.exc_info())
    else:
        # Handle a yielded value
        yield_to(value, task)
@generic
def yield_to(value, task):
    """Dispatch a value yielded by `task` to the handler registered for it.

    This is a ``simplegeneric`` generic function.  Register handlers with
    ``@yield_to.when_type()`` or ``@yield_to.when_object()`` (see the
    ``simplegeneric`` docs for details).  This default implementation is
    reached only for values with no registered handler, and rejects them
    with TypeError.
    """
    message = "Unrecognized yield value (maybe missing Return()?): %r" % (value,)
    raise TypeError(message)
@yield_to.when_type(Deferred)
def _yield_to_deferred(value, task):
    """Suspend `task` until the Deferred `value` fires, then resume it.

    On success the Deferred's result is sent into the task.  On failure the
    underlying exception (not the ``Failure`` wrapper) is thrown into the
    task, so it appears to be raised at the ``yield`` and can be caught with
    an ordinary ``except`` clause.  If the Deferred has already fired, the
    task is resumed synchronously from inside this call.
    """
    def _resume(result):
        if isinstance(result, failure.Failure):
            # Unwrap the Failure (same triple Twisted's own
            # Failure.throwExceptionIntoGenerator uses) so user code sees the
            # original exception type rather than ``Failure``.
            throw(task, (result.type, result.value, result.tb))
        else:
            resume(task, result)
        return result  # don't alter the deferred's value

    # NOTE(review): these callbacks are never removed, so if the task stops
    # waiting early (e.g. a with_timeout() timer fires first), a later firing
    # still resumes the task with a stray value -- confirm this is acceptable.
    value.addCallbacks(_resume, _resume)
@yield_to.when_type(types.GeneratorType)
def _yield_to_subgenerator(value, task):
    """Treat a yielded generator as a subroutine call.

    The generator is pushed onto `task`'s call stack, and the task is
    rescheduled through ``schedule()`` with its default zero delay, so the
    new innermost generator starts running on a later reactor callback.
    """
    callee = value
    task.append(callee)
    schedule(task)
def yieldable(f):
    """Mark `f` as a task-management function that tasks may yield.

    Usage::

        @yieldable
        def do_something(task):
            # code that will be invoked upon (yield do_something)

        (yield do_something)

    When yielded (not called), `f` is invoked with the yielding task as its
    only argument.  Its return value is ignored and errors in it are NOT
    caught!  It must communicate with the yielding task through the
    ``resume()``, ``schedule()``, ``throw()`` or ``yield_to()`` APIs.

    Returns `f` itself, with a true ``__yieldable__`` attribute set.
    """
    setattr(f, '__yieldable__', True)
    return f
@yield_to.when_type(types.FunctionType)
def _yield_to_function(value, task):
    """Run a yielded function that has been marked with ``@yieldable``.

    The function is called with `task` and its result is returned.  Any
    other plain function is rejected with TypeError, since it was most
    likely yielded by mistake (e.g. a forgotten ``Return()``).
    """
    if not getattr(value, '__yieldable__', None):
        raise TypeError(
            "Function is not marked yieldable (maybe missing Return()?): %r" %
            (value,)
        )
    return value(task)  # function is marked as yieldable
@yieldable
def current_task(task):
    """Yield (don't call) this function to learn which task is running.

    Usage: ``task = (yield current_task)``

    The yielding coroutine is resumed at once with its own task as the
    result of the ``yield``; no switch to another task takes place.
    """
    resume(task, value=task)
class Pause(object):
    """Yield this to give up control temporarily.

    Usage::

        yield Pause      # allow other tasks to run, then resume ASAP
        yield Pause(5)   # don't run for at least five seconds

    Yielding the class itself works because ``seconds`` also has a
    class-level default of 0.
    """

    # Class-level default, read when the Pause class itself is yielded.
    seconds = 0

    def __init__(self, seconds=0):
        self.seconds = seconds

    def __repr__(self):
        template = 'Pause(%s)'
        return template % self.seconds
@yield_to.when_type(Pause)
@yield_to.when_object(Pause)
def _yield_to_pause(value, task):
    """Reschedule `task` after ``value.seconds`` so other work can run.

    Handles both ``Pause`` instances and the ``Pause`` class itself (whose
    ``seconds`` is 0).  The task is handed back to ``schedule()``, so the
    reactor can run other pending tasks before this one continues.
    """
    delay = value.seconds
    schedule(task, delay)
class Return(object):
    """Return a value to your caller.

    Usage::

        yield Return(42)   # returns 42 to calling coroutine
        yield Return       # returns None to calling coroutine
        yield Return()     # same as ``yield Return``
    """

    # Class-level default, read when the Return class itself is yielded.
    value = None

    def __init__(self, value=None):
        # Defaulting to None makes ``Return()`` equivalent to ``Return``
        # instead of raising TypeError.
        self.value = value

    def __repr__(self):
        # Wrap in a 1-tuple so tuple values are not splatted into the
        # format string (which raised TypeError), and use %r so the wrapped
        # value is shown unambiguously (e.g. strings quoted).
        return 'Return(%r)' % (self.value,)
@yield_to.when_type(Return)
@yield_to.when_object(Return)
def _yield_to_return(value, task):
    """Finish the innermost coroutine and hand ``value.value`` to its caller.

    The finishing generator is popped off `task` and closed first, so any
    ``finally`` clauses it still has pending run before the caller resumes.
    """
    finished = task.pop()
    finished.close()  # ensure any ``finally`` clauses are run
    resume(task, value.value)
def with_timeout(seconds, value):
    """Raise TimeoutError if `value` doesn't resume before `seconds` elapse

    Example::

        result = yield with_timeout(30, something(whatever))

    This is equivalent to ``result = yield something(whatever)``, except that
    if the current task isn't resumed in 30 seconds or less, a
    ``defer.TimeoutError`` will be raised in the blocked task.  (Note: the
    error is thrown into whichever generator is innermost on the task's
    stack when the timer fires, which may be the subtask rather than this
    wrapper.)

    The `value` passed to this routine may be any yieldable value, although it
    makes no sense to yield a value that will just be returned to a parent
    coroutine.

    Note that the ``something(whatever)`` subtask may trap the
    ``TimeoutError`` and misinterpret its significance, so generally speaking
    you should use this only for relatively low-level operations, and expose
    a timeout parameter to your callers, to allow them to simply specify the
    timeout rather than using this wrapper.

    In fact, this is what most Twisted APIs do, providing a ``timeout``
    keyword argument that you should use instead of this wrapper, as they will
    then raise ``TimeoutError`` automatically after the specified time
    expires.

    So, if you're calling such an API, don't use this wrapper.
    """
    # raise a timeout error in this task after seconds
    # (the ``yield current_task`` inside the argument list suspends this
    # generator once to obtain its own task before the timer is armed)
    delayedCall = reactor.callLater(
        seconds,
        throw, (yield current_task), (TimeoutError, TimeoutError(), None)
    )
    try:
        # The inner yield runs the subtask; Return hands its result upward.
        # NOTE(review): if `value` is a Deferred that fires after the timeout,
        # its callbacks still call resume() on this task -- confirm intended.
        yield Return((yield value)) # call the subtask and return its result
    finally:
        # Ensure timeout call doesn't fire after we've exited.  This also
        # runs when _yield_to_return close()s this generator after Return.
        if delayedCall.active():
            delayedCall.cancel()
More information about the Python-Dev
mailing list