better scheduler with correct sleep times

qvx qvx3000 at gmail.com
Sat Oct 18 08:09:17 EDT 2008


I need a scheduler which can delay execution of a
function for certain period of time.
My attempt was something like this:

    def delay(self, func, arg, delay_sec=0):
        fire_at = wallclock() + delay_sec
        self.queue.put((fire_at, func, arg))

    def runner(self):
        while self.alive:
            fire_at, func, arg = self.queue.get(block=True)
            try:
                now = wallclock()
                if now < fire_at:
                    time.sleep(fire_at - now)
                func(arg)
            except Exception, e:
                log('DelayedTaskManager %s: %s\n' % (self.name, e))
            finally:
                self.queue.task_done()

But then I came up with the following case:

1. I call delay with delay_sec = 10
2. The scheduler goes to sleep for 10 seconds
3. In the meantime (lets say 1 second later) I delay
   another func but this time with delay_sec=0.5
4. The scheduler is sleeping and won't know call my
   second function for another 9 seconds insted of 0.5

I started googling for scheduler and found one in standard library
but ih has the same code as mine (it calls the  functions in the
right order and my doesn't, but it still waits too long).
The other schedulers from web are dealing with
repeating tasks and such.

So, I wrote this:

# modification of http://code.activestate.com/recipes/87369/
class PriorityMinQueue(Queue):
    def top(self):
        try:
            return self.queue[0]
        except IndexError:
            return None
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []
    def _put(self, item):
        return heappush(self.queue, item)
    def _get(self):
        return heappop(self.queue)

class DelayedTaskManager:

    def __init__(self, name):
        self.name = name
        self.queue = PriorityMinQueue()
        # can't use queue.not_empty condition because it isn't
        # signaled with notifyAll so I have to use my own
        self.sleeper = threading.Condition()

    def start(self):
        log('start delayed task manager %s with %d elements\n' %
(self.name, self.queue.qsize()))
        self.alive = True
        self.thread = threading.Thread(target=self.runner)
        self.thread.setDaemon(True)
        self.thread.start()

    def stop(self):
        log('stop delayed task manager %s with %d elements\n' %
(self.name, self.queue.qsize()))
        self.alive = False
        self._wake()
        self.thread.join()

    def delay(self, delay_sec, func, *arg, **kw):
        # even if delay is 0 or less, put to queue
        # so the function gets executed concurrently
        fire_at = wallclock() + delay_sec
        self.queue.put((fire_at, func, arg, kw))
        self._wake()

    def _wake(self):
        with self.sleeper:
            self.sleeper.notify()

    def _wait(self, timeout):
        with self.sleeper:
            self.sleeper.wait(timeout)

    def runner(self):
        while self.alive:
            fire_at, func, arg, kw = self.queue.get(block=True)
            try:
                now = wallclock()
                while now < fire_at:
                    self._wait(fire_at - now)
                    if not self.alive: # canceled
                        log('delayed task manager %s was stoped\n',
self.name)
                        return self.queue.put((fire_at, func, arg,
kw))
                    top = self.queue.top()
                    if top is not None and top[0] < fire_at:
                        # temporally closer item, put back the old one
                        self.queue.put((fire_at, func, arg, kw))
                        self.queue.task_done()
                        fire_at, func, arg, kw = self.queue.get()
                    now = wallclock()
                func(*arg, **kw)
            except Exception, e:
                log('delayed task manager %s: %s\n', self.name, e)
            finally:
                self.queue.task_done()


Is there a better way or some library that does that?

My observations:

1. Threading module uses time.sleep instead of time.clock
   which results in less precise results (on windows platform)

    if sys.platform=="win32":  #take care of differences in clock
accuracy
        wallclock = time.clock
    else:
        wallclock = time.time

2. while analyzing threading module i noticed that wait() is
   implemented via loop and tiny sleep periods. I was expecting
   the usage of underlaying OS primitives and functions but
   then I remembered about GIL and quasi-multithreaded nature
   of Python. But still, isn't there a more precise method
   that interpreter itself could implement?

Thanks,
Tvrtko

P.S. This was Python 2.5



More information about the Python-list mailing list