Doubts about how implementing asynchronous timeouts through a heap
Josiah Carlson
josiah.carlson at gmail.com
Thu Jul 10 20:31:41 EDT 2008
On Jul 10, 11:30 am, Josiah Carlson <josiah.carl... at gmail.com> wrote:
> On Jul 9, 4:13 am, "Giampaolo Rodola'" <gne... at gmail.com> wrote:
>
>
>
> > Hi,
> > I'm trying to implement an asynchronous scheduler for asyncore to call
> > functions at a later time without blocking the main loop.
> > The logic behind it consists in:
>
> > - adding the scheduled functions into a heapified list
> > - calling a "scheduler" function at every loop which checks the
> > scheduled functions due to expire soonest
>
> > Note that, by using a heap, the first element of the list is always
> > supposed to be the one with the lower timeout.
> > Here's the code I wrote:
>
> > <--- snippet --->
> > import heapq
> > import time
> > import sys
>
> > delayed_map = []
>
> > class delayed_call:
> > """Calls a function at a later time.
>
> > The instance returned is an object that can be used to cancel the
> > scheduled call, by calling its cancel() method.
> > It also may be rescheduled by calling delay() or reset()} methods.
> > """
>
> > def __init__(self, delay, target, *args, **kwargs):
> > """
> > - delay: the number of seconds to wait
> > - target: the callable object to call later
> > - args: the arguments to call it with
> > - kwargs: the keyword arguments to call it with
> > """
> > assert callable(target), "%s is not callable" %target
> > assert sys.maxint >= delay >= 0, "%s is not greater than or
> > equal " \
> > "to 0 seconds" % (delay)
> > self.__delay = delay
> > self.__target = target
> > self.__args = args
> > self.__kwargs = kwargs
> > # seconds from the epoch at which to call the function
> > self.timeout = time.time() + self.__delay
> > self.cancelled = False
> > heapq.heappush(delayed_map, self)
>
> > def __le__(self, other):
> > return self.timeout <= other.timeout
>
> > def active(self):
> > """Return True if this scheduler has not been cancelled."""
> > return not self.cancelled
>
> > def call(self):
> > """Call this scheduled function."""
> > self.__target(*self.__args, **self.__kwargs)
>
> > def reset(self):
> > """Reschedule this call resetting the current countdown."""
> > assert not self.cancelled, "Already cancelled"
> > self.timeout = time.time() + self.__delay
> > if delayed_map[0] is self:
> > heapq.heapify(delayed_map)
>
> > def delay(self, seconds):
> > """Reschedule this call for a later time."""
> > assert not self.cancelled, "Already cancelled."
> > assert sys.maxint >= seconds >= 0, "%s is not greater than or
> > equal " \
> > "to 0 seconds" %(seconds)
> > self.__delay = seconds
> > self.reset()
>
> > def cancel(self):
> > """Unschedule this call."""
> > assert not self.cancelled, "Already cancelled"
> > del self.__target, self.__args, self.__kwargs
> > if self in delayed_map:
> > if delayed_map[0] is self:
> > delayed_map.remove(self)
> > heapq.heapify(delayed_map)
> > else:
> > delayed_map.remove(self)
> > self.cancelled = True
>
> > def fun(arg):
> > print arg
>
> > a = delayed_call(0.6, fun, '0.6')
> > b = delayed_call(0.5, fun, '0.5')
> > c = delayed_call(0.4, fun, '0.4')
> > d = delayed_call(0.3, fun, '0.3')
> > e = delayed_call(0.2, fun, '0.2')
> > f = delayed_call(0.1, fun, '0.1')
>
> > while delayed_map:
> > now = time.time()
> > while delayed_map and now >= delayed_map[0].timeout:
> > delayed = heapq.heappop(delayed_map)
> > try:
> > delayed.call()
> > finally:
> > if not delayed.cancelled:
> > delayed.cancel()
> > time.sleep(0.01)
> > </--- snippet --->
>
> > Here comes the questions.
> > Since that the timeouts of the scheduled functions contained in the
> > list can change when I reset() or cancel() them I don't know exactly
> > *when* the list needs to be heapified().
> > By doing some tests I came to the conclusion that I need the heapify()
> > the list only when the function I reset() or cancel() is the *first of
> > the list* but I'm not absolutely sure about it.
> > When do you think it would be necessary calling heapify()?
> > I wrote a short test suite which tests the code above and I didn't
> > notice strange behaviors but since that I don't know much about the
> > logic behind heaps I'd need some help.
> > Thanks a lot in advance.
>
> > --- Giampaolohttp://code.google.com/p/pyftpdlib/
>
> According to a quick scan, there is some inefficiencies with your
> code. In particular, you don't need to re-heapify if the item you
> need to remove is the first item; you only need to heappop().
>
> In any case, if the Python standard library heapq module supported non-
> lists as containers, then the pair heap implementation I wrote a
> couple years ago would be perfect for this particular task. Because
> of the rewriting of heapq in C, without a bit of monkeypatching, we
> can't re-use that implementation (which offered insert/remove of
> arbitrary entries in the heap in O(logn) time, which is significantly
> faster than the O(n) time of your implementation).
>
> What I'm thinking is that we should add a pair heap implementation to
> the heapq module (which can cache the pure Python functions if it
> needs to use them), which would then allow us to use that (and others
> to use it generally), add scheduling, etc.
>
> Regardless, it's a 2.7/3.1 feature, so it's ok if we take it slow.
>
> - Josiah
Also...first I need to fix the unittest failures. ;)
- Josiah
More information about the Python-list
mailing list