
I have a question about an approach I used... I'm worried that I've overworked it and overlooked a more elegant and standard solution.

I need to fire off network connections in groups. Deferreds added to a DeferredList don't fit the bill on their own, because there's no control over the deferreds once they're in the list. As an example, if you wanted to make a whole batch of concurrent connections but didn't want to incur the overhead of firing off more than 20 simultaneous connections, you'd split your destination hosts up into groups of 20. As each group completed, a callback would fire off the next group, and so on. What's more, I didn't want to put this kind of control in a factory or a protocol; in my mind, that didn't seem the proper place for it...

I created a class that instantiates a ClientFactory instance and then manages a set of deferreds and deferred lists. Here's some sample output from my test code:

    Entered main run() loop.
    deferred #1 is connecting to adytum.us...
    deferred #2 is connecting to del.icio.us...
    deferred #3 is connecting to google.com...
    Left main run() loop.
    deferred #1 connected!
    deferred #2 connected!
    deferred #3 connected!
    Finished with Group 1
    deferred #4 is connecting to last.fm...
    deferred #5 is connecting to washingtonpost.com...
    deferred #6 is connecting to yahoo.com...
    deferred #5 connected!
    deferred #6 connected!
    deferred #4 connected!
    Finished with Group 2
    deferred #7 is connecting to microsoft.com...
    deferred #8 is connecting to amazon.com...
    deferred #7 connected!
    deferred #8 connected!
    Finished with Group 3
    Finished all groups.

Here are some pertinent parts of the code (full paste: http://pastebin.adytum.us/39/1 ):

    class DeferredsByGroup(object):
        [...]
        def initiate(self):
            self.groups = self.splitLoad(self.hosts, self.hosts_per_group)
            # get the first group and let the callback handle the next one
            host_group = self.groups.next()
            dl = self.setupGroupDeferred(host_group)
            dl.addCallback(self.getNextGroup)
            dl.addErrback(self.noMoreGroups)

        def splitLoad(self, host_list, per_group):
            group_count, remainder = divmod(len(host_list), per_group)
            if remainder:
                group_count += 1
            for i in xrange(group_count):
                yield host_list[i*per_group:i*per_group+per_group]
        [...]
        def getNextGroup(self, null):
            group = self.groups.next()
            dl = self.setupGroupDeferred(group)
            dl.addCallback(self.getNextGroup)
            dl.addErrback(self.noMoreGroups)

        def setupHostDeferred(self, host):
            self.host_counter += 1
            name = "deferred #%s" % self.host_counter
            f = Factory(host, name)
            d = f.deferred
            d.addCallback(self.updateData)
            return d

        def setupGroupDeferred(self, group):
            self.group_counter += 1
            # iterate through each host in the group
            deferreds = [ self.setupHostDeferred(host) for host in group ]
            # set up this group as a deferred list
            dl = defer.DeferredList(deferreds)
            dl.addCallback(self.handleGroup)
            dl.addErrback(self.handleFailure)
            return dl
        [...]

And this is instantiated in the following manner:

    hosts = [
        'adytum.us', 'del.icio.us', 'google.com', 'last.fm',
        'washingtonpost.com', 'yahoo.com', 'microsoft.com', 'amazon.com',
    ]
    hosts_per_group = 3

    runner = DeferredsByGroup(hosts, hosts_per_group)
    runner.initiate()
    reactor.run()

Duncan McGreggor wrote:
This functionality (or something very close) is in the deferred module already; it's just non-obvious how you use it:

    sem = defer.DeferredSemaphore(count)
    dl = [sem.run(callable, item, *args, **named) for item in iterable]
    return defer.DeferredList(dl, consumeErrors=True)

Rather than "batching" (which means that you have 3, then 2, then 1 in play, then 3, then 2, then 1), this keeps "count" items in play at any time (as long as there are available items), replacing each item that completes with one pending item. I wrap that bit of code in a little function called parallel so I don't have to remember the trick every time.

HTH,
Mike

--
________________________________________________
  Mike C. Fletcher
  Designer, VR Plumber, Coder
  http://www.vrplumber.com
  http://blog.vrplumber.com
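Spelled out, the parallel wrapper Mike mentions amounts to little more than those three lines; this is a minimal sketch (the name parallel is his, but the exact argument order is an assumption):

    from twisted.internet import defer

    def parallel(iterable, count, callable, *args, **named):
        # Keep at most `count` calls to `callable` in flight at once;
        # the returned DeferredList fires when every call has finished.
        sem = defer.DeferredSemaphore(count)
        return defer.DeferredList(
            [sem.run(callable, item, *args, **named) for item in iterable],
            consumeErrors=True)

With the original poster's data, something like parallel(hosts, 20, connectToHost) would keep 20 connections in flight, where connectToHost stands in for whatever per-host function returns a Deferred.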

On Jan 21, 2006, at 6:55 AM, Mike C. Fletcher wrote:
Ah, this is fantastic! Thanks for the heads up. I've been buried in the deferred file for the past few days and have no idea how I missed this... I'm now reading through the entire file to see what other goodies I might have missed... I've replaced almost all of that test code, and it works like a charm. Your usage example was a huge time-saver -- thanks!

d

On 1/21/06, Mike C. Fletcher <mcfletch@rogers.com> wrote:
Oh good God, you mean Twisted had this all the time? Would have saved me a (small) number of headaches about 6 months back, though given how much my beast has grown since then, I'd probably have ended up replacing it anyway. I've added a documentation bug (1432), and I might see if I get some time to write some docs for it this afternoon.

Moof

On 1/21/06, Duncan McGreggor <duncan.mcgreggor@gmail.com> wrote:
This is one approach. It has the characteristic that if one site in your group is considerably slower than the others, you will wait until all the sites in your group are finished before firing off the next group. This may or may not be a good thing for your particular app.

An alternative is to create a "pool" of connections that consume from a queue of potential connections: you feed your list into a DeferredQueue and create as many concurrent connection handlers as you want, all consuming from that same queue. This has the characteristic that as long as you keep the queue full, you are constantly running 20 connections. This may or may not be an advantage in the case of your application.

Or, if you want to use the built-in Twisted magic, take a look at twisted.protocols.policies.ThrottlingFactory and other similar things in the same package, and see if one can be adapted to your use.

Keep in mind that Twisted is not *actually* concurrent, so you may not need to throttle your connections that much; you might be able to let the reactor handle the connection load itself. Actually, given that the reactor manages a thread pool size, is there an equivalent "connection pool size" that can be manipulated from inside the programme? Does such a concept have any use or meaning?

Moof - not a reactor expert, as you can see.
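A minimal sketch of the DeferredQueue pool Moof describes, under stated assumptions: connect(host) is a hypothetical function returning a Deferred for one connection, error handling is elided, and a sentinel object shuts each worker down once the queue drains:

    from twisted.internet import defer

    STOP = object()  # sentinel: tells a worker the queue has drained

    def consume(queue, connect):
        # One worker: pull a host off the queue, connect to it, and
        # repeat until the sentinel comes out.
        def step(host):
            if host is STOP:
                return None
            d = connect(host)  # assumed to return a Deferred
            d.addCallback(lambda _result: consume(queue, connect))
            return d
        return queue.get().addCallback(step)

    def pool(hosts, size, connect):
        queue = defer.DeferredQueue()
        for host in hosts:
            queue.put(host)
        for _ in xrange(size):
            queue.put(STOP)  # one sentinel per worker
        workers = [consume(queue, connect) for _ in xrange(size)]
        # fires once every worker has seen its sentinel
        return defer.DeferredList(workers, consumeErrors=True)

Unlike the group approach, a slow host here only ties up one worker; the others keep pulling fresh hosts off the queue, so pool(hosts, 20, connect) keeps 20 connections running for as long as hosts remain.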
