Awaiting until a condition is met

Hello. I've been thinking it would be nice to be able to use await to suspend execution until a condition is met (as represented by a function or lambda which returns a boolean).

Currently, as far as I can tell, the way to accomplish this is with a while loop which frequently checks the condition, suspending execution in between checks. My use case is in robotics, so I'll use that. Here's a minimal version of my current approach:

    start_moving_to(dest)
    while distance_to(dest) > tolerance:
        await asyncio.sleep(1 / 60)

This requires choosing a somewhat arbitrary frequency with which to check the condition. If the frequency is too high, the function may not be suspended often enough for other tasks to run, and if it's too low, there's a significant delay before the condition's satisfaction can be noticed.

So here's my proposal for syntax that I think is quite a bit cleaner:

    start_moving_to(dest)
    await lambda: distance_to(dest) <= tolerance

This eliminates the need to choose a frequency and makes the code even more readable. Of course, a previously-defined function could be used in place of a lambda, which would allow for the even cleaner expression

    await has_arrived

if has_arrived is already defined.

While I only have a basic understanding of how the event loop works, it seems to me it should be fairly straightforward to add a list that pairs conditions with callbacks and to check those conditions on each pass through the list of registered callbacks, fairly similarly to how scheduled callbacks created with asyncio.sleep wait until enough time has passed.

Looking forward to hearing people's thoughts.
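For what it's worth, the "check on each pass through the loop" behaviour described above can be approximated today without new syntax. What follows is only a sketch: `wait_until` is a hypothetical helper name, and the `state` dictionary stands in for a real robot's sensors. `asyncio.sleep(0)` yields control for one pass of the event loop, so the condition is re-checked once per pass. It is still polling, and it keeps the loop busy instead of letting it idle.

```python
import asyncio


async def wait_until(pred):
    """Re-check pred once per event-loop pass (hypothetical helper)."""
    while not pred():
        # sleep(0) yields to the event loop so other tasks get a turn.
        await asyncio.sleep(0)


async def demo():
    state = {"distance": 5}  # stand-in for the robot's distance to dest

    async def mover():
        # Simulated motion: one unit closer on each pass of the loop.
        while state["distance"] > 0:
            state["distance"] -= 1
            await asyncio.sleep(0)

    task = asyncio.create_task(mover())
    await wait_until(lambda: state["distance"] <= 0)
    await task
    return state["distance"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The call site reads almost like the proposed syntax: `await wait_until(has_arrived)`.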

On Sun, 15 May 2022 at 23:00, Aaron Fink <aaronpfink@gmail.com> wrote:
But the frequency has to be defined somewhere. How does Python know when to check the function? Syntax doesn't remove that question, it only moves it around.

What I would recommend is having your own "wait for condition" function that hides that logic away. Something like this (untested):

    POLL_FREQUENCY = 60

    async def poll_until(pred):
        while not pred():
            await asyncio.sleep(1 / POLL_FREQUENCY)

Then, if you want to change your polling frequency, it's in one place, and not part of the logic of "are we theeeeere yet" which now only needs to concern itself with positions.

A possibly-more-elegant solution might be to design your own event loop with an intrinsic frame rate, always advancing by precisely the same sleep interval and then checking whatever needs to be checked. But that would be a lot more work :)

ChrisA
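A slightly extended sketch of the same idea, in case a robot that never arrives should not block forever. The `interval` parameter and the `poll_until_or_timeout` name are assumptions added for illustration; `asyncio.wait_for` is the standard-library call that cancels the inner coroutine when the timeout expires.

```python
import asyncio


async def poll_until(pred, interval=1 / 60):
    """Poll pred every `interval` seconds until it returns true."""
    while not pred():
        await asyncio.sleep(interval)


async def poll_until_or_timeout(pred, timeout, interval=1 / 60):
    """Return True if pred became true in time, False on timeout."""
    try:
        await asyncio.wait_for(poll_until(pred, interval), timeout)
    except asyncio.TimeoutError:
        return False
    return True
```

A caller could then write `if not await poll_until_or_timeout(has_arrived, timeout=30): ...` and handle the stuck case explicitly.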

The generally-accepted solution for this task is to set an Event, rather than just returning a Boolean:

    async def move_to(dest, done_event):
        ...
        done_event.set()

    async def main():
        dest = ...
        done_event = asyncio.Event()
        asyncio.create_task(move_to(dest, done_event))
        await done_event.wait()
        print('Arrived!')

And naturally you could wrap an existing function that returns a Boolean:

    async def move_to(dest) -> bool:
        ...

    async def set_if_success(event, awaitable):
        result = await awaitable
        if result:
            event.set()

    async def main():
        dest = ...
        done_event = asyncio.Event()
        asyncio.create_task(set_if_success(done_event, move_to(dest)))
        await done_event.wait()
        print('Arrived!')
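A runnable sketch of the Event approach, with the robot replaced by a simulated position counter. The `step` parameter and the motion loop are made up for illustration, and the simulation assumes a positive destination. Note that an `asyncio.Event` is waited on through its `wait()` method.

```python
import asyncio


async def move_to(dest, done_event, step=1.0):
    """Simulated motion: a hypothetical stand-in for real robot control."""
    position = 0.0
    while abs(dest - position) > 1e-9:
        position += min(step, dest - position)
        await asyncio.sleep(0)  # pretend to wait on the hardware
    done_event.set()  # wakes every task blocked in done_event.wait()


async def main():
    done_event = asyncio.Event()
    task = asyncio.create_task(move_to(3.0, done_event))
    await done_event.wait()  # suspends here without polling
    await task
    return done_event.is_set()


if __name__ == "__main__":
    print("Arrived!" if asyncio.run(main()) else "Still moving")
```

The waiting side never chooses a frequency; whoever knows that the move has finished is responsible for setting the event.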

On 2022-05-15 03:10, Aaron Fink wrote:
I don't see how that could work. It's still just polling, so it could be expensive to check very often. All the callback is returning is True/False, which isn't very helpful for deciding how often it should check. As for your robotics example, you could adjust the frequency of checking according to how far it still has to go and how fast it's getting there - if it won't get anywhere close to the destination in, say, the next second, there's not much point in checking every 1/60 seconds. There can be a minimum frequency of checking, and you can increase the frequency as it gets very close to the destination.
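One way the adaptive checking described above might look. This is only a sketch: `get_distance` and `get_speed` are hypothetical callables supplied by the robot code, and sleeping for half the estimated time to arrival is an arbitrary choice. The interval is clamped so that checks never happen less often than once per `max_interval` or more often than once per `min_interval`.

```python
import asyncio


def next_interval(distance, speed, tolerance,
                  min_interval=1 / 60, max_interval=1.0):
    """Estimate how long to sleep before the next check."""
    if speed <= 0:
        return max_interval  # not approaching: check at the slowest rate
    eta = max(distance - tolerance, 0.0) / speed  # seconds until arrival
    # Sleep for half the estimated time, clamped to [min, max].
    return min(max(eta / 2, min_interval), max_interval)


async def wait_for_arrival(get_distance, get_speed, tolerance):
    """Poll rarely while far away and often when close to the target."""
    while (distance := get_distance()) > tolerance:
        await asyncio.sleep(next_interval(distance, get_speed(), tolerance))
```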

On Sun, May 15, 2022 at 03:34:15PM +0100, MRAB wrote:

On 2022-05-14 10:10 p.m., Aaron Fink wrote:
> start_moving_to(dest)
> await lambda: distance_to(dest) <= tolerance

Instead of evaluating an expression you could build a notification chain that you await upon.

    x = Variable(value=2)
    y = Variable(value=2)
    # we create a Future that watches x and y
    condition = sqrt(x * x + y * y) <= 1
    # some other task is launched to do work
    await condition
    print("done")

Here is an example implementation:

    import asyncio
    import math
    import operator
    import random
    from asyncio import Future


    class Variable:
        def __init__(self, value=None):
            self._value = value
            self.listeners = []

        @property
        def value(self):
            return self._value

        @value.setter
        def value(self, value):
            self._value = value
            # push the change to everything computed from this variable
            for listener in self.listeners:
                listener.update()

        def __mul__(self, other):
            return Function(operator.mul, self, other)

        def __add__(self, other):
            return Function(operator.add, self, other)

        def __le__(self, other):
            return Boolean(operator.le, self, other)

        def __lt__(self, other):
            return Boolean(operator.lt, self, other)


    class Function(Variable):
        def __init__(self, func, *params):
            Variable.__init__(self)
            self.func = func
            self.params = []
            for p in params:
                if not isinstance(p, Variable):
                    p = Variable(p)
                self.params.append(p)
                p.listeners.append(self)
            self.update()

        def update(self):
            self.value = self.func(*[p.value for p in self.params])


    class Boolean(Future, Function):
        def __init__(self, func, *params):
            # initialise the Future first: Function.__init__ calls update(),
            # which may need to resolve it straight away
            Future.__init__(self)
            Function.__init__(self, func, *params)

        def update(self):
            Function.update(self)
            if self.value and not self.done():
                self.set_result(self.value)

        def __bool__(self):
            return self._value


    def sqrt(value):
        return Function(math.sqrt, value)


    async def worker(x, y, condition):
        while not condition:
            await asyncio.sleep(1)
            if random.randrange(2):
                x.value = max(x.value - 1, 0)
            else:
                y.value = max(y.value - 1, 0)
            print(f"{x.value}, {y.value}")


    async def watch(condition):
        await condition
        print("watch done")


    async def main():
        # build the chain inside the running loop so the Future binds to it
        x = Variable(2)
        y = Variable(2)
        condition = sqrt(x * x + y * y) < 1
        await asyncio.gather(watch(condition), worker(x, y, condition))


    asyncio.run(main())
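A smaller variation on the notification idea that stays within the standard library: `asyncio.Condition.wait_for(predicate)` re-evaluates the predicate only when some other task calls `notify_all()`, so nothing is polled on a timer. This is a sketch under stated assumptions, not a replacement for the chain above; the `Position` class and its method names are hypothetical.

```python
import asyncio


class Position:
    """Hypothetical shared state guarded by an asyncio.Condition."""

    def __init__(self, x, y):
        self.x, self.y = x, y
        self.changed = asyncio.Condition()

    async def update(self, x, y):
        async with self.changed:
            self.x, self.y = x, y
            self.changed.notify_all()  # wake waiters so they re-check

    async def wait_until(self, pred):
        async with self.changed:
            # wait_for checks pred now, then again after each notification.
            await self.changed.wait_for(lambda: pred(self.x, self.y))


async def main():
    pos = Position(2, 2)
    waiter = asyncio.create_task(
        pos.wait_until(lambda x, y: (x * x + y * y) ** 0.5 < 1)
    )
    for x, y in [(1, 2), (1, 1), (0, 1), (0, 0)]:
        await pos.update(x, y)
        await asyncio.sleep(0)  # give the waiter a chance to re-check
    await waiter
    return pos.x, pos.y


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is the same as with the chain: every place that changes the state has to go through `update()`, otherwise the waiter is never told to look again.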

Participants (6):
- Aaron Fink
- Chris Angelico
- Greg Werbin
- Kyle Lahnakoski
- MRAB
- Steven D'Aprano