<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=iso-8859-2">
</head>
<body bgcolor="#FFFFFF" text="#000000">
I'm new to the Python 3.5 async / await syntax. I would like to create a
class - let's call it AsyncBus - that can be used to listen for
keys, and to send messages for keys asynchronously.<br>
<tt><br>
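</tt><tt>class BusTimeoutError(Exception):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;pass</tt><tt><br>
</tt><tt><br>
</tt><tt>class AsyncBus(object):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;def __init__(self, add_timeout):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""add_timeout should be a function that uses the current
event loop implementation to schedule a callback."""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;....</tt><tt><br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;async def notify(self, key, message):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""Notify a single waiter about a key. Return whether
somebody was waiting for it."""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;....</tt><tt><br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;async def notifyall(self, key, message):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""Notify all waiters. Return the number of waiters
notified."""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;....</tt><tt><br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;async def waitfor(self, keys, timeout=None):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""Wait until a message comes in for any of the
given key(s). Raise BusTimeoutError after the given timedelta."""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;....</tt><tt><br>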
</tt><br>
<br>
Internally, the waitfor method would use add_timeout to resume
itself and raise the BusTimeoutError exception after the given
timeout, and this should be the only place where a time-based
(system clock) sleep occurs. The notify/notifyall methods would
also resume execution in the waitfor() call(s), but they would provide
the message to the waiter instead of raising an exception.<br>
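<br>
To make the intended mechanism concrete, here is a minimal sketch for a
single waiter. It assumes the asyncio event loop, with loop.call_later
standing in for add_timeout and an asyncio.Future standing in for the
waiter; wait_once, deliver and _expire are hypothetical names used only
for illustration, and the timeout is in float seconds:<br>
<pre>
```python
import asyncio


class BusTimeoutError(Exception):
    pass


def _expire(future):
    # Timer callback: fail the future unless a message already resolved it.
    if not future.done():
        future.set_exception(BusTimeoutError())


async def wait_once(loop, future, timeout):
    """Suspend until `future` gets a result, or raise BusTimeoutError."""
    handle = loop.call_later(timeout, _expire, future)
    try:
        return await future
    finally:
        handle.cancel()  # no-op if the timer has already fired


def deliver(future, message):
    """Wake the waiter with a message; return whether it was still waiting."""
    if future.done():
        return False
    future.set_result(message)
    return True


async def demo(loop):
    # Case 1: a message arrives before the timeout.
    fut = loop.create_future()
    loop.call_soon(deliver, fut, "hello")
    first = await wait_once(loop, fut, timeout=1.0)

    # Case 2: nothing arrives, so the timer raises BusTimeoutError.
    try:
        await wait_once(loop, loop.create_future(), timeout=0.01)
        second = "no timeout"
    except BusTimeoutError:
        second = "timed out"
    return first, second


loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(demo(loop))
finally:
    loop.close()
print(result)
```
</pre>
The only timer involved is the one set by call_later; the waiter itself
never polls.<br>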
<br>
Here is an example use case:<br>
<br>
<ul>
<li>Write a chat server, where all the users are running the chat
client in a browser</li>
<li>The browser sends a long-polling AJAX request to the server,
which returns any new messages immediately, or blocks for at most
timeout=10 seconds before returning without any message. This
long poll would be called in an infinite loop in the browser.
Internally, long poll requests would end up in bus.waitfor() calls
on the server.<br>
</li>
<li>When the user sends a new message to another user, then
bus.notifyall() is awaited. notifyall() awakens all
bus.waitfor() calls, delivers the message to all clients, and
finally gives back the number of clients notified to the sender
of the message. The sender can see how many clients got the
message.</li>
</ul>
I have examined code for long polling in other projects, and I have
found that most of them use add_timeout to check for new messages
periodically. I do not want to follow this practice.<br>
<br>
<ul>
<li>let's say there are 1000 clients connected.<br>
</li>
<li>if I use a lower timeout (say 0.1 seconds) for periodic
checks, then the server will be woken up 1000 times in every 0.1
seconds, i.e. once every 0.0001 seconds on average. It will do nothing
useful in 99.99% of that time. That seems to be bad.</li>
<li>if I use a higher timeout (say 10 seconds) then messages will
be delayed by 5 seconds on average, which is also bad.</li>
</ul>
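The two cases above can be put into numbers with a short sketch. The
function name polling_cost is hypothetical, and the delay figure assumes
that messages arrive at a uniformly random moment between two checks:<br>
<pre>
```python
def polling_cost(clients, interval):
    """Return (wake-ups per second, mean delivery delay in seconds)."""
    # Each connected client triggers one check per polling interval.
    wakeups_per_second = clients / interval
    # On average a message arrives halfway between two checks.
    mean_delay = interval / 2
    return wakeups_per_second, mean_delay


# 1000 clients checking every 0.1 seconds: about 10000 wake-ups per second.
print(polling_cost(1000, 0.1))
# 1000 clients checking every 10 seconds: a mean delay of 5 seconds.
print(polling_cost(1000, 10))
```
</pre>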
So messages should NOT be delivered by periodic checks. They should
be delivered from events triggered by incoming messages. In other
words: when a new message comes in, it should wake up the clients
waiting for messages (for the given user) and deliver the message
instantaneously.<br>
<br>
The question is: how do I write the AsyncBus class? Here is what I have
tried - but I don't know how to create the waiter object in the
waitfor method at the bottom.<br>
<br>
<tt><br>
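</tt><tt>import functools</tt><tt><br>
</tt><tt><br>
</tt><tt><br>
</tt><tt>class BusTimeoutError(Exception):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;"""Raised when the waiter has been waiting for too
long."""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;pass</tt><tt><br>
</tt><tt><br>
</tt><tt><br>
</tt><tt>class AsyncBus(object):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;"""Asynchronous notification bus."""</tt><tt><br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;def __init__(self, add_timeout):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Maps each key to the list of waiters registered for it</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._waiters = {}</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._add_timeout = add_timeout</tt><tt><br>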
</tt><tt><br>
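</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;async def notify(self, key, message):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""Notify a single waiter. Return whether there was a
waiter waiting for the key."""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# An empty list may remain after all waiters have unregistered</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if self._waiters.get(key):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._waiters[key][0].send((key, message))</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return True</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;else:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return False</tt><tt><br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;async def notifyall(self, key, message):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""Notify all waiters. Return the number of waiters
notified."""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if key in self._waiters:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Get all waiters</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;waiters = self._waiters[key]</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for waiter in waiters:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Send the message to the waiter</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;waiter.send((key, message))</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return len(waiters)</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;else:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return 0</tt><tt><br>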
</tt><tt><br>
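</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;async def waitfor(self, keys, timeout=None):</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""Wait for keys.</tt><tt><br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:arg keys: An iterable of immutable keys.</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:arg timeout: Raise BusTimeoutError if nothing
hits the bus for this amount of time.</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;None means: wait indefinitely. It
should be a datetime.timedelta object.</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Materialize keys so they can be iterated again during cleanup</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keys = list(keys)</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Register for keys</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if not keys:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;raise Exception("Need some keys to wait
for...")</tt><tt><br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;waiter = ???</tt><tt>??????????????????????????<br>
</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for key in keys:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if key in self._waiters:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._waiters[key].append(waiter)</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;else:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._waiters[key] = [waiter]</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Add timeout and wake me up if nothing comes
in.</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if timeout is not None:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._add_timeout(timeout,
functools.partial(waiter.throw, BusTimeoutError))</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return await waiter</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;finally:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for key in keys:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if key in self._waiters:</tt><tt><br>
</tt><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._waiters[key].remove(waiter)</tt><tt><br>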
</tt><br>
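One candidate I can think of for the waiter is an asyncio.Future created
per waitfor() call. The following is only a sketch under that assumption,
not a verified solution: it is tied to asyncio instead of a generic
add_timeout, it uses asyncio.wait_for for the timeout, it takes the
timeout in float seconds rather than a timedelta, and notify/notifyall
are plain methods because setting a future's result needs no await. The
names FutureBus and demo are made up for illustration:<br>
<pre>
```python
import asyncio


class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""


class FutureBus:
    """Sketch of the bus where each waiter is an asyncio.Future."""

    def __init__(self):
        self._waiters = {}  # maps each key to its list of pending futures

    def notify(self, key, message):
        """Wake one waiter; return whether somebody was waiting."""
        for waiter in self._waiters.get(key, []):
            if not waiter.done():
                waiter.set_result((key, message))
                return True
        return False

    def notifyall(self, key, message):
        """Wake all waiters; return the number of waiters notified."""
        pending = [w for w in self._waiters.get(key, []) if not w.done()]
        for waiter in pending:
            waiter.set_result((key, message))
        return len(pending)

    async def waitfor(self, keys, timeout=None):
        """Return (key, message), or raise BusTimeoutError after `timeout` seconds."""
        keys = list(keys)
        if not keys:
            raise ValueError("Need some keys to wait for")
        waiter = asyncio.get_event_loop().create_future()
        for key in keys:
            self._waiters.setdefault(key, []).append(waiter)
        try:
            return await asyncio.wait_for(waiter, timeout)
        except asyncio.TimeoutError:
            raise BusTimeoutError() from None
        finally:
            # Unregister, and drop keys that have no waiters left.
            for key in keys:
                self._waiters[key].remove(waiter)
                if not self._waiters[key]:
                    del self._waiters[key]


async def demo():
    bus = FutureBus()
    # Two clients wait on the same key; one message wakes both.
    clients = [asyncio.ensure_future(bus.waitfor(["alice"], timeout=1.0))
               for _ in range(2)]
    await asyncio.sleep(0)  # let both clients register
    notified = bus.notifyall("alice", "hi")
    received = await asyncio.gather(*clients)
    # Nobody sends to "bob", so this wait times out.
    try:
        await bus.waitfor(["bob"], timeout=0.01)
        timed_out = False
    except BusTimeoutError:
        timed_out = True
    return notified, received, timed_out


loop = asyncio.new_event_loop()
try:
    outcome = loop.run_until_complete(demo())
finally:
    loop.close()
print(outcome)
```
</pre>
In this sketch nothing runs between a client starting to wait and a
message or timeout arriving, which is the behaviour I am after.<br>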
</body>
</html>