<html>
  <head>

    <meta http-equiv="content-type" content="text/html; charset=iso-8859-2">
  </head>
  <body bgcolor="#FFFFFF" text="#000000">
    I'm new to Python 3.5 async / await syntax. I would like to create a
    class - let's call it AsyncBus - that can be used to listen for
    keys, and send messages for keys ansynchronously.<br>
    <tt><br>
    </tt><tt>class BusTimeoutError(Exception):</tt><tt><br>
    </tt><tt>    pass</tt><tt><br>
    </tt><tt><br>
    </tt><tt>class AsyncBus(object):</tt><tt><br>
    </tt><tt>    def __init__(self, add_timeout):</tt><tt><br>
              """add_timeout should be a function that uses the current
      event loop implementation to call back"<br>
    </tt><tt>        ....</tt><tt><br>
    </tt><tt><br>
    </tt><tt>    async def notify(self, key, message):</tt><tt><br>
    </tt><tt>        """Notify a single waiter about a key.  Return if
      somebody was waiting for it."""</tt><tt><br>
    </tt><tt>        ....</tt><tt><br>
    </tt><tt><br>
    </tt><tt>    async def notifyall(self, key, message):</tt><tt><br>
    </tt><tt>        """Notify all waiters. Return the number of waiters
      notified."""</tt><tt><br>
    </tt><tt>        ....</tt><tt><br>
    </tt><tt><br>
    </tt><tt>    async def waitfor(self, keys, timeout=None):</tt><tt><br>
    </tt><tt>        """Wait until a message comes in for any of the
      given key(s). Raise BusTimeoutError after the given timedelta."""</tt><tt><br>
    </tt><tt>        ....</tt><tt><br>
    </tt><br>
    <br>
    Internally, the waitfor method would use the add_timeout to resume
    itself and raise the BusTimeoutError exception after the given
    timeout, and this should be the only place where a periodic (system
    time based) sleep would occur. The notify/notifyall methods would
    also resume execution in waitfor() call(s), but they would provide
    the message for the waiter instead of raising an exception.<br>
    <br>
    Here is an example use case:<br>
    <br>
    <ul>
      <li>Write a chat server, where all the users are running the chat
        client in a browser</li>
      <li>The browser sends long polling ajax request to the server,
        that returns any new messages immediatelly, or block for at most
        timeout=10 seconds before returning without any message. This
        long poll would be called in an infinite loop in the browser.
        Internally, long poll requests would end in bus.waitfor() calls
        on the server.<br>
      </li>
      <li>When the user sends a new message to another user, then
        bus.notifyall() is awaited. notifyall() awakens all
        bus.waitfor() calls, delivers the message to all clients, and
        finally gives back the number of clients notified to the sender
        of the message. The sender can see how many clients got the
        message.</li>
    </ul>
    I have examined code for long polling in other projects, and I have
    found that most of them use add_timeout to check for new messages
    periodically. I do not want to follow this practice.<br>
    <br>
    <ul>
      <li>let's say there are 1000 clients connected.<br>
      </li>
      <li>if I use a lower timeout (say 0.1 seconds) for periodic
        checks, then the server will be woken up 1000 times in ever 0.1
        seconds. Avg. in every 0.0001 seconds. It will do nothing
        usefull in 99.99% of that time,. That seems to be bad.</li>
      <li>if I use a higher timeout (say 10 seconds) then messages won't
        be delivered for an average of 5 seconds which is also bad.</li>
    </ul>
    So messages should NOT be delivered by periodic checks. They should
    be delivered from events triggered by incoming messages. In other
    words: when a new message comes in, it should wake up the clients
    waiting for messages (for the given user) and deliver the message
    instantaneously.<br>
    <br>
    Question is: how to write the AsyncBus class? Here is what I have
    tried - but I don't know how to create the waiter object at the
    bottom.<br>
    <br>
    <meta http-equiv="content-type" content="text/html;
      charset=iso-8859-2">
    <meta http-equiv="content-type" content="text/html;
      charset=iso-8859-2">
    <meta http-equiv="content-type" content="text/html;
      charset=iso-8859-2">
    <tt><br>
    </tt><tt>class BusTimeoutError(Exception):</tt><tt><br>
    </tt><tt>    """Raised when the waiter has been waiting for too
      long."""</tt><tt><br>
    </tt><tt>    pass</tt><tt><br>
    </tt><tt><br>
    </tt><tt><br>
    </tt><tt>class AsnycBus(object):</tt><tt><br>
    </tt><tt>    """Asynchronous notification bus."""</tt><tt><br>
    </tt><tt><br>
    </tt><tt>    def __init__(self, add_timeout):</tt><tt><br>
    </tt><tt>        self._waiters = {}</tt><tt><br>
    </tt><tt>        self._add_timeout = add_timeout</tt><tt><br>
    </tt><tt><br>
    </tt><tt>    async def notify(self, key, message):</tt><tt><br>
    </tt><tt>        """Notify a single waiter. Return if there was a
      waiter waiting for the key."""</tt><tt><br>
    </tt><tt>        if key in self._waiters:</tt><tt><br>
    </tt><tt>            self._waiters[key][0].send((key, message))</tt><tt><br>
    </tt><tt>            return True</tt><tt><br>
    </tt><tt>        else:</tt><tt><br>
    </tt><tt>            return False</tt><tt><br>
    </tt><tt><br>
    </tt><tt>    async def notifyall(self, key, message):</tt><tt><br>
    </tt><tt>        """Notify all waiters. Return the number of waiters
      notified."""</tt><tt><br>
    </tt><tt>        if key in self._waiters:</tt><tt><br>
    </tt><tt>            # Get all waiters</tt><tt><br>
    </tt><tt>            waiters = self._waiters[key]</tt><tt><br>
    </tt><tt>            for waiter in waiters:</tt><tt><br>
    </tt><tt>                # Send the message to the waiter</tt><tt><br>
    </tt><tt>                waiter.send((key, message))</tt><tt><br>
    </tt><tt>            return len(waiters)</tt><tt><br>
    </tt><tt>        else:</tt><tt><br>
    </tt><tt>            return 0</tt><tt><br>
    </tt><tt><br>
    </tt><tt>    async def waitfor(self, keys, timeout=None):</tt><tt><br>
    </tt><tt>        """Wait for keys.</tt><tt><br>
    </tt><tt><br>
    </tt><tt>                :arg keys: An iterable of immutable keys.</tt><tt><br>
    </tt><tt>                :arg timeout: Raise TimeoutError if nothing
      hits the bus for this amount of time.</tt><tt><br>
    </tt><tt>                    None means: wait indefinitely. It
      should be a datetime.timedelta object.</tt><tt><br>
    </tt><tt>        """</tt><tt><br>
    </tt><tt>        # Register for keys</tt><tt><br>
    </tt><tt>        if not keys:</tt><tt><br>
    </tt><tt>            raise Exception("Need some keys to wait
      for...")</tt><tt><br>
    </tt><tt><br>
    </tt><tt>        waiter = ???</tt><tt>??????????????????????????<br>
    </tt><tt><br>
    </tt><tt>        for key in keys:</tt><tt><br>
    </tt><tt>            if key in self._waiters:</tt><tt><br>
    </tt><tt>                self._waiters[key].append(waiter)</tt><tt><br>
    </tt><tt>            else:</tt><tt><br>
    </tt><tt>                self._waiters[key] = [waiter]</tt><tt><br>
    </tt><tt>        try:</tt><tt><br>
    </tt><tt>            # Add timeout and wake me up if nothing comes
      in.</tt><tt><br>
    </tt><tt>            if timeout:</tt><tt><br>
    </tt><tt>                self._add_timeout(timeout,
      functools.partial(waiter.throw, BusTimeoutError))</tt><tt><br>
    </tt><tt>            return await waiter</tt><tt><br>
    </tt><tt>        finally:</tt><tt><br>
    </tt><tt>            for key in keys:</tt><tt><br>
    </tt><tt>                if key in self._waiters:</tt><tt><br>
    </tt><tt>                    self._waiters[key].remove(waiter)</tt><tt><br>
    </tt><br>
  </body>
</html>