[Twisted-Python] patch implementing deferIterationToThread
Please find a patch to internet/defer.py and iternet/threads.py which enables an "iterator" to be deferred to a thread. This is useful since often you have a long running process (a database query for example) which you'd like to return results every once and a while rather than delivering all of the output at the end of the process. This patch adds to the given files (it doesn't change any code). The remainder of the message is hereby public domain. # # code to append to defer.py # class MultiCallDeferred(Deferred): """This is a verision of Deferred which can be invoked more than once. This is accomplished by cloning the current deferred object and carrying out the callbacks on the clone. """ def _startRunCallbacks(self, result, isError): clone = Deferred() clone.default = self.default for x in self.callbacks: clone.callbacks.append(x) clone._startRunCallbacks(result, isError) # # code to be added to threads.py # def _putIterationInDeferred(deferred, f, args, kwargs): """Send the results of an iteration to a deferred. The function called should return an object with a next() operator. This is the ideal mechanism to defer a generator. """ from twisted.internet import reactor try: itr = apply(f, args, kwargs) while 1: reactor.callFromThread(deferred.callback, itr.next()) except StopIteration: pass except: f = failure.Failure() reactor.callFromThread(deferred.errback, f) def deferIterationToThread(f, *args, **kwargs): """Run the results of an iterator in a thread.""" d = defer.MultiCallDeferred() reactor.callInThread(_putIterationInDeferred, d, f, args, kwargs) return d # # usage using iterators (or generators for simpler syntax) # from twisted.internet.threads import deferIterationToThread from twisted.internet import reactor class producer: def __init__(self): self.val = 9 def next(self): val = self.val if val < 1: raise StopIteration self.val -= 1 return val def bldr(): return producer() def printResult(x): print x d = deferIterationToThread(bldr) d.addCallback(printResult) try: # if you have generators from __future__ import generators def gene(start=99): while(start > 90): yield start start -= 1 d = deferIterationToThread(gene) d.addCallback(printResult) except: pass reactor.run()
participants (1)
-
Clark C. Evans