Submitting a job to an asyncio event loop
Hi,

I noticed there is currently no standard solution for submitting a job from a thread to an asyncio event loop. Here's what the asyncio documentation says about concurrency and multithreading:

    To schedule a callback from a different thread, the
    BaseEventLoop.call_soon_threadsafe() method should be used.
    Example to schedule a coroutine from a different thread:

        loop.call_soon_threadsafe(asyncio.async, coro_func())

The issue with this method is that the result of the coroutine is lost. One way to deal with this is to connect the asyncio.Future returned by async (or ensure_future) to a concurrent.futures.Future. It is then possible to use a subclass of concurrent.futures.Executor to submit a callback to an asyncio event loop. Such an executor can also be used to set up communication between two event loops using run_in_executor.

I posted an implementation called LoopExecutor on GitHub: https://github.com/vxgmichel/asyncio-loopexecutor The repo contains the loopexecutor module along with tests for several use cases. The README describes the whole thing (context, examples, issues, implementation).

It is interesting to note that this executor is a bit different from ThreadPoolExecutor and ProcessPoolExecutor, since it can also submit a coroutine function. Example:

    with LoopExecutor(loop) as executor:
        future = executor.submit(operator.add, 1, 2)
        assert future.result() == 3
        future = executor.submit(asyncio.sleep, 0.1, result=3)
        assert future.result() == 3

This works in both cases because submit always casts the given function to a coroutine. That means it would also work with a function that returns a Future.

Here are a few topics related to the current implementation that might be interesting to discuss:

- possible drawbacks of casting the callback to a coroutine
- possible drawbacks of filling in a concurrent.futures.Future via the private asyncio.Future._copy_state
- does LoopExecutor need to implement the shutdown method?
- removing the limitation in run_in_executor (a coroutine function can't be submitted)
- adding a generic Future connection function in asyncio
- reimplementing wrap_future with the generic connection
- adding LoopExecutor to asyncio (or concurrent.futures)

At the moment, the interaction between asyncio and concurrent.futures only goes one way. It would be nice to have a standard solution (LoopExecutor or something else) to make it bidirectional.

Thanks,

Vincent
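The Future-to-Future connection described above can be sketched as a small helper that copies a finished asyncio future's outcome (result, exception, or cancellation) into a concurrent.futures.Future. This is an illustrative helper with hypothetical names, not part of asyncio's public API; the private asyncio.Future._copy_state mentioned in the list above plays a similar role in the other direction.

```python
import asyncio
import concurrent.futures

def copy_outcome(asyncio_future, concurrent_future):
    """Copy a finished asyncio future's outcome (result, exception
    or cancellation) into a concurrent.futures.Future.
    Illustrative helper, not asyncio API."""
    assert asyncio_future.done()
    if asyncio_future.cancelled():
        concurrent_future.cancel()
    elif asyncio_future.exception() is not None:
        concurrent_future.set_exception(asyncio_future.exception())
    else:
        concurrent_future.set_result(asyncio_future.result())

# Minimal demonstration without threads: a pre-completed asyncio future.
loop = asyncio.new_event_loop()
source = loop.create_future()
source.set_result(3)
destination = concurrent.futures.Future()
copy_outcome(source, destination)
assert destination.result() == 3
loop.close()
```

In the real thread-to-loop case, this copy would run as a done callback on the asyncio side, with the submitting thread blocking on the concurrent future's result().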
Hi Vincent,

I've read your write-up with interest. You're right that it's a bit awkward to make calls from the threaded world into the asyncio world. Interestingly, there's much better support for passing work off from the asyncio event loop to a thread (run_in_executor()). Perhaps that's because the use case there was obvious from the start: some things that may block for I/O just don't have an async interface yet, so in order to use them from an asyncio task they must be off-loaded to a separate thread, or else the entire event loop is blocked. (This is used for calling getaddrinfo(), for example.)

I'm curious where you have encountered the opposite use case?

I think if I had to do this myself I would go for a more minimalist interface: something like your submit() method but without the call to asyncio.coroutine(fn). Having the caller pass in the already-called coroutine object might simplify the signature even further. I'm not sure I see the advantage of trying to make this an executor -- but perhaps I'm missing something?

--Guido

On Sat, Sep 26, 2015 at 7:29 AM, Vincent Michel <vxgmichel@gmail.com> wrote:
_______________________________________________
Python-ideas mailing list
Python-ideas@python.org
https://mail.python.org/mailman/listinfo/python-ideas
Code of Conduct: http://python.org/psf/codeofconduct/
--Guido van Rossum (python.org/~guido)
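Guido's point about off-loading blocking calls with run_in_executor() can be illustrated with a minimal sketch. It uses the present-day spellings asyncio.run() and get_running_loop(), which postdate this thread; blocking_io() is a made-up stand-in for a blocking call such as getaddrinfo().

```python
import asyncio
import time

def blocking_io():
    # Stands in for a blocking call with no async interface,
    # such as getaddrinfo() mentioned above.
    time.sleep(0.05)
    return 42

async def main():
    loop = asyncio.get_running_loop()
    # Passing None as the executor uses the loop's default thread pool,
    # so the event loop stays responsive while blocking_io() runs.
    return await loop.run_in_executor(None, blocking_io)

print(asyncio.run(main()))  # prints 42
```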
Hi Guido,

Thanks for your interest!

I work for a synchrotron, and we use the distributed control system TANGO. The main implementation is in C++, but we use a Python binding called PyTango. The current server implementation (on the C++ side) does not feature an event loop, but instead creates a different thread for each client.

TANGO: http://www.tango-controls.org/
PyTango: http://www.esrf.eu/computing/cs/tango/tango_doc/kernel_doc/pytango/latest/in...

I wanted to add asyncio support to the library, so that we can benefit from single-threaded asynchronous programming. The problem is that client callbacks run in different threads, and there is not much we can do about it until a pure Python implementation is developed (and that's a lot of work). Instead, it is possible to use an asyncio event loop, run the server through run_in_executor (just like you mentioned in your mail), and redirect all the client callbacks to the event loop. That's the part where job submission from a different thread comes in handy.

A very similar solution has been developed using gevent, but I like explicit coroutines better :p

Another use case is communication between two event loops. From what I've seen, the current context (get/set event loop) is only related to the current thread, which makes it easy to run different event loops in different threads. Even though I'm not sure what the use case is, I suppose it's been done intentionally. The executor interface is then useful to run things like:

    executor = LoopExecutor(other_loop)
    result = await my_loop.run_in_executor(executor, coro_func, *args)

There is a working example in the test directory:
https://github.com/vxgmichel/asyncio-loopexecutor/blob/master/test/test_mult...

***

The coroutine(fn) cast only makes sense if a subclass of Executor is used, in order to be consistent with the Executor.submit signature. Otherwise, passing an already-called coroutine is perfectly fine. I think it is a good idea to define a simple submit function like you recommended:

    def submit_to_loop(loop, coro):
        # `schedule` (from the loopexecutor module) runs `coro` on the
        # loop and copies its outcome into `destination`
        future = concurrent.futures.Future()
        callback = partial(schedule, coro, destination=future)
        loop.call_soon_threadsafe(callback)
        return future

And then use the executor interface if we realize it is actually useful. It's really not a lot of code anyway:

    class LoopExecutor(concurrent.futures.Executor):

        def __init__(self, loop=None):
            self.loop = loop or asyncio.get_event_loop()

        def submit(self, fn, *args, **kwargs):
            coro = asyncio.coroutine(fn)(*args, **kwargs)
            return submit_to_loop(self.loop, coro)

I'll update the repository.

Cheers,

Vincent

2015-09-27 4:52 GMT+02:00 Guido van Rossum <guido@python.org>:
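Vincent's submit_to_loop relies on a `schedule` helper that lives in his repo. A self-contained sketch of the whole pattern might look like the following, where `schedule` is an illustrative stand-in (not his exact implementation) that wraps the coroutine in a task and copies its outcome into the concurrent future:

```python
import asyncio
import concurrent.futures
import threading
from functools import partial

def schedule(coro, destination):
    """Illustrative stand-in for the loopexecutor `schedule` helper.
    Runs in the loop thread: wrap *coro* in a task and copy its
    outcome into the concurrent future *destination* when done."""
    task = asyncio.ensure_future(coro)

    def copy_outcome(task):
        if task.cancelled():
            destination.cancel()
        elif task.exception() is not None:
            destination.set_exception(task.exception())
        else:
            destination.set_result(task.result())

    task.add_done_callback(copy_outcome)

def submit_to_loop(loop, coro):
    """Submit *coro* to *loop* from any thread; return a
    concurrent.futures.Future holding its eventual outcome."""
    future = concurrent.futures.Future()
    callback = partial(schedule, coro, destination=future)
    loop.call_soon_threadsafe(callback)
    return future

# Usage from a thread other than the loop's:
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()
future = submit_to_loop(loop, asyncio.sleep(0.05, result=3))
assert future.result(timeout=5) == 3
loop.call_soon_threadsafe(loop.stop)
```

Note that call_soon_threadsafe is the only loop method that may safely be called from another thread, which is why the task creation itself is deferred into the loop thread.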
OK, I think I understand your primary use case -- the C++ library calls callbacks in their own threads, but you want the callback code to run in your event loop, where presumably it is structured as a coroutine and may use `yield from` or `await` to wait for other coroutines, tasks or futures. Then when that coroutine is done it returns a value, which your machinery passes back as the result of a concurrent.futures.Future on which the callback thread is waiting.

I don't think the use case involving multiple event loops in different threads is as clear. I am still waiting for someone who is actually trying to use this. It might be useful on a system where there is a system event loop that must be used for UI events (assuming this event loop can somehow be wrapped in a custom asyncio loop) and where an app might want to have a standard asyncio event loop for network I/O. Come to think of it, the ProactorEventLoop on Windows has both advantages and disadvantages, and some app might need to use both that and SelectorEventLoop. But this is a real pain (because you can't share any mutable state between event loops).

On Sun, Sep 27, 2015 at 6:36 AM, Vincent Michel <vxgmichel@gmail.com> wrote:
Yes, that's exactly it. No problem for the multiple event loops -- it was a fun thing to play with. Then there's probably no reason to have a loop executor either.

I think the important part is really the interface between asyncio futures and concurrent futures, since it is not trivial to write and maintain. In particular, getting exceptions and cancellation to work safely can be a bit tricky.

2015-09-27 18:42 GMT+02:00 Guido van Rossum <guido@python.org>:
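The cancellation direction Vincent calls tricky is the reverse one: a thread waiting on the concurrent future cancels it, and that cancellation has to reach the asyncio task. Task.cancel() is not thread-safe, so it must be rescheduled into the loop's own thread. A sketch with illustrative names (not asyncio API); the forward direction copying the outcome would also need a guard so it doesn't try to set a result on an already-cancelled concurrent future:

```python
import asyncio
import concurrent.futures
import threading
import time

def allow_cancellation(future, task, loop):
    """If a waiting thread cancels the concurrent *future*, cancel the
    asyncio *task* from within the loop's own thread.
    Illustrative helper, not asyncio API."""
    def on_done(future):
        if future.cancelled():
            # task.cancel() is not thread-safe; hop into the loop thread.
            loop.call_soon_threadsafe(task.cancel)
    future.add_done_callback(on_done)

# Demonstration: cancel a long sleep from the submitting thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

future = concurrent.futures.Future()
tasks = {}

def setup():
    # Task creation must happen in the loop thread.
    tasks['t'] = loop.create_task(asyncio.sleep(10))
    allow_cancellation(future, tasks['t'], loop)

loop.call_soon_threadsafe(setup)
while 't' not in tasks:
    time.sleep(0.01)

future.cancel()  # succeeds: the concurrent future is still pending
deadline = time.time() + 5
while not tasks['t'].cancelled() and time.time() < deadline:
    time.sleep(0.01)
assert tasks['t'].cancelled()
loop.call_soon_threadsafe(loop.stop)
```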
Do you want to propose a minimal patch to asyncio? A PR for https://github.com/python/asyncio would be the best thing to do. I'd leave the LoopExecutor out of it for now. The code could probably live at the bottom of futures.py.

On Sun, Sep 27, 2015 at 1:29 PM, Vincent Michel <vxgmichel@gmail.com> wrote:
Yes that's exactly it. No problem for the multiple event loops, it was a fun thing to play with. Then there's probably no reason to have a loop executor either.
I think the important part is really the interface between asyncio futures and concurrent futures, since it is not trivial to write and maintain. In particular, getting exceptions and cancellation to work safely can be a bit tricky.
OK, I think I understand your primary use case -- the C++ library calls callbacks in their own threads but you want the callback code to run in your event loop, where presumably it is structured as a coroutine and may use `yield from` or `await` to wait for other coroutines, tasks or futures. Then when that coroutine is done it returns a value which your machinery
back as the result of a concurrent.futures.Future on which the callback thread is waiting.
I don't think the use case involving multiple event loops in different threads is as clear. I am still waiting for someone who is actually
to use this. It might be useful on a system where there is a system event loop that must be used for UI events (assuming this event loop can somehow be wrapped in a custom asyncio loop) and where an app might want to have a standard asyncio event loop for network I/O. Come to think of it, the ProactorEventLoop on Windows has both advantages and disadvantages, and some app might need to use both that and SelectorEventLoop. But this is a real pain (because you can't share any mutable state between event loops).
On Sun, Sep 27, 2015 at 6:36 AM, Vincent Michel <vxgmichel@gmail.com> wrote:
Hi Guido,
Thanks for your interest,
I work for a synchrotron and we use the distributed control system TANGO. The main implementation is in C++, but we use a python binding called PyTango. The current server implementation (on the C++ side) does not feature an event loop but instead create a different thread for each client.
TANGO: http://www.tango-controls.org/ PyTango:
http://www.esrf.eu/computing/cs/tango/tango_doc/kernel_doc/pytango/latest/in...
I wanted to add asyncio support to the library, so that we can benefit from single-threaded asynchronous programming. The problem is that client callbacks run in different threads and there is not much we can do about it until a pure python implementation is developed (and it's a lot of work). Instead, it is possible to use an asyncio event loop, run the server through run_in_executor (juste like you mentioned in your mail), and redirect all the client callbacks to the event loop. That's the part where job submission from a different thread comes in handy.
A very similar solution has been developed using gevent, but I like explicit coroutines better :p
Another use case is the communication between two event loops. From what I've seen, the current context (get/set event loop) is only related to the current thread. It makes it easy to run different event loops in different threads. Even though I'm not sure what the use case is, I suppose it's been done intentionally. Then the executor interface is useful to run things like:
executor = LoopExecutor(other_loop) result = await my_loop.run_in_executor(executor, coro_func, *args)
There is working example in the test directory:
https://github.com/vxgmichel/asyncio-loopexecutor/blob/master/test/test_mult...
***
The coroutine(fn) cast only makes sense if a subclass of Executor is used, in order to be consistent with the Executor.submit signature. Otherwise, passing an already-called coroutine is perfectly fine. I think it is a good idea to define a simple submit function like you recommended:
    def submit_to_loop(loop, coro):
        future = concurrent.futures.Future()
        callback = partial(schedule, coro, destination=future)
        loop.call_soon_threadsafe(callback)
        return future
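The `schedule` helper is defined in the repository; the following is only a minimal sketch of what it might look like, with the name and the `destination` keyword taken from the call above and the body assumed (it does not, for instance, propagate a cancellation of the concurrent future back to the task):

```python
import asyncio
import concurrent.futures
from functools import partial


def schedule(coro, destination):
    # Runs in the event loop thread: wrap the coroutine in a task and
    # copy its outcome into the destination concurrent.futures.Future.
    task = asyncio.ensure_future(coro)

    def forward(task):
        if task.cancelled():
            destination.cancel()
        elif task.exception() is not None:
            destination.set_exception(task.exception())
        else:
            destination.set_result(task.result())

    task.add_done_callback(forward)


def submit_to_loop(loop, coro):
    # Same function as in the message above, repeated here so that the
    # sketch is self-contained.
    future = concurrent.futures.Future()
    callback = partial(schedule, coro, destination=future)
    loop.call_soon_threadsafe(callback)
    return future
```

With a loop running in another thread, `submit_to_loop(loop, asyncio.sleep(0.1, result=3)).result()` then blocks the calling thread until the coroutine finishes in the loop thread and returns 3.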
And then use the executor interface if we realize it is actually useful. It's really not a lot of code anyway:
    class LoopExecutor(concurrent.futures.Executor):

        def __init__(self, loop=None):
            self.loop = loop or asyncio.get_event_loop()

        def submit(self, fn, *args, **kwargs):
            coro = asyncio.coroutine(fn)(*args, **kwargs)
            return submit_to_loop(self.loop, coro)
I'll update the repository.
Cheers,
Vincent
2015-09-27 4:52 GMT+02:00 Guido van Rossum <guido@python.org>:
Hi Vincent,
I've read your write-up with interest. You're right that it's a bit awkward to make calls from the threaded world into the asyncio world. Interestingly, there's much better support for passing work off from the asyncio event loop to a thread (run_in_executor()). Perhaps that's because the use case there was obvious from the start: some things that may block for I/O just don't have an async interface yet, so in order to use them from an asyncio task they must be off-loaded to a separate thread or else the entire event loop is blocked. (This is used for calling getaddrinfo(), for example.)
I'm curious where you have encountered the opposite use case?
I think if I had to do this myself I would go for a more minimalist interface: something like your submit() method but without the call to asyncio.coroutine(fn). Having the caller pass in the already-called coroutine object might simplify the signature even further. I'm not sure I see the advantage of trying to make this an executor -- but perhaps I'm missing something?
--Guido
On Sat, Sep 26, 2015 at 7:29 AM, Vincent Michel <vxgmichel@gmail.com> wrote:
Hi,
I noticed there is currently no standard solution to submit a job from a thread to an asyncio event loop.
Here's what the asyncio documentation says about concurrency and multithreading:
> To schedule a callback from a different thread, the
> BaseEventLoop.call_soon_threadsafe() method should be used.
> Example to schedule a coroutine from a different thread:
> loop.call_soon_threadsafe(asyncio.async, coro_func())
The issue with this method is the loss of the coroutine result.
One way to deal with this issue is to connect the asyncio.Future returned by async (or ensure_future) to a concurrent.futures.Future. It is then possible to use a subclass of concurrent.futures.Executor to submit a callback to an asyncio event loop. Such an executor can also be used to set up communication between two event loops using run_in_executor.
I posted an implementation called LoopExecutor on GitHub: https://github.com/vxgmichel/asyncio-loopexecutor The repo contains the loopexecutor module along with tests for several use cases. The README describes the whole thing (context, examples, issues, implementation).
It is interesting to note that this executor is a bit different from ThreadPoolExecutor and ProcessPoolExecutor, since it can also submit a coroutine function. Example:
    with LoopExecutor(loop) as executor:
        future = executor.submit(operator.add, 1, 2)
        assert future.result() == 3
        future = executor.submit(asyncio.sleep, 0.1, result=3)
        assert future.result() == 3
This works in both cases because submit always casts the given function to a coroutine. That means it would also work with a function that returns a Future.
Here are a few topics related to the current implementation that might be interesting to discuss:

- possible drawbacks of casting the callback to a coroutine
- possible drawbacks of concurrent.futures.Future using asyncio.Future._copy_state
- does LoopExecutor need to implement the shutdown method?
- removing the limitation in run_in_executor (can't submit a coroutine function)
- adding a generic Future connection function in asyncio
- reimplementing wrap_future with the generic connection
- adding LoopExecutor to asyncio (or concurrent.futures)
At the moment, the interaction between asyncio and concurrent.futures only goes one way. It would be nice to have a standard solution (LoopExecutor or something else) to make it bidirectional.
Thanks,
Vincent
_______________________________________________ Python-ideas mailing list Python-ideas@python.org https://mail.python.org/mailman/listinfo/python-ideas Code of Conduct: http://python.org/psf/codeofconduct/
-- --Guido van Rossum (python.org/~guido)
Great, I'll do that!

2015-09-27 22:39 GMT+02:00 Guido van Rossum <guido@python.org>:
Do you want to propose a minimal patch to asyncio? A PR for https://github.com/python/asyncio would be the best thing to do. I'd leave the LoopExecutor out of it for now. The code could probably live at the bottom of futures.py.
On Sun, Sep 27, 2015 at 1:29 PM, Vincent Michel <vxgmichel@gmail.com> wrote:
Yes that's exactly it. No problem for the multiple event loops, it was a fun thing to play with. Then there's probably no reason to have a loop executor either.
I think the important part is really the interface between asyncio futures and concurrent futures, since it is not trivial to write and maintain. In particular, getting exceptions and cancellation to work safely can be a bit tricky.
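The bridge discussed here eventually became available in the standard library as asyncio.run_coroutine_threadsafe(), which takes care of exactly this result/exception/cancellation plumbing between the two Future types. A short usage sketch:

```python
import asyncio
import threading

# Run an event loop in a dedicated background thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

# Submit a coroutine from this (non-loop) thread; the returned
# concurrent.futures.Future carries the result, exception or cancellation.
future = asyncio.run_coroutine_threadsafe(asyncio.sleep(0.1, result=3), loop)
assert future.result(timeout=5) == 3

loop.call_soon_threadsafe(loop.stop)
```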
2015-09-27 18:42 GMT+02:00 Guido van Rossum <guido@python.org>:
OK, I think I understand your primary use case -- the C++ library calls callbacks in their own threads but you want the callback code to run in your event loop, where presumably it is structured as a coroutine and may use `yield from` or `await` to wait for other coroutines, tasks or futures. Then when that coroutine is done it returns a value which your machinery passes back as the result of a concurrent.futures.Future on which the callback thread is waiting.
I don't think the use case involving multiple event loops in different threads is as clear. I am still waiting for someone who is actually trying to use this. It might be useful on a system where there is a system event loop that must be used for UI events (assuming this event loop can somehow be wrapped in a custom asyncio loop) and where an app might want to have a standard asyncio event loop for network I/O. Come to think of it, the ProactorEventLoop on Windows has both advantages and disadvantages, and some app might need to use both that and SelectorEventLoop. But this is a real pain (because you can't share any mutable state between event loops).
On Sun, Sep 27, 2015 at 11:42 AM, Guido van Rossum <guido@python.org> wrote:
[...]
I don't think the use case involving multiple event loops in different threads is as clear. I am still waiting for someone who is actually trying to use this. It might be useful on a system where there is a system event loop that must be used for UI events (assuming this event loop can somehow be wrapped in a custom asyncio loop) and where an app might want to have a standard asyncio event loop for network I/O. Come to think of it, the ProactorEventLoop on Windows has both advantages and disadvantages, and some app might need to use both that and SelectorEventLoop. But this is a real pain (because you can't share any mutable state between event loops).
I'm not currently solving the problem this way, but I wanted to do something like this recently for a custom Mesos framework. The framework uses a pure-python library called "pesos" that in turn uses a pure-python libprocess library called "compactor". compactor runs user code in a private event loop (Mesos registration, etc). I also wanted to run my own private loop in another thread that interacts with Redis. This loop is expected to process some incoming updates as commands that must influence the compactor loop (start reconciliation or some other Mesos-related thing), and the most straightforward thing to me sounded exactly like this thread: submitting jobs from one loop to another. I haven't really delved into making the Redis part an async loop (it's just threaded right now) as I'm less experienced with writing such code, so maybe I am overlooking and/or conflating things, but it seems reasonable.

-- C Anthony
participants (3)
- C Anthony Risinger
- Guido van Rossum
- Vincent Michel