I am using twisted trial to run test cases for an application. The
application however uses stackless python and has a custom stackless
reactor. I implemented this reactor like this...
-------------------- stacklessreactor.py -----------------------
import stackless

# Use epoll() as our base reactor
from twisted.internet.epollreactor import EPollReactor as StacklessBaseReactor

# seconds between running the greenthreads. 0.0 for flat-out 100% CPU
STACKLESS_MAX_PUMP_RATE = 0.1

class StacklessReactor(StacklessBaseReactor):
    """This reactor does the stackless greenthread pumping in the main
    thread, interwoven with the reactor pump."""

    def doIteration(self, timeout):
        """Calls the base reactor's doIteration, and then fires off all
        the stackless threads."""
        if timeout is None or timeout > STACKLESS_MAX_PUMP_RATE:
            timeout = STACKLESS_MAX_PUMP_RATE
        StacklessBaseReactor.doIteration(self, timeout)
        stackless.schedule()  # let any runnable tasklets run

def install():
    """Install the stackless reactor."""
    p = StacklessReactor()
    from twisted.internet.main import installReactor
    installReactor(p)
And I install this as my reactor in my application with...
...placed right at the top of my .tac python file. And this all works.
Running the app with twistd, the custom reactor is installed and is used
as the reactor for the app.
Now, however, I come to write tests and run them with trial. I *need* the
tests to be run under the stackless reactor or things simply won't work
(a lot of the code I need to test consists of stackless tasklets).
When I run "/usr/local/stackless/bin/trial --help-reactors" I get the
following list:
kqueue kqueue(2)-based reactor.
win32 Win32 WaitForMultipleObjects-based reactor.
epoll epoll(4)-based reactor.
iocp Win32 IO Completion Ports-based reactor.
gtk Gtk1 integration reactor.
cf CoreFoundation integration reactor.
gtk2 Gtk2 integration reactor.
default The best reactor for the current platform.
debug-gui Semi-functional debugging/introspection reactor.
poll poll(2)-based reactor.
glib2 GLib2 event-loop integration reactor.
select select(2)-based reactor.
wx wxPython integration reactor.
qt QT integration reactor.
One of these I can use by passing in --reactor=name.
So the question is, is there a way of getting the trial framework to use
my custom reactor? Is there a way to get my reactor into that list
somehow? Is this not a supported feature of trial?
And... if this isn't a supported feature, what is the best way to get a
TestCase that will run under that reactor?
I look forward to any help people can offer.
With kind regards
> Since this is a generator function, it will automatically raise
> StopIteration once control-flow falls off the end of the function, so your
> while-loop could just be written:
Ah yes, thanks a lot.
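For anyone following along, the quoted point about generators can be seen in a few lines of plain Python (a standalone illustration, unrelated to the reactor code):

```python
def countdown(n):
    # A generator function: StopIteration is raised automatically when
    # control falls off the end -- no explicit raise is needed.
    while n > 0:
        yield n
        n -= 1

g = countdown(2)
print(next(g))  # 2
print(next(g))  # 1
try:
    next(g)  # control fell off the end of countdown()
except StopIteration:
    print('generator exhausted')
```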
These days I often find myself writing code to talk to services that are
periodically briefly unavailable. An error of some kind occurs and the
correct (and documented) action to take is just to retry the original call.
Examples include using Amazon's S3 service and the Twitter API. In both of
these services, transient failures happen fairly frequently.
So I wrote the class below to retry calls, and tried to make it fairly
general. I'd be happy to hear comments on it, because it's pretty simple,
and if it can be made bulletproof I imagine others will use it too.
First off, here's the class that handles the calling:
from twisted.internet import reactor, defer, task
from twisted.python import log, failure

class RetryingCall(object):
    """Calls a function repeatedly, passing it args and kw args. Failures
    are passed to a user-supplied failure testing function. If the failure
    is ignored, the function is called again after a delay whose duration
    is obtained from a user-supplied iterator. The start method (below)
    returns a deferred that fires with the eventual non-error result of
    calling the supplied function, or fires its errback if no successful
    result can be obtained before the delay backoff iterator raises
    StopIteration."""

    def __init__(self, f, *args, **kw):
        self._f = f
        self._args = args
        self._kw = kw

    def _err(self, fail):
        # Give the user-supplied tester a chance to veto the retry. If it
        # raises, or returns a failure, we give up and pass that along.
        fail = self._failureTester(fail)
        if isinstance(fail, failure.Failure):
            self._deferred.errback(fail)
        else:
            log.msg('RetryingCall: Ignoring %r' % (fail,))
            self._call()

    def _call(self):
        try:
            delay = self._backoffIterator.next()
        except StopIteration:
            log.msg('StopIteration in RetryingCall: ran out of attempts.')
            self._deferred.errback()
        else:
            d = task.deferLater(reactor, delay,
                                self._f, *self._args, **self._kw)
            d.addCallbacks(self._deferred.callback, self._err)

    def start(self, backoffIterator=None, failureTester=None):
        self._backoffIterator = iter(backoffIterator or
                                     simpleBackoffIterator())
        self._failureTester = failureTester or (lambda f: f)
        self._deferred = defer.Deferred()
        self._call()
        return self._deferred
You call the constructor with function and its args. When you call start()
you get back a deferred that eventually fires with the result of the call,
or an error. BTW, I called it "start" to mirror task.LoopingCall.
There's a helper function for producing successive inter-call delays:
from operator import mul
from functools import partial

def simpleBackoffIterator(maxResults=10, maxDelay=5.0, now=True,
                          initDelay=0.01, incFunc=None):
    assert maxResults > 0
    remaining = maxResults
    delay = initDelay
    incFunc = incFunc or partial(mul, 2.0)
    if now:
        yield 0.0
        remaining -= 1
    while remaining > 0:
        yield (delay if delay < maxDelay else maxDelay)
        delay = incFunc(delay)
        remaining -= 1
By default this will generate the sequence of inter-call delays 0.0, 0.01,
0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, and it should be easy to
see how you could write your own. Or you can just supply a list, etc. When
the backoff iterator finishes, the RetryingCall class gives up on trying
to get a non-error result from the function. In that case you just get a
StopIteration exception in the failure that the start() deferred returns.
(I was originally returning the original failure, but decided to simplify;
if you want that, you can keep it yourself in an error-tracking class, see
below.)
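That default schedule is easy to sanity-check without Twisted; here's a self-contained restatement of the generator (names here are illustrative) that prints the delays:

```python
from operator import mul
from functools import partial

def backoff(maxResults=10, maxDelay=5.0, now=True, initDelay=0.01,
            incFunc=None):
    # Mirror of simpleBackoffIterator, restated so this snippet runs alone.
    remaining = maxResults
    delay = initDelay
    incFunc = incFunc or partial(mul, 2.0)
    if now:
        yield 0.0  # fire the first call immediately
        remaining -= 1
    while remaining > 0:
        yield min(delay, maxDelay)  # cap each delay at maxDelay
        delay = incFunc(delay)
        remaining -= 1

print(list(backoff()))
# [0.0, 0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56]
```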
You get to specify a function for testing failures. If it ever raises or
returns a failure, the start() deferred's errback is called. The failure
tester can just ignore whatever failures should be considered transient.
So, for example, if you were calling S3 and wanted to ignore 504 errors,
you could supply a failureTester arg like this:
from twisted.web import error, http

def test(self, failure):
    failure.trap(error.Error)
    if int(failure.value.status) != http.GATEWAY_TIMEOUT:
        return failure
As another example, while using the Twitter API you might want to allow a
range of HTTP errors and also exactly one 404 error, seeing as a 404
*might* be an error on the part of Twitter (I don't mean to suggest that
actually happens). A 404 is probably definitive, but why not try once more
just to be sure? So, pass RetryingCall a failureTester that's an instance
of a class like this:
from twisted.web import http

class TwitterFailureTester(object):
    okErrs = (http.INTERNAL_SERVER_ERROR,
              # ...plus whatever other statuses you consider transient.
              )

    def __init__(self):
        self.seen404 = False

    def __call__(self, failure):
        status = int(failure.value.status)
        if status == http.NOT_FOUND:
            if self.seen404:
                return failure
            self.seen404 = True
        elif status not in self.okErrs:
            return failure
Changing existing code to use RetryingCall is pretty trivial. Take
something like this:

from twisted.web import client

d = client.getPage(url)

and change it to look like this:

r = RetryingCall(client.getPage, url)
d = r.start(failureTester=TwitterFailureTester())
I wrote the above last night, so I don't know if it's fully robust. But I
dropped it into some of my own stuff and it seems to work. I also have a
small test suite in case anyone wants it.
My questions are: Is this robust? Can/should it be improved? Any criticism
of the code (especially wrt Twisted best practices) would be very welcome.
> Looks pretty good. To streamline the usage and make the intent more
> apparent from a brief glance at the header you might consider turning it
> into a decorator, like
Thanks for the suggestion & the pointer. I'd not seen that page.
I don't think this is well suited to decorators, at least not with the
kinds of usages I am imagining. If you decorate a function, it's done once
and for all. So anyone who calls the function gets the single
one-size-fits-all decorated behavior. I'd rather the behavior was left in
the hands of the caller. That's kind of the point: give the caller flexible
control over what happens if something goes wrong, including passing in
your custom failure handler, etc.
Maybe a hybrid approach would be more useful: write a function which,
given a function, returns a retrying version of it, one that returns a
deferred firing when the original function has succeeded (or ultimately
failed). The result could then be passed around, called by multiple
pieces of code, etc. Hmmm...
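For what it's worth, here is a synchronous sketch of that hybrid idea, with exceptions standing in for Deferred failures; all names are hypothetical:

```python
import time

def retrying(f, delays, is_transient):
    """Return a version of f that retries while is_transient(exc) is true,
    sleeping for each delay in turn and giving up when delays run out."""
    def wrapper(*args, **kw):
        last_error = None
        for delay in delays:
            if delay:
                time.sleep(delay)
            try:
                return f(*args, **kw)
            except Exception as e:
                if not is_transient(e):
                    raise  # definitive error: propagate immediately
                last_error = e
        raise last_error  # out of retries: re-raise the last transient error
    return wrapper

# Usage: a flaky function that fails twice, then succeeds.
calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise IOError('transient')
    return 'ok'

f = retrying(flaky, delays=[0, 0, 0],
             is_transient=lambda e: isinstance(e, IOError))
print(f())  # 'ok' after two retries
```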
Following up on my own post:
> When the backoff iterator finishes, the RetryingCall class gives up on
> trying to get a non-error result from the function. In that case you just
> get a StopIteration exception in the failure that start() deferred
> returns (I was originally returning the original failure, but decided to
> simplify. If you want that, you can keep it yourself in an error tracking
> class, see below).
Thinking about it a bit more it seems more sensible that the start method's
default failureTester function ignores all failures, and that in the case
of an eventual failure, you get the first ever (or last ever) failure back.
Those changes would make the RetryingCall class more useful (as it stands
with the defaults you get a failure back the first time something goes
wrong, so in the default case the class doesn't even do what it says).
I won't post the changes here, as I don't even know if anyone else is
interested. Contact me if you want an updated version.