[stdlib-sig] futures - a new package for asynchronous execution

Brian Quinlan brian at sweetapp.com
Fri Jan 15 12:28:44 CET 2010


On 15 Jan 2010, at 21:50, Anh Hai Trinh wrote:

> Hello all,
>
> I'd like to point out an alternative module for asynchronous
> computation: `stream` (which I wrote). It supports ThreadPool,
> ProcessPool and Executor with a simpler API and implementation.

Neat!

I'm not sure that I'd agree with the simpler API part though :-)

> My module takes a list-processing-oriented view in which a
> ThreadPool/ProcessPool is simply a way of working with each stream
> element concurrently, possibly outputting results out of order.
>
> A trivial example is:
>
>  from stream import map
>  range(10) >> ThreadPool(map(lambda x: x*x)) >> sum
>  # returns 285

I think that you are probably missing an import. The equivalent using
futures would be:

from futures import ThreadPoolExecutor
sum(ThreadPoolExecutor(5).map(lambda x: x*x, range(10)))  # returns 285


>
> The URLs retrieving example is:
>
>  import urllib2
>  from stream import ThreadPool
>
>  URLs = [
>     'http://www.cnn.com/',
>     'http://www.bbc.co.uk/',
>     'http://www.economist.com/',
>     'http://nonexistant.website.at.baddomain/',
>     'http://slashdot.org/',
>     'http://reddit.com/',
>     'http://news.ycombinator.com/',
>  ]
>
>  def retrieve(urls, timeout=10):
>     for url in urls:
>        yield url, urllib2.urlopen(url, timeout=timeout).read()
>
>  if __name__ == '__main__':
>     retrieved = URLs >> ThreadPool(retrieve, poolsize=len(URLs))
>     for url, content in retrieved:
>        print '%r is %d bytes' % (url, len(content))
>     for url, exception in retrieved.failure:
>        print '%r failed: %s' % (url, exception)
>
>
> Note that the main argument to ThreadPool is an iterator-processing
> function: one that takes an iterator and returns an iterator. A
> ThreadPool/ProcessPool simply distributes the input to workers running
> such a function and gathers their output as a single stream.

"retrieve" seems to take multiple url arguments. Does ThreadPool using  
some sort of balancing strategy if poolsize where set to < len(URLs)?
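
As an aside, here is a minimal sketch of an "iterator-processing
function" in the sense described above. This is plain Python with no
stream-specific API assumed:

def double(items):
    # Takes an iterator and returns an iterator; as I understand it,
    # each pool worker runs one instance of this over part of the input.
    for item in items:
        yield item * 2

list(double(iter(range(5))))  # [0, 2, 4, 6, 8]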

> One important difference between `stream` and `futures` is the order
> of returned results.  The pool object itself is an iterable, and the
> returned iterator's `next()` call unblocks as soon as there is an
> output value.  The order of output is the order of job completion,
> whereas for `futures.run_to_results()`, the order of the returned
> iterator is based on the submitted FutureList --- this means that if
> the first item takes a long time to complete, subsequent processing
> of the output cannot benefit from other results already available.

Right, which is why futures has an as_completed() function. One
difference between the two implementations is that stream remembers
the arguments that it is processing, while futures discards them when
it no longer needs them. This was done to reduce memory consumption,
but the stream approach seems to lead to simpler code.
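
To make the completion-order point concrete, here is roughly what that
looks like with futures. Treat it as a sketch: I'm assuming a submit()
method that returns future objects with a result() method, and an
as_completed() that accepts an iterable of futures; the names in the
released package may differ.

from futures import ThreadPoolExecutor, as_completed

def square(x):
    return x * x

executor = ThreadPoolExecutor(5)
# submit() and result() are assumed names here, per the draft API.
fs = [executor.submit(square, i) for i in range(10)]
for future in as_completed(fs):
    # Results arrive in completion order, not submission order.
    print future.result()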


> The other difference is that there is absolutely no abstraction, just
> two bare iterables for client code to deal with: one iterable over
> the results, and one iterable over the failures; both are thread-safe.
>
> If finer-grained job control is necessary, an Executor can be used.
> It is implemented on top of the pool, and offers submit(*items),
> which returns job ids to be used for cancel() and status().  Jobs can
> be submitted and canceled concurrently.

What type is each "item" supposed to be?

Can I wait on several items? What if they are created by different  
executors?
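
For comparison, futures expresses waiting in terms of the future
objects themselves rather than job ids, so futures created by
different executors can in principle be mixed. A hedged sketch
follows; the module-level wait() and the submit()/result() names are
assumptions based on the draft API:

from futures import ThreadPoolExecutor, wait

def work(x):
    return x * 2

a = ThreadPoolExecutor(2)
b = ThreadPoolExecutor(2)
# Futures from two different executors collected in one list.
fs = [a.submit(work, 1), b.submit(work, 2)]
wait(fs)  # blocks until all of the futures are done
print [f.result() for f in fs]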

Cheers,
Brian

> The documentation is available at
> <http://www.trinhhaianh.com/stream.py>.
>
> The code repository is located at <http://github.com/aht/stream.py>.
> The implementation of ThreadPool, ProcessPool and Executor is little
> more than 300 lines of code.
>
>
> Peace,
>
> -- 
> // aht
> http://blog.onideas.ws


