[Twisted-Python] Running CPU bound function concurrently in twisted
Hi, I tried posting on SO, but haven't been able to solve the problem: https://stackoverflow.com/questions/56696562/running-long-blocking-calculati... I have a very noob question. I have a function which takes a second or two to process (CPU bound), and a for loop which calls that function repeatedly. Here is a toy example:

```python
def func(x):
    return x * x

res = [func(i) for i in range(20)]
```

Think of `func` as something complicated; the above is just the idea. How do I run this function concurrently/simultaneously using the Twisted framework? Thanks
Hi, if your purpose is merely not to block the reactor, you can run your code in a separate thread by using deferToThread. If you want to benefit from multiple cores, you may consider using https://github.com/twisted/ampoule or other similar tools. Regards, gelin yan
Thanks for pointing that out. Is there a dead simple example of how to parallelize a function using ampoule? Again, I have a function and a for loop; the function takes an argument from the loop, and the loop is extremely parallelizable. Also, I was going through this awesome Twisted tutorial from PyCon 2017 (https://www.youtube.com/watch?v=ztkBG4qLR_4) and found a couple of useful features: defer.ensureDeferred and defer.gatherResults. Can these two be used instead of deferToThread? When are those deferreds used, and when should one use deferToThread? Thanks for entertaining my dumb questions. On Mon, Jun 24, 2019 at 1:28 AM Gelin Yan <dynamicgl@gmail.com> wrote:
Hi Cheng, deferToThread *returns* a Deferred and sends the work to a thread pool. The other two functions are useful for manipulating Deferreds or other promises, but will not send work to a separate thread, so they cannot be used instead. The simple example is pretty simple; it's pretty much:

```python
@inlineCallbacks
def some_function():
    d1 = deferToThread(cpu_heavy_square, 5)
    d2 = deferToThread(cpu_heavy_square, 6)
    five_squared, six_squared = yield gatherResults([d1, d2])
    returnValue(five_squared + six_squared)
```

Note that the tutorial below can be downloaded in notebook form from https://github.com/moshez/twisted-tutorial/blob/master/Twisted-Tutorial.ipyn... However, as explained, this will *defer to threads* but will *not* use multiple cores, since Python is single-core. (This is not entirely accurate: if your heavy functions use numpy, numpy will release the GIL sometimes.) Otherwise, you might want to use ampoule, but there are no easy tutorials for it that I'm aware of. You might want to consider something like dask + deferToThread for using multiple cores. See https://github.com/dask/dask-tutorial/blob/master/01_dask.delayed.ipynb for a simple example: you just plan your computation and run the `compute` method in a thread. In this case, the only thing the thread is doing is avoiding blocking the reactor, while dask internally apportions computations. (Thank you for the kind words about the tutorial -- I'm one of the presenters.) Moshe Z. On Mon, Jun 24, 2019, at 11:34, Chengi Liu wrote:
As a clarification to the above, parallelization of Python code across cores is not unique to Twisted; all Python code has this same limitation. To use multiple cores with Python code, you need multiple Python processes (as has been pointed out). One way to achieve this is to have the multiple processes talking to each other (using some kind of RPC protocol). Another way is to simply spawn some number of subprocesses (and Twisted has good support for running subprocesses). So, for example, if you write a CLI tool that can be told to run "part of your problem" then your parent Twisted process can simply spawn some number of those with appropriate arguments to split up the problem (e.g. give each process 1 / num_cores of the problem). This will incur some startup penalty as each process starts up (especially if you're using PyPy, which you should be if you care about speed) but is way simpler. Obviously, an RPC-style communication system avoids the startup penalty (but can be more complex). -- meejah
Thanks Moshe & meejah & Gelin for the suggestions and advice. This is super helpful; I think I am able to move forward with this. Let me just summarize. My use case is: fetch the data, then assemble the data. Fetching data is network bound; assembling data is CPU bound. Can you confirm that what I am doing makes sense?

```python
def compute_heavy_function(x):
    return x * x

@defer.inlineCallbacks
def network_get(x):
    resp = yield treq.get('http://localhost:8081')
    content = yield resp.content()
    defer.returnValue(content)

@defer.inlineCallbacks
def twisted_do_your_magic():
    nets, cpus = [], []
    for i in range(10):
        t = defer.ensureDeferred(network_get(i))
        nets.append(t)
        d = threads.deferToThread(compute_heavy_function, i)
        cpus.append(d)
    cpu_res = yield defer.gatherResults(cpus)
    network_res = yield defer.gatherResults(nets)
    defer.returnValue({'cpu': cpu_res, 'network': network_res})

if __name__ == '__main__':
    twisted_do_your_magic()
    reactor.callLater(2, reactor.stop)
    reactor.run()
```

I ran it locally and it seems to be running fine, but I just want to make sure I got the concept of what to deferToThread and what to ensureDeferred.
Moshe, one last question. I was trying to follow the tutorial in the video lecture, but I wasn't able to make it run on Python 3. Say I have an async function:

```python
async def foo():
    resp = await treq.get("localhost:1234/foo")
    content = await resp.content()
    return json.loads(content.decode("utf-8"))

async def func():
    d1 = defer.ensureDeferred(foo())
    d2 = defer.ensureDeferred(foo())
    res = await defer.gatherResults([d1, d2])
    return res

if __name__ == '__main__':
    x = func()
    reactor.callLater(2, reactor.stop)
    reactor.run()
```

In this case, I get an error on the `x = func()` line in the main block: RuntimeWarning: coroutine 'func' was never awaited. How do I fix this? Again, thanks for all the help, support, and advice in getting me started with Twisted. On Mon, Jun 24, 2019 at 3:49 PM meejah <meejah@meejah.ca> wrote:
The stack overflow link: https://stackoverflow.com/questions/56696562/running-long-blocking-calculati... On Mon, Jun 24, 2019 at 7:26 PM Chengi Liu <chengi.liu.86@gmail.com> wrote:
On Mon, Jun 24, 2019, at 16:27, Chengi Liu wrote:
`cpu_res = yield defer.gatherResults(cpus)`
Remember: This will not block the reactor (Good!) but will still limit you to one CPU core (in general, some caveats, but true.) If you are CPU bound, this is woefully underutilizing modern CPU resources (again, some caveats here).
if __name__ == '__main__':
In general, the best thing to do is to use twisted.internet.task.react for main. This would also solve the other problem you point out below. https://twistedmatrix.com/documents/current/api/twisted.internet.task.html#r... has a good example.
I think I got around the second problem (thanks for sharing the react documentation). How do I use multiple CPU cores? I guess at least the fact that the Twisted app won't be blocking is good, but what would be a good way to use multiple cores? I do see that meejah suggested using multiple Python processes while setting up the reactor. My question is: say I set up multiple Twisted-based Python processes by leveraging spawnProcess, and then we set up a resource 'foo'. When a request hits the 'foo' endpoint, will the parallelization happen there? Say I have spawned 2 processes and one request hits the 'foo' endpoint; I am guessing the fact that 2 processes are running would help with responding to multiple requests simultaneously. That level of parallelization is good and I will take it. But what I want is this: when you hit the 'foo' endpoint, I fetch data from somewhere (yay treq), and then a function is called that is extremely parallelizable over a for loop. What is great is that with all your help, I am able to make this flow non-blocking, but I would definitely like to exploit parallelism wherever feasible. On Mon, Jun 24, 2019 at 7:37 PM Moshe Zadka <moshez@zadka.club> wrote:
On Mon, Jun 24, 2019, at 17:24, Chengi Liu wrote:
How do I use multiple CPU cores? I guess at least the fact that the Twisted app won't be blocking is good, but what would be a good way to use multiple cores?
It *really* depends on what the CPU-heavy function is, and why it is CPU heavy. You have a lot of options here, and they're all subtly different. But I'll start by noting that this is *no longer* a Twisted question: using multiple cores from Python is genuinely a problem for which there is no single good solution, just a number of different ones with trade-offs. As I mentioned, you can use `dask`. I think that might be the simplest solution for you -- dask has a lot of good tutorials, and if your only goal is using multiple CPUs, it has an "auto-setup". Another option is to see if you can use a library that natively releases the Global Interpreter Lock. If, for example, your function is numeric-computation heavy, then numpy does release the GIL. You can also use ampoule, though the setup becomes non-trivial -- you will need to either change your process initialization or maintain a process pool from the main process. Either option can become non-trivial fast. Finally, there is the option of writing the CPU-heavy code in Cython and then releasing the GIL explicitly around the area where it is not needed. To sum up:

* Dask
* Something that releases the GIL, like numpy
* Setting up an ampoule process pool
* Moving CPU-heavy logic to Cython, and releasing the GIL explicitly

As I said, none of these options are really Twisted-related (the only one vaguely connected is the ampoule one, but even then, the fact that you'll use Twisted to manage the pool is incidental). There are even more exotic options, of course, but I think this e-mail might already be confusing enough. Moshe Z.
On Jun 24, 2019, at 4:26 PM, Chengi Liu <chengi.liu.86@gmail.com> wrote:
t = defer.ensureDeferred(network_get(i))
The meaning of "ensureDeferred" is, "ensure [that this thing, which may be a coroutine or Deferred, is a] Deferred". You don't need to use it with things decorated with @inlineCallbacks, only with `async def`; it does nothing in your example. We should really have a more explicit version of this, like a classmethod, Deferred.fromCoroutine(), that makes more sense in terms of what context you need to use it in. (The naming here comes from asyncio.ensure_future, but that does have the somewhat more explicit loop.create_task to pair with it.) -glyph
participants (5)
- Chengi Liu
- Gelin Yan
- Glyph
- meejah
- Moshe Zadka