# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.python.runtime import seconds
from itertools import count
try:
    from Queue import Queue, Empty
except ImportError:
    # The module was renamed to ``queue`` in Python 3.
    from queue import Queue, Empty
from threading import Event


class TwistedManager(object):
    """
    Run the reactor through a blocking queue loop and offer blocking
    helpers (L{stop}, L{getDeferred}) that wait on a
    C{threading.Event} until a reactor-side callback reports back.
    """

    def __init__(self):
        # Callables handed over by reactor.interleave(); consumed by run().
        self.twistedQueue = Queue()
        # Source of the unique keys returned by getKey().
        self.key = count()
        # key -> value stored by _stopIterating() for a waiting caller.
        self.results = {}
        # key -> Event that a caller of stop()/getDeferred() is waiting on.
        self.events = {}
        # Set to True by run() and back to False by stop().
        self.running = False

    def getKey(self):
        """
        Get a unique identifier.

        @return: the next value of the internal counter.
        """
        # The next() builtin works on both Python 2.6+ and Python 3,
        # unlike the Python 2-only iterator method .next().
        return next(self.key)

    def run(self):
        """
        Start the reactor and process its callbacks until stopped.

        Does nothing if the manager is already running.  Otherwise the
        reactor is interleaved with the queue's C{put} as the hand-over
        function, and this method loops: take one callable from the queue,
        call it, and return once L{stop} has cleared C{running} and the
        queue is empty.
        """
        if self.running:
            return
        self.running = True
        reactor.interleave(self.twistedQueue.put)
        while True:
            # Blocks until the next callable is queued.
            callback = self.twistedQueue.get()
            callback()
            if not self.running and self.twistedQueue.empty():
                return

    def _stopIterating(self, value, key):
        """
        Record C{value} under C{key} and wake whoever waits on that key.

        Used both as a Deferred callback/errback (see L{getDeferred}) and
        as a shutdown trigger (see L{stop}).

        @param value: the result (or L{Failure}) to hand to the waiter.
        @param key: identifier obtained from L{getKey}.
        """
        e = self.events.pop(key)
        # Store the result before setting the event so the waiter always
        # finds it once wait() returns.
        self.results[key] = value
        e.set()

    def stop(self):
        """
        Stop the reactor and block until its shutdown has completed.

        An 'after shutdown' system event trigger is registered that sets an
        event; this method clears C{running}, calls C{reactor.stop()} and
        then waits on that event.
        """
        # NOTE(review): this blocks on an Event that is set from a reactor
        # trigger; if those triggers are dispatched by the loop in run(),
        # calling stop() from inside that loop would never return - confirm.
        key = self.getKey()
        e = Event()
        self.events[key] = e
        reactor.addSystemEventTrigger(
            'after', 'shutdown', self._stopIterating, True, key)
        self.running = False
        reactor.stop()
        e.wait()
        # _stopIterating() stored a placeholder (True) under this key;
        # discard it so self.results does not grow with every stop().
        self.results.pop(key, None)

    def getDeferred(self, d):
        """
        Get the result of a deferred or raise if it failed.

        Blocks the calling thread until C{d} has fired.

        @param d: the Deferred to wait for.
        @return: the value C{d} fired with.
        @raise: the exception wrapped by the L{Failure} if C{d} failed.
        """
        # NOTE(review): same blocking caveat as stop() - presumably meant
        # to be called from a thread other than the one inside run().
        key = self.getKey()
        e = Event()
        self.events[key] = e
        # addBoth: _stopIterating receives either the result or a Failure.
        d.addBoth(self._stopIterating, key)
        e.wait()
        res = self.results.pop(key)
        if isinstance(res, Failure):
            res.raiseException()
        return res