Doubts about how implementing asynchronous timeouts through a heap

Josiah Carlson josiah.carlson at gmail.com
Thu Jul 10 14:30:11 EDT 2008


On Jul 9, 4:13 am, "Giampaolo Rodola'" <gne... at gmail.com> wrote:
> Hi,
> I'm trying to implement an asynchronous scheduler for asyncore to call
> functions at a later time without blocking the main loop.
> The logic behind it consists in:
>
> - adding the scheduled functions into a heapified list
> - calling a "scheduler" function at every loop which checks the
> scheduled functions due to expire soonest
>
> Note that, by using a heap, the first element of the list is always
> supposed to be the one with the lower timeout.
> Here's the code I wrote:
>
> <--- snippet --->
> import heapq
> import time
> import sys
>
> delayed_map = []
>
> class delayed_call:
>     """Calls a function at a later time.
>
>     The instance returned is an object that can be used to cancel the
>     scheduled call, by calling its cancel() method.
>     It also may be rescheduled by calling delay() or reset()} methods.
>     """
>
>     def __init__(self, delay, target, *args, **kwargs):
>         """
>         - delay: the number of seconds to wait
>         - target: the callable object to call later
>         - args: the arguments to call it with
>         - kwargs: the keyword arguments to call it with
>         """
>         assert callable(target), "%s is not callable" %target
>         assert sys.maxint >= delay >= 0, "%s is not greater than or
> equal " \
>                                            "to 0 seconds" % (delay)
>         self.__delay = delay
>         self.__target = target
>         self.__args = args
>         self.__kwargs = kwargs
>         # seconds from the epoch at which to call the function
>         self.timeout = time.time() + self.__delay
>         self.cancelled = False
>         heapq.heappush(delayed_map, self)
>
>     def __le__(self, other):
>         return self.timeout <= other.timeout
>
>     def active(self):
>         """Return True if this scheduler has not been cancelled."""
>         return not self.cancelled
>
>     def call(self):
>         """Call this scheduled function."""
>         self.__target(*self.__args, **self.__kwargs)
>
>     def reset(self):
>         """Reschedule this call resetting the current countdown."""
>         assert not self.cancelled, "Already cancelled"
>         self.timeout = time.time() + self.__delay
>         if delayed_map[0] is self:
>             heapq.heapify(delayed_map)
>
>     def delay(self, seconds):
>         """Reschedule this call for a later time."""
>         assert not self.cancelled, "Already cancelled."
>         assert sys.maxint >= seconds >= 0, "%s is not greater than or
> equal " \
>                                            "to 0 seconds" %(seconds)
>         self.__delay = seconds
>         self.reset()
>
>     def cancel(self):
>         """Unschedule this call."""
>         assert not self.cancelled, "Already cancelled"
>         del self.__target, self.__args, self.__kwargs
>         if self in delayed_map:
>             if delayed_map[0] is self:
>                 delayed_map.remove(self)
>                 heapq.heapify(delayed_map)
>             else:
>                 delayed_map.remove(self)
>         self.cancelled = True
>
> def fun(arg):
>     print arg
>
> a = delayed_call(0.6, fun, '0.6')
> b = delayed_call(0.5, fun, '0.5')
> c = delayed_call(0.4, fun, '0.4')
> d = delayed_call(0.3, fun, '0.3')
> e = delayed_call(0.2, fun, '0.2')
> f = delayed_call(0.1, fun, '0.1')
>
> while delayed_map:
>     now = time.time()
>     while delayed_map and now >= delayed_map[0].timeout:
>         delayed = heapq.heappop(delayed_map)
>         try:
>             delayed.call()
>         finally:
>             if not delayed.cancelled:
>                 delayed.cancel()
>     time.sleep(0.01)
> </--- snippet --->
>
> Here comes the questions.
> Since that the timeouts of the scheduled functions contained in the
> list can change when I reset() or cancel() them I don't know exactly
> *when* the list needs to be heapified().
> By doing some tests I came to the conclusion that I need the heapify()
> the list only when the function I reset() or cancel() is the *first of
> the list* but I'm not absolutely sure about it.
> When do you think it would be necessary calling heapify()?
> I wrote a short test suite which tests the code above and I didn't
> notice strange behaviors but since that I don't know much about the
> logic behind heaps I'd need some help.
> Thanks a lot in advance.
>
> --- Giampaolohttp://code.google.com/p/pyftpdlib/

According to a quick scan, there is some inefficiencies with your
code.  In particular, you don't need to re-heapify if the item you
need to remove is the first item; you only need to heappop().

In any case, if the Python standard library heapq module supported non-
lists as containers, then the pair heap implementation I wrote a
couple years ago would be perfect for this particular task.  Because
of the rewriting of heapq in C, without a bit of monkeypatching, we
can't re-use that implementation (which offered insert/remove of
arbitrary entries in the heap in O(logn) time, which is significantly
faster than the O(n) time of your implementation).

What I'm thinking is that we should add a pair heap implementation to
the heapq module (which can cache the pure Python functions if it
needs to use them), which would then allow us to use that (and others
to use it generally), add scheduling, etc.

Regardless, it's a 2.7/3.1 feature, so it's ok if we take it slow.

 - Josiah



More information about the Python-list mailing list