[Twisted-Python] A resizable cooperator class for queuing and dispatching jobs
I just wrote a fun class that lets you

- submit jobs to be dispatched to a queue
- manage how many tasks are in progress at once
- dynamically adjust that number
- shut down cleanly, including
  - recovering jobs that were queued but hadn't been dispatched

This uses a combination of a DeferredQueue, a task.Cooperator, and the DeferredPool I posted on Monday. For now I named it ResizableDispatchQueue (not a great name, suggestions welcome). You can pick it up from http://pastebin.com/f7dc9320e

I can think of lots of uses. Here's a simple example.

You want to write a server with a web interface that allows people to enter their phone number so you can send them an SMS. You anticipate lots of people will use the service. But sending SMS messages is quite slow, and the company that you ship those jobs off to is concerned that you'll overrun their service (or maybe they have an API limit, etc). So you need to queue up jobs locally and send them off at a certain rate. You'd like to be able to adjust that rate up or down. You also want to be able to shut your service down cleanly (i.e., not in the middle of a task), and when you restart it you want to be able to re-queue the jobs that were queued last time but which hadn't gone out.

For example, suppose your function that sends the SMS is called sendSMS and that it takes a (number, message) tuple arg. Then:

    dispatcher = ResizableDispatchQueue(sendSMS)

    # Tell it to send at most 5 things at once.
    dispatcher.start(5)  # Same as dispatcher.width = 5

    # Later... send off some SMS messages.
    dispatcher.put((2127399921, 'Hello...'))
    dispatcher.put((5052929919, 'Test...'))

    # Later, bump up to 10 simultaneous jobs.
    dispatcher.width = 10

    # Oops, turns out we're sending too fast, turn it down a little.
    dispatcher.narrow(3)

    # Get a copy of the list of pending jobs.
    jobs = dispatcher.pending()

    # Arrange to increase the number of jobs in an hour's time.
    reactor.callLater(3600, dispatcher.setWidth, 20)

    # Time to shut down. Wait for any tasks underway to complete, and save
    # the list of jobs not yet dispatched.
    def saveJobs(jobs):
        pickle.dump(jobs, ...)

    d = dispatcher.stop()
    d.addCallback(saveJobs)

On restart you just unpickle the old job list and pass its items to dispatcher.put().

I have a small test suite that's a bit weird (it schedules various things and tests how long the overall job takes and what's still pending when stop is called). It could be much better, but it does at least illustrate that the code seems to work. Let me know if you want it.

There's also the issue about what to do when the dispatch function hits an error. An option could be added to re-queue the job, but it's perhaps better to let the dispatch function do that along with whatever else it needs.

As usual, I'd be happy to hear comments and suggestions. I'll probably adjust this so the DeferredQueue uses a priority queue.

Terry
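To make the width semantics concrete, here is a minimal synchronous sketch of the bookkeeping such a queue has to do, including the stop-and-restart step. It is not the ResizableDispatchQueue from the pastebin (that one is built from a DeferredQueue, a task.Cooperator and a DeferredPool, none of which appear here). The class name SketchDispatchQueue and its taskFinished() method are hypothetical: taskFinished() stands in for a dispatched job's Deferred firing, and stop() here returns the undispatched jobs directly rather than a Deferred.

```python
import pickle
from collections import deque


class SketchDispatchQueue:
    """Hypothetical, synchronous model of a width-limited dispatch queue.

    No Twisted here: a dispatched job counts as "underway" until
    taskFinished() is called, standing in for its Deferred firing.
    """

    def __init__(self, dispatchFn):
        self._dispatchFn = dispatchFn
        self._waiting = deque()  # queued but not yet dispatched
        self._underway = 0       # dispatched but not yet finished
        self._width = 0
        self._stopped = False

    @property
    def width(self):
        return self._width

    @width.setter
    def width(self, value):
        if value < 0:
            raise ValueError('width must be >= 0')
        # Narrowing never cancels jobs already underway; it only
        # delays new dispatches. Widening may start jobs right away.
        self._width = value
        self._fill()

    def start(self, width):
        self.width = width

    def put(self, job):
        if self._stopped:
            raise RuntimeError('queue has been stopped')
        self._waiting.append(job)
        self._fill()

    def pending(self):
        # A copy, so callers can't mutate the internal queue.
        return list(self._waiting)

    def taskFinished(self):
        self._underway -= 1
        self._fill()

    def stop(self):
        # Stop dispatching and hand back everything not yet dispatched.
        self._stopped = True
        jobs = list(self._waiting)
        self._waiting.clear()
        return jobs

    def _fill(self):
        # Dispatch waiting jobs while fewer than `width` are underway.
        while (not self._stopped and self._waiting
               and self._underway < self._width):
            self._underway += 1
            self._dispatchFn(self._waiting.popleft())


# Usage: a fake sendSMS that just records the jobs it is given.
sent = []
q = SketchDispatchQueue(sent.append)
q.start(2)
for n in range(5):
    q.put(('number%d' % n, 'Hello...'))
# Two jobs dispatched, three waiting.
q.width = 3       # widening dispatches one more immediately
q.taskFinished()  # a finished job frees a slot for the next one
saved = pickle.dumps(q.stop())

# "Restart": unpickle the leftover jobs and put() them on a new queue.
q2 = SketchDispatchQueue(sent.append)
q2.start(5)
for job in pickle.loads(saved):
    q2.put(job)
```

Lowering the width below the number of jobs already underway doesn't interrupt anything; the queue simply waits until enough of them finish, which fits the goal of never shutting down in the middle of a task.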
On Dec 8, 2009, at 11:09 PM, Terry Jones wrote:
I just wrote a fun class that lets you
- submit jobs to be dispatched to a queue
- manage how many tasks are in progress at once
- dynamically adjust that number
- shut down cleanly, including
  - recovering jobs that were queued but hadn't been dispatched
That does sound like fun!
    # Later... send off some SMS messages.
    dispatcher.put((2127399921, 'Hello...'))
    dispatcher.put((5052929919, 'Test...'))
Not that this is really germane to the class in question, but, if you're going to be sending SMS messages, you should really represent the numbers as strings (well, actually, structured objects are always better, so PhoneNumber.fromString) because international phone prefixes sometimes - actually I think it would be more accurate to say "usually" - start with a zero.
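The point about leading zeros is easy to see in Python: converting a digit string to an int silently discards the zero. The sketch below also shows a minimal value type. PhoneNumber.fromString is only the illustrative name used above, not an existing library, and this hypothetical version does no real validation beyond keeping an optional leading '+' and the digits.

```python
class PhoneNumber:
    """Hypothetical value type named after the suggestion above."""

    def __init__(self, digits):
        self.digits = digits

    @classmethod
    def fromString(cls, text):
        # Keep an optional leading '+' and every digit, as a string,
        # so leading zeros survive. No real validation is attempted.
        text = text.strip()
        prefix = '+' if text.startswith('+') else ''
        digits = ''.join(ch for ch in text if ch.isdigit())
        if not digits:
            raise ValueError('no digits in %r' % (text,))
        return cls(prefix + digits)

    def __str__(self):
        return self.digits


asInt = int('0123456789')      # 123456789: the leading zero is gone
asNumber = PhoneNumber.fromString('0123 456 789')
job = (asNumber, 'Hello...')   # what you'd pass to dispatcher.put()
```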
    # Later, bump up to 10 simultaneous jobs.
    dispatcher.width = 10

    # Oops, turns out we're sending too fast, turn it down a little.
    dispatcher.narrow(3)
This seems somewhat asymmetric. Why are there 'widen' and 'narrow' methods if I can just set the 'width' attribute directly? I could always just do '.width +=' and '.width -=' if I have a relative value.
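The suggestion works because Python's augmented assignment on an attribute calls a property's getter and then its setter, so a single settable width already covers relative changes. A minimal sketch, with a hypothetical WidthHolder standing in for the queue (only the width handling is modelled):

```python
class WidthHolder:
    """Hypothetical stand-in for the queue: only the width handling."""

    def __init__(self):
        self._width = 0
        self.changes = []  # every value the setter has been given

    @property
    def width(self):
        return self._width

    @width.setter
    def width(self, value):
        if value < 0:
            raise ValueError('width must be non-negative')
        self._width = value
        self.changes.append(value)
        # A real queue would dispatch more jobs here if the width grew.


q = WidthHolder()
q.width = 10
q.width -= 3  # getter returns 10, then the setter is called with 7
```

With this in place, widen() and narrow() would only be convenience wrappers around the same setter.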
Terry Jones wrote:
I have a small test suite that's a bit weird (it schedules various things and tests how long the overall job takes and what's still pending when stop is called). It could be much better, but it does at least illustrate that the code seems to work. Let me know if you want it.
This is really nifty. I know I could use this.
There's also the issue about what to do when the dispatch function hits an error. An option could be added to re-queue the job, but it's perhaps better to let the dispatch function do that along with whatever else it needs.
One reason to have a separate error handler is to support generic error-handling strategies, like 'retry N times and then send an email here'. Though maybe you could do that with decorators on the dispatch function. It does mean the dispatch function needs to know about the task queue, though.
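The decorator idea could look something like this minimal sketch. It is plain synchronous Python and assumes the dispatch function signals failure by raising; with Twisted you would instead add an errback to the Deferred the dispatch function returns and retry from there. The names retrying, onGiveUp and recordFailure are hypothetical. Because this version retries in place rather than re-queuing, it needs no reference to the queue; re-queuing (say, via dispatcher.put()) is what would tie the dispatch function to the queue.

```python
import functools


def retrying(times, onGiveUp):
    """Hypothetical decorator: call the wrapped dispatch function up to
    `times` times; if every attempt raises, pass the job and the last
    exception to `onGiveUp` (which might, say, send an email)."""
    if times < 1:
        raise ValueError('times must be >= 1')

    def decorate(dispatchFn):
        @functools.wraps(dispatchFn)
        def wrapper(job):
            lastError = None
            for _ in range(times):
                try:
                    return dispatchFn(job)
                except Exception as e:
                    # `e` is unbound after the except block, so keep it.
                    lastError = e
            return onGiveUp(job, lastError)
        return wrapper
    return decorate


failures = []
attempts = []


def recordFailure(job, error):
    # Stand-in for "send an email here".
    failures.append((job, str(error)))


@retrying(3, recordFailure)
def flakySend(job):
    # Fails twice, then succeeds.
    attempts.append(job)
    if len(attempts) < 3:
        raise IOError('SMS gateway busy')
    return 'sent %s' % (job,)


@retrying(2, recordFailure)
def alwaysFails(job):
    raise IOError('down')


result = flakySend('job1')
alwaysFails('job2')
```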
As usual, I'd be happy to hear comments and suggestions. I'll probably adjust this so the DeferredQueue uses a priority queue.
Having written something like this several times, though not as general or as elegant, I've found that a pause()/resume() pair is a very useful API. That's not the same as setting the width to 0 and then back again, as pause() and resume() don't require you to know or remember the current width of the queue.

dave
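A minimal sketch of the pause()/resume() idea, assuming it is layered on top of the existing width setting so the object, not the caller, remembers the width. PausableWidth is a hypothetical stand-in for the queue, with width as a plain attribute.

```python
class PausableWidth:
    """Hypothetical sketch: pause()/resume() on top of a width setting."""

    def __init__(self, width):
        self.width = width
        self._savedWidth = None  # None means "not paused"

    @property
    def paused(self):
        return self._savedWidth is not None

    def pause(self):
        # Remember the current width so the caller doesn't have to.
        if not self.paused:
            self._savedWidth = self.width
            self.width = 0

    def resume(self):
        if self.paused:
            self.width = self._savedWidth
            self._savedWidth = None


q = PausableWidth(5)
q.pause()
q.pause()  # a second pause() keeps the originally saved width
widthWhilePaused = q.width
q.resume()
```

One choice a real class would have to make is what setting the width while paused should mean; in this sketch resume() simply restores the width saved by pause(), discarding any change made in between.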
participants (3)
- Dave Peticolas
- Glyph Lefkowitz
- Terry Jones