[stdlib-sig] futures - a new package for asynchronous execution

Anh Hai Trinh anh.hai.trinh at gmail.com
Fri Jan 15 11:50:14 CET 2010


Hello all,

I'd like to point out an alternative module for asynchronous
computation: `stream`, which I wrote, supports ThreadPool, ProcessPool
and Executor with a simpler API and implementation.

My module takes a list-processing-oriented view, in which a
ThreadPool/ProcessPool is simply a way of working on each stream
element concurrently and outputting results, possibly out of order.

A trivial example is:

  from stream import map, ThreadPool
  range(10) >> ThreadPool(map(lambda x: x*x)) >> sum
  # returns 285
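The `>>` syntax relies on Python's reflected operators: a list or range defines no `>>` for a pipeline object, so Python falls back to the right operand's `__rrshift__`. The sketch below shows one way such a pipeline could be wired up. `Stage` and `mapper` are hypothetical names, not `stream`'s internals, and this version runs sequentially, so it illustrates only the plumbing, not the thread pool.

```python
class Stage:
    """Hypothetical pipeline stage wrapping an iterator-processing function."""

    def __init__(self, func, source=None):
        self.func = func        # takes an iterator, returns an iterator
        self.source = source    # upstream iterable, bound by `>>`

    def __rrshift__(self, iterable):
        # `iterable >> stage`: lists and ranges define no `>>` for Stage,
        # so Python falls back to this reflected method.
        return Stage(self.func, iterable)

    def __rshift__(self, other):
        # `stage >> other`: chain into another Stage, or hand the whole
        # stream to a plain callable such as sum or list.
        if isinstance(other, Stage):
            return Stage(other.func, self)
        return other(self)

    def __iter__(self):
        return iter(self.func(iter(self.source)))


def mapper(f):
    """Hypothetical stage that applies f to every element (no threads)."""
    return Stage(lambda items: (f(x) for x in items))


print(range(10) >> mapper(lambda x: x * x) >> sum)  # 285
```

Because `>>` is left-associative, the expression is evaluated as `(range(10) >> mapper(...)) >> sum`, and nothing is computed until `sum` starts iterating.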


The URL retrieval example is:

  import urllib2
  from stream import ThreadPool

  URLs = [
     'http://www.cnn.com/',
     'http://www.bbc.co.uk/',
     'http://www.economist.com/',
     'http://nonexistant.website.at.baddomain/',
     'http://slashdot.org/',
     'http://reddit.com/',
     'http://news.ycombinator.com/',
  ]

  def retrieve(urls, timeout=10):
     for url in urls:
        yield url, urllib2.urlopen(url, timeout=timeout).read()

  if __name__ == '__main__':
     retrieved = URLs >> ThreadPool(retrieve, poolsize=len(URLs))
     for url, content in retrieved:
        print '%r is %d bytes' % (url, len(content))
     for url, exception in retrieved.failure:
        print '%r failed: %s' % (url, exception)


Note that the main argument to ThreadPool is an iterator-processing
function: one that takes an iterator and returns an iterator. A
ThreadPool/ProcessPool simply distributes the input among workers
running such a function and gathers their output into a single stream.
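A minimal sketch of that distribute-and-gather idea, using only the standard library: several threads share one input iterator, each runs the same iterator-processing function over it, and everything they yield is merged into one output stream in completion order. The names `thread_pool` and `square_all` are hypothetical, and this is not `stream`'s implementation; exceptions are not collected here.

```python
import queue
import threading


def thread_pool(func, iterable, poolsize=4):
    """Run the iterator-processing function `func` in `poolsize` threads.

    Hypothetical sketch: every worker pulls items from one shared input
    iterator and pushes whatever `func` yields onto a single output queue,
    so results arrive in completion order, not input order.
    """
    source = iter(iterable)
    lock = threading.Lock()
    output = queue.Queue()
    done = object()  # sentinel each worker emits when it finishes

    def shared_input():
        # Thread-safe generator over the shared source iterator.
        while True:
            with lock:
                try:
                    item = next(source)
                except StopIteration:
                    return
            yield item

    def worker():
        try:
            for result in func(shared_input()):
                output.put(result)
        finally:
            output.put(done)

    for _ in range(poolsize):
        threading.Thread(target=worker, daemon=True).start()

    finished = 0
    while finished < poolsize:
        value = output.get()
        if value is done:
            finished += 1
        else:
            yield value


def square_all(items):
    # An iterator-processing function: iterator in, iterator out.
    for x in items:
        yield x * x


print(sorted(thread_pool(square_all, range(10), poolsize=3)))
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Since `thread_pool` is itself a generator, the worker threads start only when the caller begins iterating.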

One important difference between `stream` and `futures` is the order of
the returned results.  The pool object itself is an iterable, and the
returned iterator's `next()` call unblocks as soon as there is an
output value.  The order of output is the order of job completion,
whereas for `futures.run_to_results()`, the order of the returned
iterator is based on the submitted FutureList.  This means that if the
first item takes a long time to complete, subsequent processing of the
output cannot benefit from other results that are already available.
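The ordering difference can be illustrated with `concurrent.futures`, the standard-library module (PEP 3148) that grew out of the `futures` package; `run_to_results()` itself is not used here. `Executor.map` yields results in submission order, while `as_completed` yields futures as they finish, which corresponds to the completion-order behaviour described above. `slow_square` and the delays are arbitrary illustrative values.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x, delay):
    time.sleep(delay)  # simulate a job of varying duration
    return x * x


jobs = [(1, 0.6), (2, 0.1), (3, 0.3)]  # (input, delay); the first job is slowest

with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    # Submission order: the slow first job holds back the faster ones.
    in_submission_order = list(pool.map(slow_square, *zip(*jobs)))

    # Completion order: each result is available as soon as its job is done.
    futures = [pool.submit(slow_square, x, d) for x, d in jobs]
    in_completion_order = [f.result() for f in as_completed(futures)]

print(in_submission_order)   # [1, 4, 9]
print(in_completion_order)   # completion order, e.g. [4, 9, 1] with these delays
```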

The other difference is that there is absolutely no abstraction beyond
two bare iterables for client code to deal with: one iterable over the
results and one iterable over the failures; both are thread-safe.
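A sketch of the "two bare iterables" idea, assuming a per-item function rather than an iterator-processing one: successes go to one `queue.Queue` as `(item, result)` pairs and exceptions to another as `(item, exception)` pairs, mirroring the `retrieved` / `retrieved.failure` pair in the URL example. `pool_map` and `reciprocal` are hypothetical names. For brevity this version waits for every worker to finish before returning, so unlike the pool described above it does not stream results as they arrive.

```python
import queue
import threading


def pool_map(func, items, poolsize=4):
    """Hypothetical sketch: apply `func` to each item in `poolsize` threads.

    Returns two plain iterables: (item, result) pairs for successes and
    (item, exception) pairs for failures. Both are backed by queue.Queue,
    which is safe to use from several threads.
    """
    todo = queue.Queue()
    for item in items:
        todo.put(item)
    results, failures = queue.Queue(), queue.Queue()

    def worker():
        while True:
            try:
                item = todo.get_nowait()
            except queue.Empty:
                return
            try:
                results.put((item, func(item)))
            except Exception as exc:  # record the failure and keep going
                failures.put((item, exc))

    threads = [threading.Thread(target=worker) for _ in range(poolsize)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    def drain(q):
        # Safe after join(): no other thread is still producing.
        while not q.empty():
            yield q.get()

    return drain(results), drain(failures)


def reciprocal(x):
    return 1 / x


ok, failed = pool_map(reciprocal, [1, 2, 0, 4])
for item, value in ok:
    print('%r -> %r' % (item, value))
for item, exc in failed:
    print('%r failed: %s' % (item, exc))
```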

If fine-grained job control is necessary, an Executor can be used. It
is implemented on top of the pool and offers submit(*items), which
returns job ids to be used with cancel() and status().  Jobs can be
submitted and canceled concurrently.
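To show the shape of id-based job control, here is a hypothetical `JobExecutor` built on `concurrent.futures.ThreadPoolExecutor`. The class name, the integer ids, the status strings and the use of `concurrent.futures` underneath are all assumptions for illustration; this is not `stream`'s Executor. A lock lets `submit()` and `cancel()` be called from several threads, and, as with any thread pool, only jobs that have not started yet can be cancelled.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor


class JobExecutor:
    """Hypothetical sketch: submit(*items) returns one job id per item;
    cancel() and status() look jobs up by id."""

    def __init__(self, func, poolsize=4):
        self._func = func
        self._pool = ThreadPoolExecutor(max_workers=poolsize)
        self._jobs = {}                 # job id -> Future
        self._ids = itertools.count()
        self._lock = threading.Lock()   # guards _jobs and _ids across threads

    def submit(self, *items):
        with self._lock:
            ids = []
            for item in items:
                job_id = next(self._ids)
                self._jobs[job_id] = self._pool.submit(self._func, item)
                ids.append(job_id)
            return ids

    def cancel(self, job_id):
        # Future.cancel() returns False if the job is already running or done.
        with self._lock:
            return self._jobs[job_id].cancel()

    def status(self, job_id):
        with self._lock:
            fut = self._jobs[job_id]
        if fut.cancelled():
            return 'cancelled'
        if fut.running():
            return 'running'
        if not fut.done():
            return 'pending'
        return 'failed' if fut.exception() is not None else 'completed'

    def shutdown(self):
        self._pool.shutdown(wait=True)


gate = threading.Event()


def job(x):
    gate.wait()    # hold the single worker until the gate opens
    return x * 2


ex = JobExecutor(job, poolsize=1)
first, second = ex.submit(10, 20)
print(ex.cancel(second))   # True: the only worker is still busy with `first`
gate.set()
ex.shutdown()
print(ex.status(first), ex.status(second))   # completed cancelled
```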

The documentation is available at <http://www.trinhhaianh.com/stream.py>.

The code repository is located at <http://github.com/aht/stream.py>.
The implementation of ThreadPool, ProcessPool and Executor is little
more than 300 lines of code.


Peace,

-- 
// aht
http://blog.onideas.ws

