Best way to implement a timed queue?
Jan Dries
jan.dries at dcube-resource.be
Thu Jan 4 10:51:27 EST 2007
Thomas Ploch wrote:
> I am having troubles with implementing a timed queue. I am using the
> 'Queue' module to manage several queues. But I want a timed access, i.e.
> only 2 fetches per second max. I am horribly stuck on even how I
> actually could write it. Has somebody done that before? And when yes,
> how is the best way to implement it?
I don't know about "the best way" to implement it, but I once solved a
similar problem with the following scenario.
The idea is that you create a "token" queue, a Queue object of the
desired depth, 2 in your case. You then have a separate thread that, in
a loop, puts two "tokens" on the queue and then puts itself to sleep for
one second.
The worker thread or threads that must do their work at most twice per
second then try to get a token from the queue before doing their thing.
The following piece of code does the trick. Note that it allows worker
threads to do something two times every second, and not once every half
second, though it could easily be modified to accommodate that too.
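As a rough sketch of that modification (not part of the original code): the token queue gets a depth of 1 and the feeder puts a single token every 1/max_per_sec seconds, so fetches are spaced out rather than allowed in bursts. The names feed_evenly and demo are made up for illustration, and the sketch assumes Python 3, where the module is called queue rather than Queue.

```python
import queue
import threading
import time


def feed_evenly(tokens, max_per_sec, stop):
    """Put one token every 1/max_per_sec seconds until stop is set."""
    interval = 1.0 / max_per_sec
    while not stop.is_set():
        try:
            tokens.put_nowait(None)
        except queue.Full:
            pass  # previous token was not taken; do not let a burst build up
        stop.wait(interval)  # sleeps, but wakes early when stop is set


def demo(max_per_sec=2, fetches=3):
    """Take `fetches` tokens and return the time at which each was obtained."""
    tokens = queue.Queue(1)  # depth 1: at most one fetch per interval
    stop = threading.Event()
    feeder = threading.Thread(target=feed_evenly,
                              args=(tokens, max_per_sec, stop))
    feeder.daemon = True
    feeder.start()
    stamps = []
    for _ in range(fetches):
        tokens.get()  # blocks until the feeder hands out a token
        stamps.append(time.monotonic())
    stop.set()
    feeder.join()
    return stamps
```

With max_per_sec=2 the timestamps returned by demo() should be roughly half a second apart, instead of arriving in pairs once per second.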
Also note that the code below keeps running forever as worker threads
never stop working. You will probably want to change that.
Regards,
Jan
import time
from Queue import Queue,Full,Empty
from thread import start_new_thread

class Manager(object):
    def __init__(self, number_of_workers=10, max_per_sec=5):
        self.MAX_PER_SEC = max_per_sec
        self.NUMBER_OF_WORKERS = number_of_workers
        self.timelimit = Queue(self.MAX_PER_SEC)
        self.donelock = Queue(0)
        self.finished = False

    def do_work(self,number):
        print "Starting worker thread %s" % number
        while True:
            if self.get_time():
                # do whatever can only be done x times per second
                print "Worker %s doing work" % number
                time.sleep(3) # simulate worker doing some work

        self.signal_done()

    def signal_done(self):
        self.donelock.put(None,True)

    def wait_done(self):
        for i in range(0,self.MAX_PER_SEC):
            self.donelock.get(True)
        self.finished = True

    def feed_time(self):
        while not self.is_finished():
            for i in range(0,self.MAX_PER_SEC):
                self.insert_time()
            time.sleep(1)

    def insert_time(self):
        try:
            self.timelimit.put_nowait(None)
        except Full:
            pass

    def get_time(self):
        try:
            self.timelimit.get(True,10)
            return True
        except Empty:
            return False

    def is_finished(self):
        return self.finished

    def start_worker_threads(self):
        for i in range(0,self.NUMBER_OF_WORKERS):
            start_new_thread(self.do_work,(i + 1,))

    def start_time_thread(self):
        start_new_thread(self.feed_time,())

    def run(self):
        self.start_time_thread()
        self.start_worker_threads()
        self.wait_done()

def main():
    Manager(10,2).run()

if __name__ == "__main__":
    main()
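
The listing above never terminates because the workers loop forever. One possible way to change that, sketched here under a few assumptions: Python 3 (queue and threading instead of Queue and thread), a fixed list of jobs handed to the workers, and a threading.Event to stop the token feeder once all workers are done. The function name run_limited and its parameters are invented for this example and are not part of the original code.

```python
import queue
import threading
import time


def run_limited(jobs, max_per_sec=2, number_of_workers=3):
    """Process jobs with at most max_per_sec starts per one-second tick.

    Returns a list of (job, start_time) tuples.
    """
    tokens = queue.Queue(max_per_sec)  # the "token" queue from the post
    work = queue.Queue()
    results = queue.Queue()
    stop = threading.Event()
    for job in jobs:
        work.put(job)

    def feed():
        # Top the token queue up once per second until told to stop.
        while not stop.is_set():
            for _ in range(max_per_sec):
                try:
                    tokens.put_nowait(None)
                except queue.Full:
                    break
            stop.wait(1)  # like sleep(1), but wakes early on stop

    def worker():
        while True:
            try:
                job = work.get_nowait()
            except queue.Empty:
                return  # no jobs left, so this worker ends
            tokens.get()  # block until the feeder allows another fetch
            results.put((job, time.monotonic()))

    feeder = threading.Thread(target=feed)
    workers = [threading.Thread(target=worker)
               for _ in range(number_of_workers)]
    feeder.start()
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    stop.set()  # all work is done; let the feeder exit
    feeder.join()

    finished = []
    while not results.empty():
        finished.append(results.get_nowait())
    return finished
```

Calling run_limited(["a", "b", "c"], max_per_sec=2) should start two jobs right away and the third about a second later, then return once every thread has finished.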