How to implement an async message bus
Ian Kelly
ian.g.kelly at gmail.com
Thu Oct 15 10:44:40 EDT 2015
On Thu, Oct 15, 2015 at 5:25 AM, Nagy László Zsolt <gandalf at shopzeus.com> wrote:
> I'm new to Python 3.5 async / await syntax. I would like to create a class -
> let's call it AsyncBus - that can be used to listen for keys, and send
> messages for keys ansynchronously.
>
> class BusTimeoutError(Exception):
> pass
>
> class AsyncBus(object):
> def __init__(self, add_timeout):
> """add_timeout should be a function that uses the current event loop
> implementation to call back"
> ....
>
> async def notify(self, key, message):
> """Notify a single waiter about a key. Return if somebody was
> waiting for it."""
> ....
>
> async def notifyall(self, key, message):
> """Notify all waiters. Return the number of waiters notified."""
> ....
>
> async def waitfor(self, keys, timeout=None):
> """Wait until a message comes in for any of the given key(s). Raise
> BusTimeoutError after the given timedelta."""
> ....
>
>
> Internally, the waitfor method would use the add_timeout to resume itself
> and raise the BusTimeoutError exception after the given timeout, and this
> should be the only place where a periodic (system time based) sleep would
> occur. The notify/notifyall methods would also resume execution in waitfor()
> call(s), but they would provide the message for the waiter instead of
> raising an exception.
My first instinct is to suggest that you not reinvent the wheel and
point you at the asyncio.Condition class. However, it apparently
doesn't support setting a timeout on wait, which seems odd since the
threading.Condition class that it's based on does. You could use
asyncio.wait to wait for it with a timeout, but that wouldn't remove
the waiter from the Condition. Maybe this would be a useful feature
request + patch.
> Question is: how to write the AsyncBus class? Here is what I have tried -
> but I don't know how to create the waiter object at the bottom.
>
>
> class BusTimeoutError(Exception):
> """Raised when the waiter has been waiting for too long."""
> pass
>
>
> class AsnycBus(object):
> """Asynchronous notification bus."""
>
> def __init__(self, add_timeout):
> self._waiters = {}
> self._add_timeout = add_timeout
>
> async def notify(self, key, message):
> """Notify a single waiter. Return if there was a waiter waiting for
> the key."""
> if key in self._waiters:
> self._waiters[key][0].send((key, message))
> return True
> else:
> return False
It looks like you're assuming that the waiter object will be a
coroutine and trying to call its send method directly rather than
going through the event loop? That seems like a bad idea. In asyncio,
a coroutine is something that you can await or schedule with
loop.create_task(). Don't try to use those low-level methods.
I think a better approach would be to make the waiter a Future and
signal it by setting its result. Something like this, as a rough
sketch:
async def waitfor(self, keys, timeout=None):
waiter = asyncio.Future()
for key in keys:
self._waiters[key].add(waiter)
handle = None
if timeout:
handle = asyncio.call_later(timeout, self._handle_timeout, waiter)
try:
return await waiter
finally:
# TODO: Use a context manager to add and remove the keys.
for key in keys:
self._waiters[key].discard(waiter)
if handle:
handle.cancel()
def notify(self, key, message):
if key in self._waiters and self._waiters[key]:
waiter = next(iter(self._waiters[key]))
waiter.set_result((key, message))
return True
return False
def _handle_timeout(self, waiter):
waiter.set_exception(new BusTimeoutError())
More information about the Python-list
mailing list