Re: [Twisted-Python] Lock class using Deferreds

On Fri, 05 Mar 2004 18:34:44 -0500, Christopher Armstrong <radix@twistedmatrix.com> wrote:
> Andrew Bennetts wrote:
>> On Fri, Mar 05, 2004 at 11:19:01AM -0500, Itamar Shtull-Trauring wrote:
>>> class Lock:
>>>     """A lock for event driven systems."""
>> I have very, very similar looking code on my hard disk... what inspired you to write this? :)
> I have something vaguely similar too; a class that handled a 'pipeline' (?) of Deferreds and didn't allow more than N at a time. My purpose for it was doing several thousand DNS queries and I only wanted at most N outstanding requests at a time, so I wouldn't overload the servers. Ok, so it's not that similar, but it's something that would probably go in the same module ;-)
Actually, it is extremely similar. It's called a semaphore :)
This, along with a condition class, would probably all make for a good module. I was thinking twisted.python.defertools, if only defer wasn't part of twisted.internet :(
Jp

On Fri, 05 Mar 2004 18:34:44 -0500, Christopher Armstrong <radix@twistedmatrix.com> wrote:
> I have something vaguely similar too; a class that handled a 'pipeline' (?) of Deferreds and didn't allow more than N at a time. My purpose for it was doing several thousand DNS queries and I only wanted at most N outstanding requests at a time, so I wouldn't overload the servers. Ok, so it's not that similar, but it's something that would probably go in the same module ;-)
exarkun@divmod.com wrote:
> Actually, it is extremely similar. It's called a semaphore :)
To be sure :)

from twisted.internet import defer

class Semaphore:
    """A semaphore for event driven systems."""

    def __init__(self, tokens):
        self.waiting = []
        self.tokens = tokens
        self.limit = tokens

    def acquire(self):
        """Attempt to acquire the token.

        @return Deferred which returns on token acquisition.
        """
        assert self.tokens >= 0
        d = defer.Deferred()
        if not self.tokens:
            self.waiting.append(d)
        else:
            self.tokens = self.tokens - 1
            d.callback(self)
        return d

    def release(self):
        """Release the token.

        Should be called by whoever did the acquire() when the shared
        resource is free.
        """
        assert self.tokens < self.limit
        self.tokens = self.tokens + 1
        if self.waiting:
            # someone is waiting to acquire token
            self.tokens = self.tokens - 1
            d = self.waiting.pop(0)
            d.callback(self)

    def _releaseAndReturn(self, r):
        ....

class Lock (Semaphore) :

    def __init__(self) :
        Semaphore.__init__(self, 1)

Andy.

Andy Gayton wrote:
> exarkun@divmod.com wrote:
>> Actually, it is extremely similar. It's called a semaphore :)
> To be sure :)
> class Semaphore:
>     """A semaphore for event driven systems."""
My thing looks somewhat different; there's no 'release'... I haven't really put much time into figuring out what's going on in this thread, but exarkun wanted me to post my code. It's got a couple of things in it that are app-specific and probably isn't optimally factored.

from twisted.internet import defer

# NotSync (used in _err below) is an exception from my app; it isn't defined here.

class DeferredQueue:

    MAX_REQUESTS = 20

    def __init__(self, source, request):
        self.deferreds = {}
        self.source = source
        self.request = request
        for _ in range(self.MAX_REQUESTS):
            self.getMore()

    def getMore(self):
        assert len(self.deferreds) < self.MAX_REQUESTS
        try:
            sink, input = self.source.next()
        except StopIteration:
            return
        d = self.request(input)
        self.deferreds[input] = d
        #XXX fixme app-specific
        d.addBoth(self._cbGotSync, input)
        d.addCallback(sink)
        def _err(f):
            #XXX fixme app-specific
            if f.check(NotSync, defer.TimeoutError):
                return
            print f
        return d.addErrback(_err)

    def _cbGotSync(self, result, input):
        del self.deferreds[input]
        self.getMore()
        return result

--
 Twisted | Christopher Armstrong: International Man of Twistery
  Radix  |          Release Manager,  Twisted Project
---------+     http://radix.twistedmatrix.com/
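A rough sketch of the refill idea in DeferredQueue above, for anyone skimming: start up to N requests, then pull exactly one more item from the source each time a request finishes. This is not the code from the message. It assumes Python 3 and plain callbacks instead of Twisted Deferreds, leaves out the error handling, and the names `RefillingRunner`, `start_job` and `done` are made up for illustration.

```python
class RefillingRunner:
    """Keep at most `limit` jobs in flight, pulling from `source` lazily."""

    def __init__(self, source, start_job, limit):
        self.source = iter(source)
        self.start_job = start_job  # hypothetical: start_job(item, done) begins async work
        self.limit = limit
        self.in_flight = 0
        for _ in range(limit):
            self._get_more()

    def _get_more(self):
        assert self.in_flight < self.limit
        try:
            item = next(self.source)  # only reached when a slot is free
        except StopIteration:
            return
        self.in_flight += 1
        self.start_job(item, self._finished)

    def _finished(self, result=None):
        self.in_flight -= 1
        self._get_more()  # refill the slot that was just freed


# Demonstration with manually completed jobs, so no event loop is needed.
pulled = []   # items taken from the source so far
pending = []  # "done" callbacks of jobs that have not finished yet

def source():
    for i in range(10):
        pulled.append(i)
        yield i

def start_job(item, done):
    pending.append(done)

runner = RefillingRunner(source(), start_job, limit=3)
pulled_at_start = len(pulled)   # only the first 3 items have been consumed
pending.pop(0)()                # finish one job
pulled_after_one = len(pulled)  # one more item was pulled to refill the slot
```

The point of the demonstration is that the source is only advanced when there is room for another request, so a slow or very long source is never read ahead of need.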

Christopher Armstrong wrote:
> haven't really put much time into figuring out what's going on in this thread,
Putting the Lock with similar tools into a module I think ...
> class DeferredQueue:
>     MAX_REQUESTS = 20
You twisted guys have a way of coding stuff that makes me go cross-eyed :) You're right - it is a bit different. DeferredQueue takes a known amount of work you've got to get through and feeds it out at a controlled rate - the modified version of Itamar's Lock is a similar control but accepts spontaneous requests for work.

I think the following would work similarly to yours, Chris? ..

dnsRequest_Semaphore = Semaphore(20)
for (sink, input) in source :
    dnsRequest_Semaphore.run( request, input ).addCallback( sink )

???? is that right - still getting a handle on defers :) :(

the above doesn't cover the error handling you are doing though .. I can't quite work out that bit ...

Andy.
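For comparison, here is roughly what the for loop above looks like with the standard library's `asyncio.Semaphore` in Python 3. To be clear, this swaps in asyncio for the Semaphore class from this thread, so it is only an analogy and not a drop-in replacement. `triple`, `guarded` and `run_limited` are names invented for the example, and the short sleep stands in for a slow request such as a DNS query.

```python
import asyncio

async def triple(x):
    await asyncio.sleep(0.01)  # stand-in for a slow request
    return x * 3

async def run_limited(inputs, limit):
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def guarded(x):
        nonlocal running, peak
        async with semaphore:  # acquire a token, release it on exit
            running += 1
            peak = max(peak, running)
            try:
                return await triple(x)
            finally:
                running -= 1

    # Like the for loop above, this creates every job upfront; the
    # semaphore only limits how many are inside the guarded section.
    results = await asyncio.gather(*(guarded(x) for x in inputs))
    return results, peak

results, peak = asyncio.run(run_limited(range(8), limit=2))
```

Note that all eight jobs are created before any of them finishes, which is the same "copy the whole source into the semaphore" behaviour as the loop it imitates.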

Andy Gayton wrote:
> Christopher Armstrong wrote:
>> haven't really put much time into figuring out what's going on in this thread,
> Putting the Lock with similar tools into a module I think ...
I meant, I haven't bothered enough to actually grok all the code. :)
> You twisted guys have a way of coding stuff that makes me go cross-eyed :)
Well, the app I was writing at the time was kind of weird. ;-)
> I think the following would work similarly to yours, Chris? ..
> dnsRequest_Semaphore = Semaphore(20)
> for (sink, input) in source :
>     dnsRequest_Semaphore.run( request, input ).addCallback( sink )
Dunno, maybe someone else can actually grok what's going on here (I haven't sufficiently bothered to read your Semaphore class :P). If it is the same, then your Semaphore class is obviously better :) OTOH, my thing didn't call .next() on the source until its result was actually required, but this snippet is obviously queuing up (potentially thousands of) operations in the Semaphore.. Probably doesn't actually matter.
> ???? is that right - still getting a handle on defers :) :(
> the above doesn't cover the error handling you are doing though .. I can't quite work out that bit ...
As the comments pointed out, the bit of error-handling was application-specific. I made a mistake, btw: the *first* "xxx fixme app-specific" wasn't actually app-specific (the addBoth(self._cbGotSync)) -- it was just an unfortunately specifically-named method. The NotSync error handling was indeed specific to the app I was writing.

--
 Twisted | Christopher Armstrong: International Man of Twistery
  Radix  |          Release Manager,  Twisted Project
---------+     http://radix.twistedmatrix.com/

Christopher Armstrong wrote:
>> dnsRequest_Semaphore = Semaphore(20)
>> for (sink, input) in source :
>>     dnsRequest_Semaphore.run( request, input ).addCallback( sink )
> OTOH, my thing didn't call .next() on the source until its result was actually required, but this snippet is obviously queuing up (potentially thousands of) operations in the Semaphore.. Probably doesn't actually matter.
It might matter - if .next took even a small amount of time (say fetching jobs out of a text file) - then my for loop would block for a bit, which is against the rules .. maybe these are different tasks after all .. It's a nice exercise for learning defers though :)

The problem with my for loop is it's effectively copying the queue stored in source and putting it in Semaphore upfront, instead of as needed. Perhaps something which is a cross between Lock and DeferredQueue would be better - if there is a queue of known work to be processed, leave it as an external queue instead of copying it ...

Ok - a Sunday afternoon spent with the defers howto later - I came up with this - which only does what your original does, Chris, but allows you to add new jobs on the fly and is 5 times longer in code :) .. I was a bit embarrassed by that so added the ability to prioritise jobs in the queue to justify the whole experience ;)

--------------
from twisted.internet import reactor, defer

class Semaphore:
    """A semaphore for event driven systems."""
    # This hasn't changed from the earlier email.

    def __init__(self, tokens):
        self.waiting = []
        self.tokens = tokens
        self.limit = tokens

    def acquire(self):
        """Attempt to acquire the token.

        @return Deferred which returns on token acquisition.
        """
        assert self.tokens >= 0
        d = defer.Deferred()
        if not self.tokens:
            self.waiting.append(d)
        else:
            self.tokens = self.tokens - 1
            d.callback(self)
        return d

    def release(self):
        """Release the token.

        Should be called by whoever did the acquire() when the shared
        resource is free.
        """
        assert self.tokens < self.limit
        self.tokens = self.tokens + 1
        if self.waiting:
            # someone is waiting to acquire token
            self.tokens = self.tokens - 1
            d = self.waiting.pop(0)
            d.callback(self)

    def _releaseAndReturn(self, r):
        self.release()
        return r

    def run(self, f, *args, **kwargs):
        """Acquire token, run function, release token.

        @return Deferred of function result.
        """
        d = self.acquire()
        d.addCallback(lambda r: defer.maybeDeferred(f, *args, **kwargs).addBoth(self._releaseAndReturn))
        return d

class DeferredPriorityQueue:
    """An event driven priority queue"""

    def __init__( self, max_concurrent ) :
        self.semaphore = Semaphore( max_concurrent )
        self.queue = {}
        self.processing = False

    def next(self) :
        """
        @return the next job in the queue
        raises StopIteration when out of jobs
        """
        if not len( self.queue ) :
            raise StopIteration
        highest_priority = min( self.queue.keys() )
        try:
            return self.queue[highest_priority][0].next()
        except StopIteration :
            del self.queue[highest_priority][0]
            if not len( self.queue[highest_priority] ) :
                del self.queue[highest_priority]
            return self.next()

    def _cbTokenAcquired(self, semaphore, job) :
        """Runs queued up job once a token can be acquired

        Sets the job to release the token when completed
        and calls the job's callback

        Then continues processing the queue
        """
        (f, args, kwargs, cb) = job
        d = defer.maybeDeferred(f, *args, **kwargs).addBoth(self.semaphore._releaseAndReturn)
        d.addCallback(cb)
        self._processQueue()

    def _processQueue(self):
        """Grabs the next job in queue if available and then waits for a token"""
        try:
            job = self.next()
        except StopIteration :
            self.processing = False
            return
        self.semaphore.acquire().addCallback(self._cbTokenAcquired, job)

    def _startQueue(self) :
        """Start queue processing if it's not already started"""
        if not self.processing:
            self.processing = True
            self._processQueue()

    def run( self, f, cb, *args, **kwargs ) :
        """Queue up a single job"""
        # not happy with this bit at all, but can't
        # think of another way to do it without making
        # it compulsory to pass in priority
        if kwargs.has_key("priority") :
            priority = kwargs["priority"]
            del kwargs["priority"]
        else :
            priority = 5
        self.runQueue( iter([(f, args, kwargs, cb)]), priority )

    def runQueue(self, queue, priority=5):
        """Queue up a (potentially external) queue of jobs"""
        if not self.queue.has_key( priority ) :
            self.queue[priority] = []
        self.queue[priority].append( queue )
        self._startQueue()

#
# test things out
#

def triple(x):
    d = defer.Deferred()
    reactor.callLater(2, d.callback, x * 3)
    return d

def cbPrintResult( result ) :
    print result

def sources() :
    for i in range(8) :
        yield ( triple, [3], {}, cbPrintResult )

#
# sources can be run directly off a semaphore,
# but if it takes a while to retrieve jobs
# from the source
# then the following will block until all
# jobs are queued up
#
#s = Semaphore(2)
#for (f, args, kwargs, cb) in sources() :
#    s.run( f, *args, **kwargs ).addCallback( cb )

q = DeferredPriorityQueue(2)
q.runQueue( sources() )
q.run( triple, cbPrintResult, 4, priority=3 )
reactor.run()
------------

It might have its uses - perhaps to stop mass mailouts from holding up ad hoc mails ... I'm mainly trying to learn though by tweaking Itamar's and Chris's code - so feedback/criticism would be welcome.

Andy.
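The job-selection rule in DeferredPriorityQueue.next() can be looked at on its own, without any Deferreds: take the next job from the first iterator at the lowest priority number, and drop iterators as they run dry. Below is a small Python 3 sketch of just that rule. It is an illustration under assumptions, not the code from the message: `PriorityJobs` and `add` are invented names, jobs are plain strings, and a loop replaces the recursive call.

```python
class PriorityJobs:
    """Lazily hand out jobs from several iterators, lowest priority number first."""

    def __init__(self):
        self.queues = {}  # priority -> list of job iterators

    def add(self, jobs, priority=5):
        self.queues.setdefault(priority, []).append(iter(jobs))

    def __iter__(self):
        return self

    def __next__(self):
        while self.queues:
            best = min(self.queues)
            iterators = self.queues[best]
            try:
                return next(iterators[0])
            except StopIteration:
                # The first iterator at this priority is exhausted; drop it,
                # and drop the priority level too once it has no iterators left.
                del iterators[0]
                if not iterators:
                    del self.queues[best]
        raise StopIteration


jobs = PriorityJobs()
jobs.add("bulk-%d" % i for i in range(3))  # default priority 5
first = next(jobs)                         # a bulk job is handed out first
jobs.add(["urgent"], priority=3)           # added on the fly, jumps the queue
rest = list(jobs)
```

As in the original, a job that has already been handed out is not pre-empted; a newly added higher-priority job only overtakes the work that is still sitting in the queue.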
participants (3): Andy Gayton, Christopher Armstrong, exarkun@divmod.com