Re: [stdlib-sig] futures - a new package for asynchronous execution
On Nov 8, 2009, at 7:01 PM, Jeffrey Yasskin wrote:
Did you mean to drop the list? Feel free to cc them back in when you reply.
No, that was a brain malfunction. Redirecting the discussion to the list.
On Sat, Nov 7, 2009 at 3:31 PM, Brian Quinlan <brian@sweetapp.com> wrote:
On 8 Nov 2009, at 06:37, Jeffrey Yasskin wrote:
On Sat, Nov 7, 2009 at 7:32 AM, Jesse Noller <jnoller@gmail.com> wrote:
On Sat, Nov 7, 2009 at 10:21 AM, Antoine Pitrou <solipsis@pitrou.net> wrote:
Which API? My comment wasn't aimed at the API of the package - in the time I got to scan it last night nothing jumped out at me as overly offensive API-wise.
Not offensive, but probably too complicated if it's meant to be a simple helper. Anyway, let's wait for the PEP.
The PEP is right here:
http://code.google.com/p/pythonfutures/source/browse/trunk/PEP.txt
I'm interested in hearing specific complaints about the API in the context of what it's trying to *do*. The only thing which jumped out at me was the number of methods on FutureList; but then again, each one of those makes conceptual sense, even if they are verbose - they're explicit about what's being done.
Overall, I like the idea of having futures in the standard library, and I like the idea of pulling common bits of multiprocessing and threading into a concurrent.* package. Here's my stream-of-consciousness review of the PEP. I'll try to ** things that really affect the API.
The "Interface" section should start with a conceptual description of what Executor, Future, and FutureList are. Something like "An Executor is an object you can hand tasks to, which will run them for you, usually in another thread or process. A Future represents a task that may or may not have completed yet, and whose value or exception can be waited for and queried. A FutureList is ... <haven't read that far>."
** The Executor interface is pretty redundant, and it's missing the most basic call. Fundamentally, all you need is an Executor.execute(callable) method returning None,
How do you extract the results?
To implement submit in terms of execute, you write something like:
import sys

def submit(executor, callable):
    future = Future()
    def worker():
        try:
            result = callable()
        except:
            future.set_exception(sys.exc_info())
        else:
            future.set_value(result)
    executor.execute(worker)
    return future
I see. I'm not sure if that abstraction is useful but I get it now.
and all the future-oriented methods can be built on top of that. I'd support using Executor.submit(callable) for the simplest method instead, which returns a Future, but there should be some way for implementers to only implement execute() and get submit either for free or with a 1-line definition. (I'm using method names from
http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.h... in case I'm unclear about the semantics here.) run_to_futures, run_to_results, and map should be implementable on top of the Future interface, and shouldn't need to be methods on Executor. I'd recommend they be demoted to helper functions in the concurrent.futures module unless there's a reason they need to be methods, and that reason should be documented in the PEP.
** run_to_futures() shouldn't take a return_when argument. It should be possible to wait for those conditions on any list of Futures. (_not_ just a FutureList)
I packaged up Futures into FutureLists to fix an annoyance that I have with the Java implementation - you have all of these Future objects but no convenient way of operating over them.
Yep, I totally agree with that annoyance. Note, though, that Java has the CompletionService to support nearly same use cases as run_to_futures.
CompletionService's use case is handling results as they finish (just like the callbacks do in Deferreds). The FutureList use case is querying e.g. which callables raised, which returned, which are still running?
I made the FutureList the unit of waiting because: 1. I couldn't think of a use case where this wasn't sufficient
Take your webcrawl example. In a couple years, when Futures are widely accepted, it's quite possible that urllib.request.urlopen() will return a Future instead of a file. Then I'd like to request a bunch of URLs and process each as they come back. With the run_to_futures (or CompletionService) API, urllib would instead have to take a set of requests to open at once, which makes its API much harder to design. With a wait-for-any function, urllib could continue to return a single Future and let its users combine several results.
If we go down this road then we should just switch to Twisted :-) Seriously, the idea is that no one would ever change their API to accommodate futures - they are a way of making a library with no notion of concurrency concurrent. But I am starting to be convinced that individual futures are a good idea because it makes the run/submit method easier to use.
Alternately, say you have an RPC system returning Futures. You've sent off RPCs A, B, and C. Now you need two separate subsystems D and E to do something with the results, except that D can continue when either A or B finishes, but E can continue when either B or C finishes. Can D and E express just what they need to express, or do they have to deal with futures they don't really care about?
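For concreteness, with a wait-on-any-list helper of the kind proposed later in this thread (wait() and FIRST_COMPLETED are used here as eventually spelled; this is a sketch, not settled API), D could express exactly its dependency and nothing more:

# Subsystem D only names the futures it cares about:
done, not_done = wait([rpc_a, rpc_b], return_when=FIRST_COMPLETED)
for f in done:
    process(f.result())  # process() is a hypothetical handler

and E could independently do the same with [rpc_b, rpc_c], without either subsystem knowing about the other's futures.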
2. It makes the internal locking semantics a bit easier and faster (if you can wait on any future then the wait has to acquire a lock for every future [in a consistent order to prevent deadlocks when other threads are doing the same thing with an intersecting set of futures], add a result listener for each and then create some sort of object to aggregate their state)
Yep. I suspect the extra overhead isn't significant compared to the cost of scheduling threads.
But I am thinking that maybe FutureLists aren't the right abstraction.
The code sample looks like Executor is a context manager. What does its __exit__ do? shutdown()? shutdown&awaitTermination? I prefer waiting in Executor.__exit__, since that makes it easier for users to avoid having tasks run after they've cleaned up data those tasks depend on. But that could be my C++ bias, where we have to be sure to free memory in the right places. Frank, does Java run into any problems with people cleaning things up that an Executor's tasks depend on without waiting for the Executor first?
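For concreteness, the waiting variant could look like this sketch (assuming the shutdown(wait=True) spelling that the PEP later adopts; this is not the reference implementation):

class Executor:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Block until pending tasks finish, so that data the tasks
        # depend on is still alive while they run.
        self.shutdown(wait=True)
        return False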
shutdown should explain why it's important. Specifically, since the Executor controls threads, and those threads hold a reference to the Executor, nothing will get garbage collected without the explicit call.
Actually, the threads hold a weakref to the Executor so they can exit (when the work queue is empty) if the Executor is collected. Here is the code from futures/thread.py:
while True:
    try:
        work_item = work_queue.get(block=True, timeout=0.1)
    except queue.Empty:
        executor = executor_reference()
        # Exit if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns the worker has been collected OR
        #   - The executor that owns the worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown:
            return
Oh, yeah, that sounds like it might work. So why does shutdown exist?
It does work - there are tests and everything :-)

.shutdown exists for the same reason that .close exists on files:
- Python does not guarantee any particular GC strategy
- tracebacks and other objects may retain a reference in an unexpected way
- sometimes you want to free your resources before the function exits
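For illustration, the file-like usage pattern this enables might look like this (a sketch that borrows the submit() spelling proposed later in this thread):

from futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_threads=5)
try:
    future = executor.submit(pow, 2, 10)
    print(future.result())
finally:
    # Free the worker threads deterministically, like file.close().
    executor.shutdown()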
** What happens when FutureList.wait(FIRST_COMPLETED) is called twice? Does it return immediately the second time? Does it wait for the second task to finish? I'm inclined to think that FutureList should go away and be replaced by functions that just take lists of Futures.
It waits until a new future is completed.
That seems confusing, since it's no longer the "FIRST" completed.
Maybe "NEXT_COMPLETED" would be better. Cheers, Brian
In general, I think the has_done_futures(), exception_futures(), etc. are fine even though their results may be out of date by the time you inspect them. That's because any individual Future goes monotonically from not-started->running->(exception|value), so users can take advantage of even an out-of-date done_futures() result. However, it's dangerous to have several query functions, since users may think that running_futures() `union` done_futures() `union` cancelled_futures() covers the whole FutureList, but instead a Future can move between two of the sets between two of those calls. Instead, perhaps an atomic partition() function would be better, which returns a collection of sub-lists that cover the whole original set.
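A sketch of what such an atomic partition() could look like (every name here is hypothetical; it assumes a single lock guards the state transitions of all Futures in the list):

def partition(future_list):
    with future_list._lock:
        # Taken under one lock, the three sets form a consistent
        # snapshot that covers the whole list.
        running = [f for f in future_list if f.running()]
        cancelled = [f for f in future_list if f.cancelled()]
        done = [f for f in future_list
                if f.done() and not f.cancelled()]
    return running, done, cancelled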
I would rename result() to get() (or maybe Antoine's suggestion of __call__) to match Java. I'm not sure exception() needs to exist.
--- More general points ---
** Java's Futures made a mistake in not supporting work stealing, and this has caused deadlocks at Google. Specifically, in a bounded-size thread or process pool, when a task in the pool can wait for work running in the same pool, you can fill up the pool with tasks that are waiting for tasks that haven't started running yet. To avoid this, Future.get() should be able to steal the task it's waiting on out of the pool's queue and run it immediately.
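As a minimal, self-contained sketch of the stealing idea, assume a deque-based pending queue that workers popleft() from; StealingFuture and its internals are illustrative, not the proposed API:

import threading

class StealingFuture:
    def __init__(self, fn, pending):
        self._fn = fn
        self._pending = pending      # the pool's deque of queued futures
        self._done = threading.Event()
        self._result = None

    def run(self):                   # called by a worker, or by result()
        self._result = self._fn()
        self._done.set()

    def result(self):
        try:
            # Steal our own task out of the pool's queue and run it on
            # the calling thread instead of blocking on a full pool.
            self._pending.remove(self)
        except ValueError:
            pass                     # a worker already claimed it; wait
        else:
            self.run()
        self._done.wait()
        return self._result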
** I think both the Future-oriented blocking model and the callback-based model Deferreds support are important for different situations. Futures tend to be easier to program with, while Deferreds use fewer threads and can have more predictable latencies. It should be possible to create a Future from a Deferred or a Deferred from a Future without taking up a thread.
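Bridging in one direction needs nothing more than a callback. A hedged sketch, assuming a Twisted-style Deferred with addCallbacks() and a Future with set_result()/set_exception() setters (names illustrative):

def future_from_deferred(deferred, future):
    def on_success(value):
        future.set_result(value)
        return value                     # keep the Deferred chain intact
    def on_failure(failure):
        future.set_exception(failure.value)
    deferred.addCallbacks(on_success, on_failure)
    return future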
Hey everyone, Thanks for all the great feedback! I'm going to compile everyone's feedback and then send out a list of proposed changes. In the meantime, more discussion is welcome :-) Cheers, Brian
I am very happy with those changes. I think deadlock should be addressed before the first release as it changes the detailed semantics of some of the operations, but you've promised to do that, so cool. :) I think it's fine to leave the embedding of Deferred-like things into futures and the embedding of futures into Deferred-like things until a later release. I expect it to be requested, but I don't think it'll be hard to add later. On Thu, Nov 12, 2009 at 6:38 PM, Brian Quinlan <brian@sweetapp.com> wrote:
Hey all,
I compiled a summary of people's feedback (about technical issues - I agree that the docs could be better but agreeing on the API seems like the first step) and have some API change proposals.
Here is a summary of the feedback:
- Use Twisted Deferreds rather than Futures
- The API is too complex
- Make Future a callable and drop the .result()/.exception() methods
- Remove .wait() from Executor
- Make it easy to process results in the order of completion rather than in the order that the futures were generated
- Executor context managers should wait until their workers complete before exiting
- Extract Executor.map, etc. into separate functions/modules
- FutureList has too many methods or is not necessary
- Executor should have an easy way to produce a single future
- Should be able to wait on an arbitrary list of futures
- Should have a way of avoiding deadlock (will follow up on this separately)
Here is what I suggest as far as API changes (the docs suck, I'll polish them when we reach consensus):
FutureList is eliminated completely.
Future remains unchanged - I disagree that Deferreds would be better, that .exception() is not useful, and that .result() should be renamed .get() or .__call__(). But I am easily persuadable :-)
The Executor ABC is simplified to only contain a single method:
def Executor.submit(self, fn, *args, **kwargs):
Submits a call for execution and returns a Future representing the pending results of fn(*args, **kwargs)
map becomes a utility function:
def map(executor, func, *iterables, timeout=None)
Equivalent to map(func, *iterables) but executed asynchronously and possibly out-of-order. The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to map(). If timeout is not specified or None then there is no limit to the wait time. If a call raises an exception then that exception will be raised when its value is retrieved from the iterator.
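A usage sketch of the proposal (the module-level spelling with the executor as the first argument is the one proposed here):

# pow(2, 10), pow(3, 10), pow(4, 10), possibly computed out of order:
for value in futures.map(executor, pow, [2, 3, 4], [10, 10, 10], timeout=60):
    print(value)  # may raise TimeoutError or an exception from the call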
wait becomes a utility function that can wait on any iterable of Futures:
def wait(futures, timeout=None, return_when=ALL_COMPLETED)
Wait until the given condition is met for the given futures. This method should always be called using keyword arguments, which are:
timeout can be used to control the maximum number of seconds to wait before returning. If timeout is not specified or None then there is no limit to the wait time.
return_when indicates when the method should return. It must be one of the following constants:
NEXT_COMPLETED
NEXT_EXCEPTION
ALL_COMPLETED
A new utility function is added that iterates over the given Futures and returns them as they are completed:
def itercompleted(futures, timeout=None):
Returns an iterator that returns a completed Future from the given list when __next__() is called. If no Futures are completed when __next__() is called then __next__() waits until one does complete. Raises a TimeoutError if __next__() is called and no completed future is available after timeout seconds from the original call.
The URL loading example becomes:
import urllib.request

import futures

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

with futures.ThreadPoolExecutor(50) as executor:
    fs = [executor.submit(load_url, url, timeout=30) for url in URLS]
    url_of = dict(zip(fs, URLS))  # map each future back to its URL

    for future in futures.itercompleted(fs):
        url = url_of[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (url, future.exception()))
        else:
            print('%r page is %d bytes' % (url, len(future.result())))
What do you think? Are we moving in the right direction?
Cheers, Brian
-- Namasté, Jeffrey Yasskin http://jeffrey.yasskin.info/
Hey,
Future remains unchanged - I disagree that Deferreds would be better, that .exception() is not useful, and that .result() should be renamed .get() or .__call__().
On what grounds do you disagree with the latter? Another question: is the caught exception an attribute of the future? If so, is there any mechanism to clean it up (and its traceback) once the future has been "consumed"?
map becomes a utility function:
def map(executor, func, *iterables, timeout=None)
Why? map() can be defined on the ABC, so that subclasses don't have to provide their own implementation. A utility function which looks like a method and shadows the name of a built-in looks like a bad choice to me.
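Concretely, Antoine's suggestion amounts to something like this sketch, where submit() is the lone abstract method and the per-result timeout is a simplification of the proposed timeout semantics:

import abc

class Executor(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError

    def map(self, func, *iterables, timeout=None):
        # Submit everything eagerly, then yield results in order.
        fs = [self.submit(func, *args) for args in zip(*iterables)]
        def results():
            for future in fs:
                yield future.result(timeout)
        return results()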
wait becomes a utility function that can wait on any iterable of Futures:
def wait(futures, timeout=None, return_when=ALL_COMPLETED)
Does it work if the futures are executed by different executors? If not, it should be an Executor method.
return_when indicates when the method should return. It must be one of the following constants:
NEXT_COMPLETED
NEXT_EXCEPTION
ALL_COMPLETED
Can you outline the difference between NEXT_COMPLETED and NEXT_EXCEPTION? What happens if I ask for NEXT_COMPLETED but the next future to complete raises an exception? etc.
def itercompleted(futures, timeout=None):
Returns an iterator that returns a completed Future from the given list when __next__() is called. If no Futures are completed when __next__() is called then __next__() waits until one does complete.
What about futures which complete with an exception?
with futures.ThreadPoolExecutor(50) as executor:
    fs = [executor.submit(load_url, url, timeout=30) for url in URLS]
The use of "with" here still is counter-intuitive, because it does not clean up resources immediately as it would seem to do. "with" is always synchronous in other situations.
What do you think? Are we moving in the right direction?
Perhaps, yes, but there are still lots of dark areas. Besides, it's obvious that the package has to mature, and should be tested by other people.
Hey Antoine, Sorry for not getting back to you sooner - I actually thought that I did reply but I see now that I didn't. On 14 Nov 2009, at 01:22, Antoine Pitrou wrote:
Hey,
Future remains unchanged - I disagree that Deferreds would be better, that .exception() is not useful, and that .result() should be renamed .get() or .__call__().
On what grounds do you disagree with the latter?
It feels hacky. Getting the result doesn't feel so special that it deserves to be a call rather than a simple getter.
Another question: is the caught exception an attribute of the future?
Yes
If so, is there any mechanism to clean it up (and its traceback) once the future has been "consumed"?
No there isn't. That's a good point though. I wonder if futures will tend to be long-lived after their results are available?
map becomes a utility function:
def map(executor, func, *iterables, timeout=None)
Why? map() can be defined on the ABC, so that subclasses don't have to provide their own implementation.
A utility function which looks like a method and shadows the name of a built-in looks like a bad choice to me.
Good point.
wait becomes a utility function that can wait on any iterable of Futures:
def wait(futures, timeout=None, return_when=ALL_COMPLETED)
Does it work if the futures are executed by different executors? If not, it should be an Executor method.
Yes, it does.
return_when indicates when the method should return. It must be one of the following constants:
NEXT_COMPLETED
NEXT_EXCEPTION
ALL_COMPLETED
Can you outline the difference between NEXT_COMPLETED and NEXT_EXCEPTION? What happens if I ask for NEXT_COMPLETED but the next future to complete raises an exception? etc.
NEXT_COMPLETED includes futures that raise. Completed in this sense means "done running".
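Illustratively (a sketch of the semantics described above; the all-done fallback for NEXT_EXCEPTION is assumed from the later FIRST_EXCEPTION description):

wait(fs, return_when=NEXT_COMPLETED)  # wakes when any future finishes,
                                      # whether it returned or raised
wait(fs, return_when=NEXT_EXCEPTION)  # keeps waiting through successful
                                      # completions until one raises
                                      # (or until all are done)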
def itercompleted(futures, timeout=None):
Returns an iterator that returns a completed Future from the given list when __next__() is called. If no Futures are completed when __next__() is called then __next__() waits until one does complete.
What about futures which complete with an exception?
They are included.
with futures.ThreadPoolExecutor(50) as executor:
    fs = [executor.submit(load_url, url, timeout=30) for url in URLS]
The use of "with" here still is counter-intuitive, because it does not clean up resources immediately as it would seem to do. "with" is always synchronous in other situations.
Maybe waiting until all pending futures are done executing would be better.
What do you think? Are we moving in the right direction?
Perhaps, yes, but there are still lots of dark areas.
Besides, it's obvious that the package has to mature, and should be tested by other people.
It would be great if other people tested the API. I'm not sure what you mean by "mature" though. Cheers, Brian
It feels hacky. Getting the result doesn't feel so special that it deserves to be a call rather than a simple getter.
Well, it is special since the whole point of a future is to get that result. Like the whole point of a weakref is to get the underlying object. Of course this is pretty much bikeshedding...
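The two spellings under discussion, side by side (illustrative only):

value = future.result()   # explicit getter, as proposed
value = future()          # __call__ spelling, analogous to weakref.ref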
It would be great if other people tested the API. I'm not sure what you mean by "mature" though.
What I mean is that it would be nice if it got reviewed, tested and criticized by actual users. I have not looked at the implementation though.
No there isn't. That's a good point though. I wonder if futures will tend to be long-lived after their results are available?
It's hard to tell without anyone actually using them, but for advanced uses I suspect that futures may become more or less long-lived objects (like Deferreds are :-)). In a Web spider example, you could have a bunch of futures representing pending or completed HTTP fetches, and a worker thread processing the results on the fly when each of those futures gets ready. If the processing is non-trivial (or if it involves say a busy database) the worker thread could get quite a bit behind the completion of HTTP requests. Twisted has a whole machinery for that in its specialized "Failure" class, so as to keep the traceback information in string representation and at the same time relinquish all references to the frames involved in the traceback. I'm not sure we need the same degree of sophistication but we should keep in mind that it's a potential problem. (actually, perhaps this would deserve built-in support in the interpreter)
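A minimal sketch of the Failure-style idea (illustrative, not proposed API): format the traceback to text, then drop the frame references so they can be collected:

import traceback

def capture_exception(exc):
    text = ''.join(traceback.format_exception(type(exc), exc,
                                              exc.__traceback__))
    exc.__traceback__ = None  # relinquish the frames
    return exc, text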
The use of "with" here still is counter-intuitive, because it does not clean up resources immediately as it would seem to do. "with" is always synchronous in other situations.
Maybe waiting until all pending futures are done executing would be better.
I think it would be better indeed. At least it would be more in line with the other uses of context managers. Regards Antoine.
Hi all,

I've updated the implementation based on the feedback that I've received and the updated documentation of the API is here: http://sweetapp.com/futures-pep/

If you are still interested, please take a look and let me know what you think.

Cheers,
Brian
On 15 Jan 2010, at 21:50, Anh Hai Trinh wrote:
Hello all,
I'd like to point out an alternative module with respect to asynchronous computation: `stream` (which I wrote) supports ThreadPool, ProcessPool and Executor with a simpler API and implementation.
Neat! I'm not sure that I'd agree with the simpler API part though :-)
My module takes a list-processing oriented view in which a ThreadPool/ProcessPool is simply a way of working with each stream element concurrently and outputting results, possibly out of order.
A trivial example is:
from stream import map

range(10) >> ThreadPool(map(lambda x: x*x)) >> sum
# returns 285
I think that you are probably missing an import. The equivalent using futures would be:

from futures import ThreadPoolExecutor

sum(ThreadPoolExecutor(max_threads=10).map(lambda x: x*x, range(10)))
The URLs retrieving example is:
import urllib2
from stream import ThreadPool
URLs = [
    'http://www.cnn.com/',
    'http://www.bbc.co.uk/',
    'http://www.economist.com/',
    'http://nonexistant.website.at.baddomain/',
    'http://slashdot.org/',
    'http://reddit.com/',
    'http://news.ycombinator.com/',
]
def retrieve(urls, timeout=10):
    for url in urls:
        yield url, urllib2.urlopen(url, timeout=timeout).read()
if __name__ == '__main__':
    retrieved = URLs >> ThreadPool(retrieve, poolsize=len(URLs))
    for url, content in retrieved:
        print '%r is %d bytes' % (url, len(content))
    for url, exception in retrieved.failure:
        print '%r failed: %s' % (url, exception)
Note that the main argument to ThreadPool is an iterator-processing function: one that takes an iterator and returns an iterator. A ThreadPool/ProcessPool simply distributes the input to workers running such a function and gathers their output as a single stream.
"retrieve" seems to take multiple url arguments. Does ThreadPool using some sort of balancing strategy if poolsize where set to < len(URLs)?
One important difference between `stream` and `futures` is the order of returned results. The pool object itself is an iterable and the returned iterator's `next()` call unblocks as soon as there is an output value. The order of output is the order of job completion, whereas for `futures.run_to_results()`, the order of the returned iterator is based on the submitted FutureList --- this means if the first item takes a long time to complete, subsequent processing of the output cannot benefit from other results already available.
Right, which is why futures has an as_completed() function. One difference between the two implementations is that `stream` remembers the arguments that it is processing while futures discards them when it doesn't need them. This was done for memory consumption reasons but the `stream` approach seems to lead to simpler code.
The other difference is that there is absolutely no abstraction but two bare iterables for client code to deal with: one iterable over the results, and one iterable over the failures; both are thread-safe.
If delicate job control is necessary, an Executor can be used. It is implemented on top of the pool, and offers submit(*items) which returns job ids to be used for cancel() and status(). Jobs can be submitted and canceled concurrently.
What type is each "item" supposed to be? Can I wait on several items? What if they are created by different executors? Cheers, Brian
The documentation is available at <http://www.trinhhaianh.com/stream.py>.
The code repository is located at <http://github.com/aht/stream.py>. The implementation of ThreadPool, ProcessPool and Executor is little more than 300 lines of code.
Peace,
-- // aht http://blog.onideas.ws
I'm not sure that I'd agree with the simpler API part though :-)
I was referring to your old API. Still, we are both obviously very biased here :-p
Does ThreadPool use some sort of balancing strategy if poolsize were set to < len(URLs)?
Yes, of course! Otherwise it wouldn't really qualify as a pool.
"retrieve" seems to take multiple url arguments.
Correct. `retrieve` is simply a generator that retrieves URLs sequentially; the ThreadPool distributes the input stream so that each worker gets an iterator over its work load.
If delicate job control is necessary, an Executor can be used. It is implemented on top of the pool, and offers submit(*items) which returns job ids to be used for cancel() and status(). Jobs can be submitted and canceled concurrently.
What type is each "item" supposed to be?
Whatever your iterator-processing function is supposed to process. The URLs example can be written using an Executor as:

e = Executor(ThreadPool, retrieve)
e.submit(*URLs)
e.close()
print list(e.result)
Can I wait on several items?
Do you mean wait for several particular input values to be completed? As of this moment, yes but rather inefficiently. I have not considered it a useful feature, especially when taking a wholesale, list-processing view: that a worker pool processes its input stream _out_of_order_. If you just want to wait for several particular items, it means you need their outputs _in_order_, so why do you want to use a worker pool in the first place? However, I'd be happy to implement something like Executor.submit(*items, wait=True). Cheers, aht
On 16 Jan 2010, at 00:56, Anh Hai Trinh wrote:
I'm not sure that I'd agree with the simpler API part though :-)
I was referring to your old API. Still, we are both obviously very biased here :-p
For sure. I'm definitely used to looking at Future-style code so I find the model intuitive.
Does ThreadPool use some sort of balancing strategy if poolsize were set to < len(URLs)?
Yes, of course! Otherwise it wouldn't really qualify as a pool.
"retrieve" seems to take multiple url arguments.
Correct. `retrieve` is simply a generator that retrieves URLs sequentially; the ThreadPool distributes the input stream so that each worker gets an iterator over its work load.
That's a neat idea - it saves you the overhead of a function call.
If delicate job control is necessary, an Executor can be used. It is implemented on top of the pool, and offers submit(*items) which returns job ids to be used for cancel() and status(). Jobs can be submitted and canceled concurrently.
What type is each "item" supposed to be?
Whatever your iterator-processing function is supposed to process. The URLs example can be written using an Executor as:
e = Executor(ThreadPool, retrieve)
e.submit(*URLs)
e.close()
print list(e.result)
There are two common scenarios where I have seen Future-like things used:

1. Do the same operation on different data e.g. copy some local files to several remote servers
2. Do several different operations on different data e.g. parallelizing code like this:

db = setup_database(host, port)
data = parse_big_xml_file(request.body)
save_data_in_db(data, db)

I'm trying to get a handle on how `stream` accommodates the second case. With futures, I would write something like this:

db_future = executor.submit(setup_database, host, port)
data_future = executor.submit(parse_big_xml_file, request.body)
# Maybe do something else here.
wait(
    [db_future, data_future],
    timeout=10,
    # If either function raises then we can't complete the operation so
    # there is no reason to make the user wait.
    return_when=FIRST_EXCEPTION)

db = db_future.result(timeout=0)
data = data_future.result(timeout=0)
save_data_in_db(data, db)

Cheers,
Brian
Can I wait on several items?
Do you mean wait for several particular input values to be completed? As of this moment, yes but rather inefficiently. I have not considered it a useful feature, especially when taking a wholesale, list-processing view: that a worker pool processes its input stream _out_of_order_. If you just want to wait for several particular items, it means you need their outputs _in_order_, so why do you want to use a worker pool in the first place?
However, I'd be happy to implement something like Executor.submit(*items, wait=True).
Cheers, aht
2. Do several different operations on different data e.g. parallelizing code like this:
db = setup_database(host, port)
data = parse_big_xml_file(request.body)
save_data_in_db(data, db)
I'm trying to get a handle on how `stream` accommodates the second case. With futures, I would write something like this:
db_future = executor.submit(setup_database, host, port)
data_future = executor.submit(parse_big_xml_file, request.body)
# Maybe do something else here.
wait(
    [db_future, data_future],
    timeout=10,
    # If either function raises then we can't complete the operation so
    # there is no reason to make the user wait.
    return_when=FIRST_EXCEPTION)
db = db_future.result(timeout=0)
data = data_future.result(timeout=0)
save_data_in_db(data, db)
For this kind of scenario, I feel `futures` and friends are not needed. My solution is to explicitly use different threads for different operations, then use thread.join() to wait for a particular operation. Threading concurrency means memory is shared and thread.join() can be used to synchronize events.

Generally, I would be doubtful about any library that supports parallelization of code that "does several different operations on different data". One could have put it as "write concurrent programs", to which the answer must be a complete concurrency model: threading, multiprocessing, Erlang, Goroutines and CSP channels, etc.

Cheers,
--
// aht
http://blog.onideas.ws
On 17 Jan 2010, at 01:44, Anh Hai Trinh wrote:
2. Do several different operations on different data e.g. parallelizing code like this:
db = setup_database(host, port)
data = parse_big_xml_file(request.body)
save_data_in_db(data, db)
I'm trying to get a handle on how `stream` accommodates the second case. With futures, I would write something like this:
db_future = executor.submit(setup_database, host, port)
data_future = executor.submit(parse_big_xml_file, request.body)
# Maybe do something else here.
wait(
    [db_future, data_future],
    timeout=10,
    # If either function raises then we can't complete the operation so
    # there is no reason to make the user wait.
    return_when=FIRST_EXCEPTION)
db = db_future.result(timeout=0)
data = data_future.result(timeout=0)
save_data_in_db(data, db)
For this kind of scenario, I feel `futures` and friends are not needed. My solution is to explicitly use different threads for different operations, then use thread.join() to wait for a particular operation. Threading concurrency means memory is shared and thread.join() can be used to synchronize events.
It is definitely true that you can roll your own implementation using threads but the purpose of the futures library is to make that unnecessary.
Generally, I would be doubtful about any library that supports parallelization of code that "does several different operations on different data". One could have put it as "write concurrent programs", to which the answer must be a complete concurrency model: threading, multiprocessing, Erlang, Goroutines and CSP channels, etc.
I don't understand your doubts. To me the example that I gave is simple and useful. Cheers, Brian
On Sat, Jan 16, 2010 at 5:22 PM, Brian Quinlan <brian@sweetapp.com> wrote:
It is definitely true that you can roll your own implementation using threads but the purpose of the futures library is to make that unnecessary.
I'd like to stress this; futures/pools/etc. are common enough patterns (and I get requests to add more to multiprocessing) that it makes sense as an add-on to the stdlib. This is sugar, not magic. jesse
On Sun, Jan 17, 2010 at 5:22 AM, Brian Quinlan <brian@sweetapp.com> wrote:
db_future = executor.submit(setup_database, host, port)
data_future = executor.submit(parse_big_xml_file, request.body)
# Maybe do something else here.
wait(
    [db_future, data_future],
    timeout=10,
    # If either function raises then we can't complete the operation so
    # there is no reason to make the user wait.
    return_when=FIRST_EXCEPTION)
db = db_future.result(timeout=0)
data = data_future.result(timeout=0)
save_data_in_db(data, db)
It is definitely true that you can roll your own implementation using threads but the purpose of the futures library is to make that unnecessary.
I don't understand your doubts. To me the example that I gave is simple and useful.
What I mean is that your example is simple enough to do with threads. Here:

[...]

def setup_db():
    nonlocal db
    db = setup_database(host, port)

def parse_xml():
    nonlocal data
    data = parse_big_xml(file)

db_thread = threading.Thread(target=setup_db)
db_thread.start()

parse_thread = threading.Thread(target=parse_xml)
parse_thread.start()

[...]  # Do something else here.

db_thread.join()
parse_thread.join()
save_data_in_db(data, db)

I used "nonlocal" here but you usually do this within a method and refer to self.db, self.data.
I don't understand your doubts. To me the example that I gave is simple and useful.
My doubt is about the usefulness of futures' constructs for the kind of code that "Do several different operations on different data". I think ThreadPool/ProcessPool is really useful when you do

1. Same operation on different data
2. Different operations on same datum

But

3. Different operations on different data

is perhaps misusing it. It is too general a use case because dependency comes into play. What if the different operations depend on each other? A useful thread pool construct for this would be at a more fundamental level, e.g. Grand Central Dispatch.

Perhaps you would give another example?

Cheers,
--
// aht
http://blog.onideas.ws
Do you guys mind taking this discussion off-list? As of right now neither of your projects is old enough or well known enough to be considered for inclusion in the stdlib, so it is really not relevant to the stdlib SIG to continue here.

-Brett

On Sat, Jan 16, 2010 at 20:53, Anh Hai Trinh <anh.hai.trinh@gmail.com> wrote:
On Sun, Jan 17, 2010 at 5:22 AM, Brian Quinlan <brian@sweetapp.com> wrote:
db_future = executor.submit(setup_database, host, port)
data_future = executor.submit(parse_big_xml_file, request.body)
# Maybe do something else here.
wait(
    [db_future, data_future],
    timeout=10,
    # If either function raises then we can't complete the operation so
    # there is no reason to make the user wait.
    return_when=FIRST_EXCEPTION)
db = db_future.result(timeout=0)
data = data_future.result(timeout=0)
save_data_in_db(data, db)
It is definitely true that you can roll your own implementation using threads but the purpose of the futures library is to make that unnecessary.
I don't understand your doubts. To me the example that I gave is simple and useful.
What I mean is that your example is simple enough to do with threads. Here:
[...]
def setup_db():
    nonlocal db
    db = setup_database(host, port)

def parse_xml():
    nonlocal data
    data = parse_big_xml(file)
db_thread = threading.Thread(target=setup_db)
db_thread.start()

parse_thread = threading.Thread(target=parse_xml)
parse_thread.start()
[...] # Do something else here.
db_thread.join()
parse_thread.join()
save_data_in_db(data, db)
I used "nonlocal" here but you usually do this within a method and refer to self.db, self.data.
I don't understand your doubts. To me the example that I gave is simple and useful.
My doubt is about the usefulness of futures' constructs for the kind of code that "Do several different operations on different data". I think ThreadPool/ProcessPool is really useful when you do
1. Same operation on different data
2. Different operations on same datum
But
3. Different operations on different data
is perhaps misusing it. It is too general a use case because dependency comes into play. What if the different operations depend on each other? A useful thread pool construct for this would be at a more fundamental level, e.g. Grand Central Dispatch.
Perhaps you would give another example?
Cheers,
--
// aht
http://blog.onideas.ws
On Fri, Jan 15, 2010 at 02:50, Anh Hai Trinh <anh.hai.trinh@gmail.com> wrote:
Hello all,
I'd like to point out an alternative module with respect to asynchronous computation: `stream` (which I wrote) supports ThreadPool, ProcessPool and Executor with a simpler API and implementation.
My module takes a list-processing oriented view in which a ThreadPool/ProcessPool is simply a way of working with each stream element concurrently and outputting results, possibly out of order.
A trivial example is:
from stream import map

range(10) >> ThreadPool(map(lambda x: x*x)) >> sum
# returns 285
I have not looked at the code at all, but the overloading of binary shift is not going to be viewed as a good thing. I realize there is an analogy to C++ streams, but typically Python's stdlib frowns upon overloading operators like this beyond what a newbie would think an operator is meant to do. -Brett
The URLs retrieving example is:
import urllib2
from stream import ThreadPool
URLs = [
    'http://www.cnn.com/',
    'http://www.bbc.co.uk/',
    'http://www.economist.com/',
    'http://nonexistant.website.at.baddomain/',
    'http://slashdot.org/',
    'http://reddit.com/',
    'http://news.ycombinator.com/',
]
def retrieve(urls, timeout=10):
    for url in urls:
        yield url, urllib2.urlopen(url, timeout=timeout).read()
if __name__ == '__main__':
    retrieved = URLs >> ThreadPool(retrieve, poolsize=len(URLs))
    for url, content in retrieved:
        print '%r is %d bytes' % (url, len(content))
    for url, exception in retrieved.failure:
        print '%r failed: %s' % (url, exception)
Note that the main argument to ThreadPool is an iterator-processing function: one that takes an iterator and returns an iterator. A ThreadPool/ProcessPool simply distributes the input to workers running such a function and gathers their output as a single stream.
One important difference between `stream` and `futures` is the order of returned results. The pool object itself is an iterable and the returned iterator's `next()` call unblocks as soon as there is an output value. The order of output is the order of job completion, whereas for `futures.run_to_results()`, the order of the returned iterator is based on the submitted FutureList --- this means if the first item takes a long time to complete, subsequent processing of the output cannot benefit from other results already available.
The other difference is that there is absolutely no abstraction but two bare iterables for client code to deal with: one iterable over the results, and one iterable over the failures; both are thread-safe.
If delicate job control is necessary, an Executor can be used. It is implemented on top of the pool, and offers submit(*items) which returns job ids to be used for cancel() and status(). Jobs can be submitted and canceled concurrently.
The documentation is available at <http://www.trinhhaianh.com/stream.py>.
The code repository is located at <http://github.com/aht/stream.py>. The implementation of ThreadPool, ProcessPool and Executor is little more than 300 lines of code.
Peace,
--
// aht
http://blog.onideas.ws
I've updated the PEP and included it inline. The interesting changes start in the "Specification" section.

Cheers,
Brian

PEP: XXX
Title: futures - execute computations asynchronously
Version: $Revision$
Last-Modified: $Date$
Author: Brian Quinlan <brian@sweetapp.com>
Status: Draft
Type: Standards Track
Content-Type: text/x-rst
Created: 16-Oct-2009
Python-Version: 3.2
Post-History:

========
Abstract
========

This PEP proposes a design for a package that facilitates the evaluation of callables using threads and processes.

==========
Motivation
==========

Python currently has powerful primitives to construct multi-threaded and multi-process applications but parallelizing simple operations requires a lot of work i.e. explicitly launching processes/threads, constructing a work/results queue, and waiting for completion or some other termination condition (e.g. failure, timeout). It is also difficult to design an application with a global process/thread limit when each component invents its own parallel execution strategy.

=============
Specification
=============

Check Prime Example
-------------------

::

    import futures
    import math

    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]

    def is_prime(n):
        if n % 2 == 0:
            return False

        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True

    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

Web Crawl Example
-----------------

::

    import futures
    import urllib.request

    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    def load_url(url, timeout):
        return urllib.request.urlopen(url, timeout=timeout).read()

    with futures.ThreadPoolExecutor(max_threads=5) as executor:
        future_to_url = dict((executor.submit(load_url, url, 60), url)
                             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (url,
                                                         future.exception()))
            else:
                print('%r page is %d bytes' % (url, len(future.result())))

Interface
---------

The proposed package provides two core classes: `Executor` and `Future`. An `Executor` receives asynchronous work requests (in terms of a callable and its arguments) and returns a `Future` to represent the execution of that work request.

Executor
''''''''

`Executor` is an abstract class that provides methods to execute calls asynchronously.

`submit(fn, *args, **kwargs)`

Schedules the callable to be executed as fn(*\*args*, *\*\*kwargs*) and returns a `Future` instance representing the execution of the function.

`map(func, *iterables, timeout=None)`

Equivalent to map(*func*, *\*iterables*) but executed asynchronously and possibly out-of-order. The returned iterator raises a `TimeoutError` if `__next__()` is called and the result isn't available after *timeout* seconds from the original call to `run_to_results()`. If *timeout* is not specified or ``None`` then there is no limit to the wait time. If a call raises an exception then that exception will be raised when its value is retrieved from the iterator.

`Executor.shutdown(wait=False)`

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

Calls to `Executor.run_to_futures`, `Executor.run_to_results` and `Executor.map` made after shutdown will raise `RuntimeError`.

If wait is `True` then the executor will not return until all the pending futures are done executing and the resources associated with the executor have been freed.

ProcessPoolExecutor
'''''''''''''''''''

The `ProcessPoolExecutor` class is an `Executor` subclass that uses a pool of processes to execute calls asynchronously.

`__init__(max_processes)`

Executes calls asynchronously using a pool of at most *max_processes* processes. If *max_processes* is ``None`` or not given then as many worker processes will be created as the machine has processors.

ThreadPoolExecutor
''''''''''''''''''

The `ThreadPoolExecutor` class is an `Executor` subclass that uses a pool of threads to execute calls asynchronously.

`__init__(max_threads)`

Executes calls asynchronously using a pool of at most *max_threads* threads.

Future Objects
''''''''''''''

The `Future` class encapsulates the asynchronous execution of a function or method call. `Future` instances are returned by `Executor.submit`.

`cancel()`

Attempt to cancel the call. If the call is currently being executed then it cannot be cancelled and the method will return `False`, otherwise the call will be cancelled and the method will return `True`.

`Future.cancelled()`

Return `True` if the call was successfully cancelled.

`Future.done()`

Return `True` if the call was successfully cancelled or finished running.

`result(timeout=None)`

Return the value returned by the call. If the call hasn't yet completed then this method will wait up to *timeout* seconds. If the call hasn't completed in *timeout* seconds then a `TimeoutError` will be raised. If *timeout* is not specified or ``None`` then there is no limit to the wait time.

If the future is cancelled before completing then `CancelledError` will be raised.

If the call raised then this method will raise the same exception.

`exception(timeout=None)`

Return the exception raised by the call. If the call hasn't yet completed then this method will wait up to *timeout* seconds. If the call hasn't completed in *timeout* seconds then a `TimeoutError` will be raised. If *timeout* is not specified or ``None`` then there is no limit to the wait time.

If the future is cancelled before completing then `CancelledError` will be raised.

If the call completed without raising then ``None`` is returned.

`index`

int indicating the index of the future in its `FutureList`.

Module Functions
''''''''''''''''

`wait(fs, timeout=None, return_when=ALL_COMPLETED)`

Wait for the `Future` instances in the given sequence to complete. Returns a 2-tuple of sets. The first set contains the futures that completed (finished or were cancelled) before the wait completed. The second set contains uncompleted futures.

This method should always be called using keyword arguments, which are:

*fs* is the sequence of Future instances that should be waited on.

*timeout* can be used to control the maximum number of seconds to wait before returning. If timeout is not specified or None then there is no limit to the wait time.

*return_when* indicates when the method should return. It must be one of the following constants:

=============================  ==================================================
 Constant                      Description
=============================  ==================================================
`FIRST_COMPLETED`              The method will return when any call finishes.
`FIRST_EXCEPTION`              The method will return when any call raises an
                               exception or when all calls finish.
`ALL_COMPLETED`                The method will return when all calls finish.
`RETURN_IMMEDIATELY`           The method will return immediately.
=============================  ==================================================

`as_completed(fs, timeout=None)`

Returns an iterator over the Future instances given by *fs* that yields futures as they complete (finished or were cancelled). Any futures that completed before `as_completed()` was called will be yielded first. The returned iterator raises a `TimeoutError` if `__next__()` is called and the result isn't available after *timeout* seconds from the original call to `as_completed()`. If *timeout* is not specified or `None` then there is no limit to the wait time.

=========
Rationale
=========

The proposed design of this module was heavily influenced by the Java java.util.concurrent package [1]_. The conceptual basis of the module, as in Java, is the Future class, which represents the progress and result of an asynchronous computation. The Future class makes little commitment to the evaluation mode being used e.g. it can be used to represent lazy or eager evaluation, for evaluation using threads, processes or remote procedure call.

Futures are created by concrete implementations of the Executor class (called ExecutorService in Java). The reference implementation provides classes that use either a process or a thread pool to eagerly evaluate computations.

Futures have already been seen in Python as part of a popular Python cookbook recipe [2]_ and have been discussed on the Python-3000 mailing list [3]_.

The proposed design is explicit i.e. it requires that clients be aware that they are consuming Futures. It would be possible to design a module that would return proxy objects (in the style of `weakref`) that could be used transparently. It is possible to build a proxy implementation on top of the proposed explicit mechanism.

The proposed design does not introduce any changes to Python language syntax or semantics. Special syntax could be introduced [4]_ to mark function and method calls as asynchronous. A proxy result would be returned while the operation is eagerly evaluated asynchronously, and execution would only block if the proxy object were used before the operation completed.

========================
Reference Implementation
========================

The reference implementation [5]_ contains a complete implementation of the proposed design. It has been tested on Linux and Mac OS X.

==========
References
==========

.. [1] `java.util.concurrent` package documentation
   `http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary...`

.. [2] Python Cookbook recipe 84317, "Easy threading with Futures"
   `http://code.activestate.com/recipes/84317/`

.. [3] `Python-3000` thread, "mechanism for handling asynchronous concurrency"
   `http://mail.python.org/pipermail/python-3000/2006-April/000960.html`

.. [4] `Python 3000` thread, "Futures in Python 3000 (was Re: mechanism for handling asynchronous concurrency)"
   `http://mail.python.org/pipermail/python-3000/2006-April/000970.html`

.. [5] Reference `futures` implementation
   `http://code.google.com/p/pythonfutures`

=========
Copyright
=========

This document has been placed in the public domain.

..
   Local Variables:
   mode: indented-text
   indent-tabs-mode: nil
   sentence-end-double-space: t
   fill-column: 70
   coding: utf-8
   End:
Several comments:

* I see you using the Executors as context managers, but no mention in the specification about what that does. You need to specify it. (Your current implementation doesn't wait in __exit__, which I think is the opposite of what you agreed with Antoine, but you can fix that after we get general agreement on the interface.)

* I'd like users to be able to write Executors besides the simple ThreadPoolExecutor and ProcessPoolExecutor you already have. To enable that, could you document what the subclassing interface for Executor looks like? That is, what code do user-written Executors need to include? I don't think it should include direct access to future._state like ThreadPoolExecutor uses, if at all possible.

* Could you specify in what circumstances a pure computational Future-based program may deadlock? (Ideally, that would be "never".) Your current implementation includes two such deadlocks, for which I've attached a test.

* This is a nit, but I think that the parameter names for ThreadPoolExecutor and ProcessPoolExecutor should be the same so people can parametrize their code on those constructors. Right now they're "max_threads" and "max_processes", respectively. I might suggest "max_workers".

* You should document the exception that happens when you try to pass a ProcessPoolExecutor as an argument to a task executing inside another ProcessPoolExecutor, or make it not throw an exception and document that.

* If it's intentional, you should probably document that if one element of a map() times out, there's no way to come back and wait longer to retrieve it or later elements.

* Do you want to make calling Executor.shutdown(wait=True) from within the same Executor 1) detect the problem and raise an exception, 2) deadlock, 3) unspecified behavior, or 4) wait for all other threads and then let the current one continue?

* You still mention run_to_futures, run_to_results, and FutureList, even though they're no longer proposed.

* wait() should probably return a namedtuple or an object so we don't have people writing the unreadable "wait(fs)[0]".

* Instead of "call finishes" in the description of the return_when parameter, you might describe the behavior in terms of futures becoming done, since that's the accessor function you're using.

* Is RETURN_IMMEDIATELY just a way to categorize futures into done and not? Is that useful over [f for f in fs if f.done()]?

* After shutdown, is RuntimeError the right exception, or should there be a more specific exception?

Otherwise, looks good. Thanks!

On Fri, Jan 29, 2010 at 2:22 AM, Brian Quinlan <brian@sweetapp.com> wrote:
I've updated the PEP and included it inline. The interesting changes start in the "Specification" section.
Cheers, Brian
PEP: XXX
Title: futures - execute computations asynchronously
Version: $Revision$
Last-Modified: $Date$
Author: Brian Quinlan <brian@sweetapp.com>
Status: Draft
Type: Standards Track
Content-Type: text/x-rst
Created: 16-Oct-2009
Python-Version: 3.2
Post-History:
========
Abstract
========
This PEP proposes a design for a package that facilitates the evaluation of callables using threads and processes.
==========
Motivation
==========
Python currently has powerful primitives to construct multi-threaded and multi-process applications, but parallelizing simple operations requires a lot of work, i.e. explicitly launching processes/threads, constructing a work/results queue, and waiting for completion or some other termination condition (e.g. failure, timeout). It is also difficult to design an application with a global process/thread limit when each component invents its own parallel execution strategy.
=============
Specification
=============
Check Prime Example
-------------------
::
    import futures
    import math

    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]

    def is_prime(n):
        if n % 2 == 0:
            return False

        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True

    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
Web Crawl Example
-----------------
::
    import futures
    import urllib.request

    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    def load_url(url, timeout):
        return urllib.request.urlopen(url, timeout=timeout).read()

    with futures.ThreadPoolExecutor(max_threads=5) as executor:
        future_to_url = dict((executor.submit(load_url, url, 60), url)
                             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' %
                      (url, future.exception()))
            else:
                print('%r page is %d bytes' % (url, len(future.result())))
Interface
---------
The proposed package provides two core classes: `Executor` and `Future`. An `Executor` receives asynchronous work requests (in terms of a callable and its arguments) and returns a `Future` to represent the execution of that work request.
Executor
''''''''
`Executor` is an abstract class that provides methods to execute calls asynchronously.
`submit(fn, *args, **kwargs)`
Schedules the callable to be executed as ``fn(*args, **kwargs)`` and returns a `Future` instance representing the execution of the function.
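A minimal sketch of the intended usage, using the builtin `pow` as the callable::

    import futures

    executor = futures.ThreadPoolExecutor(max_threads=1)
    future = executor.submit(pow, 2, 8)  # schedules pow(2, 8)
    print(future.result())               # blocks until done; prints 256
    executor.shutdown()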
`map(func, *iterables, timeout=None)`
Equivalent to ``map(func, *iterables)`` but executed asynchronously and possibly out-of-order. The returned iterator raises a `TimeoutError` if `__next__()` is called and the result isn't available after *timeout* seconds from the original call to `run_to_results()`. If *timeout* is not specified or ``None`` then there is no limit to the wait time. If a call raises an exception then that exception will be raised when its value is retrieved from the iterator.
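A short sketch, assuming `TimeoutError` is exposed in the `futures` namespace::

    import futures

    with futures.ThreadPoolExecutor(max_threads=2) as executor:
        try:
            for result in executor.map(pow, [2, 2, 2], [1, 2, 3],
                                       timeout=5):
                print(result)  # 2, 4 and 8, though the calls may have
                               # executed out of order
        except futures.TimeoutError:
            print('a result was not available within 5 seconds')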
`shutdown(wait=False)`
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to `Executor.run_to_futures`, `Executor.run_to_results` and `Executor.map` made after shutdown will raise `RuntimeError`.
If wait is `True` then the executor will not return until all the pending futures are done executing and the resources associated with the executor have been freed.
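A sketch of an explicit shutdown, as an alternative to the `with` blocks used in the examples above::

    import futures

    executor = futures.ThreadPoolExecutor(max_threads=4)
    fs = [executor.submit(pow, 2, n) for n in range(10)]

    # Wait until the pending futures are done and the worker threads
    # have been freed; work submitted after this raises RuntimeError.
    executor.shutdown(wait=True)
    print(sum(f.result() for f in fs))  # 1023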
ProcessPoolExecutor
'''''''''''''''''''
The `ProcessPoolExecutor` class is an `Executor` subclass that uses a pool of processes to execute calls asynchronously.
`__init__(max_processes)`
Executes calls asynchronously using a pool of at most *max_processes* processes. If *max_processes* is ``None`` or not given then as many worker processes will be created as the machine has processors.
ThreadPoolExecutor
''''''''''''''''''
The `ThreadPoolExecutor` class is an `Executor` subclass that uses a pool of threads to execute calls asynchronously.
`__init__(max_threads)`
Executes calls asynchronously using a pool of at most *max_threads* threads.
Future Objects
''''''''''''''
The `Future` class encapsulates the asynchronous execution of a function or method call. `Future` instances are returned by `Executor.submit`.
`cancel()`
Attempt to cancel the call. If the call is currently being executed then it cannot be cancelled and the method will return `False`, otherwise the call will be cancelled and the method will return `True`.
`cancelled()`
Return `True` if the call was successfully cancelled.
`done()`
Return `True` if the call was successfully cancelled or finished running.
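A sketch of cancellation, using a single-threaded pool so that a queued call can still be cancelled (the short sleep is only there to let the worker pick up the first call)::

    import time
    import futures

    executor = futures.ThreadPoolExecutor(max_threads=1)
    running = executor.submit(time.sleep, 5)  # starts immediately
    queued = executor.submit(time.sleep, 5)   # waits behind `running`
    time.sleep(0.1)

    print(running.cancel())    # False: the call is already executing
    print(queued.cancel())     # True: still pending, so it is cancelled
    print(queued.cancelled())  # True
    print(queued.done())       # True: cancelled counts as done
    executor.shutdown()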
`result(timeout=None)`
Return the value returned by the call. If the call hasn't yet completed then this method will wait up to *timeout* seconds. If the call hasn't completed in *timeout* seconds then a `TimeoutError` will be raised. If *timeout* is not specified or ``None`` then there is no limit to the wait time.
If the future is cancelled before completing then `CancelledError` will be raised.
If the call raised then this method will raise the same exception.
`exception(timeout=None)`
Return the exception raised by the call. If the call hasn't yet completed then this method will wait up to *timeout* seconds. If the call hasn't completed in *timeout* seconds then a `TimeoutError` will be raised. If *timeout* is not specified or ``None`` then there is no limit to the wait time.
If the future is cancelled before completing then `CancelledError` will be raised.
If the call completed without raising then ``None`` is returned.
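A sketch of retrieving a call's outcome, using an illustrative `fail` helper::

    import futures

    def fail():
        raise ValueError('bad input')

    executor = futures.ThreadPoolExecutor(max_threads=1)
    f = executor.submit(fail)

    print(f.exception(timeout=10))  # the ValueError instance, or None
                                    # had the call succeeded
    try:
        f.result(timeout=10)        # re-raises the call's exception
    except ValueError as e:
        print('call raised: %s' % e)
    executor.shutdown()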
`index`
int indicating the index of the future in its `FutureList`.
Module Functions
''''''''''''''''
`wait(fs, timeout=None, return_when=ALL_COMPLETED)`
Wait for the `Future` instances in the given sequence to complete. Returns a 2-tuple of sets. The first set contains the futures that completed (finished or were cancelled) before the wait completed. The second set contains uncompleted futures.
This method should always be called using keyword arguments, which are:
*fs* is the sequence of Future instances that should be waited on.
*timeout* can be used to control the maximum number of seconds to wait before returning. If *timeout* is not specified or ``None`` then there is no limit to the wait time.
*return_when* indicates when the method should return. It must be one of the following constants:
=====================  ===========================================================
Constant               Description
=====================  ===========================================================
`FIRST_COMPLETED`      The method will return when any call finishes.
`FIRST_EXCEPTION`      The method will return when any call raises an exception or
                       when all calls finish.
`ALL_COMPLETED`        The method will return when all calls finish.
`RETURN_IMMEDIATELY`   The method will return immediately.
=====================  ===========================================================
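A sketch of `wait`, assuming the *return_when* constants are exposed in the `futures` namespace::

    import futures

    executor = futures.ProcessPoolExecutor()
    fs = [executor.submit(pow, 2, n) for n in range(8)]

    # Block until at least one future is done, or 30 seconds pass.
    done, not_done = futures.wait(fs=fs, timeout=30,
                                  return_when=futures.FIRST_COMPLETED)
    print('%d done, %d not done' % (len(done), len(not_done)))
    executor.shutdown()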
`as_completed(fs, timeout=None)`
Returns an iterator over the Future instances given by *fs* that yields futures as they complete (finished or were cancelled). Any futures that completed before `as_completed()` was called will be yielded first. The returned iterator raises a `TimeoutError` if `__next__()` is called and the result isn't available after *timeout* seconds from the original call to `as_completed()`. If *timeout* is not specified or ``None`` then there is no limit to the wait time.
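Note that the timeout covers the whole iteration rather than each individual future; a sketch, assuming `TimeoutError` is exposed in the `futures` namespace::

    import futures

    executor = futures.ThreadPoolExecutor(max_threads=4)
    fs = [executor.submit(pow, 2, n) for n in range(8)]

    try:
        for future in futures.as_completed(fs, timeout=30):
            print(future.result())  # yielded in completion order,
                                    # not submission order
    except futures.TimeoutError:
        print('some futures were still pending after 30 seconds')
    executor.shutdown()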
=========
Rationale
=========
The proposed design of this module was heavily influenced by the Java java.util.concurrent package [1]_. The conceptual basis of the module, as in Java, is the Future class, which represents the progress and result of an asynchronous computation. The Future class makes little commitment to the evaluation mode being used, e.g. it can be used to represent lazy or eager evaluation, or evaluation using threads, processes or remote procedure calls.
Futures are created by concrete implementations of the Executor class (called ExecutorService in Java). The reference implementation provides classes that use either a process or a thread pool to eagerly evaluate computations.
Futures have already been seen in Python as part of a popular Python cookbook recipe [2]_ and have been discussed on the Python-3000 mailing list [3]_.
The proposed design is explicit, i.e. it requires that clients be aware that they are consuming Futures. It would be possible to design a module that would return proxy objects (in the style of `weakref`) that could be used transparently. It is possible to build a proxy implementation on top of the proposed explicit mechanism.
The proposed design does not introduce any changes to Python language syntax or semantics. Special syntax could be introduced [4]_ to mark function and method calls as asynchronous. A proxy result would be returned while the operation is eagerly evaluated asynchronously, and execution would only block if the proxy object were used before the operation completed.
========================
Reference Implementation
========================
The reference implementation [5]_ contains a complete implementation of the proposed design. It has been tested on Linux and Mac OS X.
==========
References
==========
.. [1] `java.util.concurrent` package documentation
   `http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary.html`

.. [2] Python Cookbook recipe 84317, "Easy threading with Futures"
   `http://code.activestate.com/recipes/84317/`

.. [3] `Python-3000` thread, "mechanism for handling asynchronous concurrency"
   `http://mail.python.org/pipermail/python-3000/2006-April/000960.html`

.. [4] `Python 3000` thread, "Futures in Python 3000 (was Re: mechanism for handling asynchronous concurrency)"
   `http://mail.python.org/pipermail/python-3000/2006-April/000970.html`

.. [5] Reference `futures` implementation
   `http://code.google.com/p/pythonfutures`
=========
Copyright
=========
This document has been placed in the public domain.
..
   Local Variables:
   mode: indented-text
   indent-tabs-mode: nil
   sentence-end-double-space: t
   fill-column: 70
   coding: utf-8
   End:
A few extra points.

On 21 Feb 2010, at 14:41, Jeffrey Yasskin wrote:
* I'd like users to be able to write Executors besides the simple ThreadPoolExecutor and ProcessPoolExecutor you already have. To enable that, could you document what the subclassing interface for Executor looks like? that is, what code do user-written Executors need to include? I don't think it should include direct access to future._state like ThreadPoolExecutor uses, if at all possible.
One of the difficulties here is:

1. I don't want to commit to the internal implementation of Futures
2. It might be hard to make it clear which methods are public to users and which methods are public to executor implementors
* Could you specify in what circumstances a pure computational Future-based program may deadlock? (Ideally, that would be "never".) Your current implementation includes two such deadlocks, for which I've attached a test.
Thanks for the tests, but I wasn't planning on changing this behavior. I don't really like the idea of using the calling thread to perform the wait because:

1. not all executors will be able to implement that behavior
2. it can only be made to work if no wait time is specified

Cheers,
Brian
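To make the scenario concrete, a minimal sketch of the kind of deadlock under discussion: a single-worker pool whose only worker blocks waiting on a future that needs that same worker::

    import futures

    executor = futures.ThreadPoolExecutor(max_threads=1)

    def outer():
        inner = executor.submit(pow, 2, 10)
        return inner.result()  # blocks the pool's only worker, so
                               # `inner` can never be scheduled

    f = executor.submit(outer)
    f.result()  # deadlock: never returns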