[stdlib-sig] futures - a new package for asynchronous execution

Jeffrey Yasskin jyasskin at gmail.com
Sun Feb 21 04:41:34 CET 2010


Several comments:

* I see you using the Executors as context managers, but there's no mention
in the specification of what that does. You need to specify it. (Your
current implementation doesn't wait in __exit__, which I think is the
opposite of what you agreed with Antoine, but you can fix that after
we get general agreement on the interface.)

* I'd like users to be able to write Executors besides the simple
ThreadPoolExecutor and ProcessPoolExecutor you already have. To enable
that, could you document what the subclassing interface for Executor
looks like? That is, what code do user-written Executors need to
include (see the sketch below)? If at all possible, I don't think it
should require direct access to future._state the way
ThreadPoolExecutor currently does.
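
For concreteness, here's the sort of user-written Executor I have in
mind. It's only a sketch: it assumes some public way to create and
complete a Future (the Future() constructor and the set_result() /
set_exception() methods below are hypothetical, not part of the
current draft):

    import futures

    class SerialExecutor(futures.Executor):
        """Runs each call synchronously in submit(); illustrative only."""

        def submit(self, fn, *args, **kwargs):
            future = futures.Future()  # hypothetical public constructor
            try:
                future.set_result(fn(*args, **kwargs))  # hypothetical API
            except BaseException as e:
                future.set_exception(e)  # hypothetical API
            return future

        def shutdown(self, wait=False):
            pass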

* Could you specify in what circumstances a pure computational
Future-based program may deadlock? (Ideally, that would be "never".)
Your current implementation includes two such deadlocks, for which
I've attached a test; one common shape of the problem is sketched below.
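
For reference, one classic shape of such a deadlock (not necessarily
the exact cases in the attached test) is a task that submits work to
its own single-worker executor and then blocks on the result:

    import futures

    executor = futures.ThreadPoolExecutor(max_threads=1)

    def outer():
        # The pool's only worker thread is busy running outer(), so
        # inner never starts and result() blocks forever.
        inner = executor.submit(pow, 2, 10)
        return inner.result()

    print(executor.submit(outer).result())  # never returns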

* This is a nit, but I think that the parameter names for
ThreadPoolExecutor and ProcessPoolExecutor should be the same so
people can parametrize their code on those constructors (see the
sketch below). Right now they're "max_threads" and "max_processes",
respectively. I might suggest "max_workers".

* You should document the exception that happens when you try to pass
a ProcessPoolExecutor as an argument to a task executing inside
another ProcessPoolExecutor, or make it not throw an exception and
document that.

* If it's intentional, you should probably document that if one
element of a map() times out, there's no way to come back and wait
longer to retrieve it or later elements (see the sketch below).
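
For example (a sketch; it assumes TimeoutError is importable from the
futures package, as the spec below implies):

    import time
    import futures

    def slow(x):
        time.sleep(x)
        return x

    with futures.ThreadPoolExecutor(max_threads=2) as executor:
        results = executor.map(slow, [0.1, 10], timeout=1)
        try:
            for r in results:
                print(r)
        except futures.TimeoutError:
            # At this point there's no documented way to go back to
            # `results` and wait longer for the element that timed out,
            # or to retrieve the elements after it.
            pass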

* Do you want to make calling Executor.shutdown(wait=True) from within
the same Executor 1) detect the problem and raise an exception, 2)
deadlock, 3) unspecified behavior, or 4) wait for all other threads
and then let the current one continue? (The scenario is sketched below.)
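
Concretely, the scenario I mean is (a sketch):

    import futures

    executor = futures.ThreadPoolExecutor(max_threads=1)

    def task():
        # shutdown(wait=True) called from a worker thread of the same
        # executor: should this raise, deadlock, or wait for the others?
        executor.shutdown(wait=True)
        return 'done'

    print(executor.submit(task).result())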

* You still mention run_to_futures, run_to_results, and FutureList,
even though they're no longer proposed.

* wait() should probably return a namedtuple or an object so we don't
have people writing the unreadable "wait(fs)[0]" (see the comparison below).
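
That is, compare the two spellings (the .done attribute below is just
a suggestion, not part of the current draft, so it's commented out):

    import futures

    with futures.ThreadPoolExecutor(max_threads=2) as executor:
        fs = [executor.submit(pow, 2, i) for i in range(10)]

    # Today: which set is element 0?
    finished = futures.wait(fs)[0]

    # With a namedtuple-style return value it could read:
    # finished = futures.wait(fs).done
    # done, not_done = futures.wait(fs)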

* Instead of "call finishes" in the description of the return_when
parameter, you might describe the behavior in terms of futures
becoming done since that's the accessor function you're using.

* Is RETURN_IMMEDIATELY just a way to categorize futures into done and
not? Is that useful over [f for f in fs if f.done()]?

* After shutdown, is RuntimeError the right exception, or should there
be a more specific exception?

Otherwise, looks good. Thanks!

On Fri, Jan 29, 2010 at 2:22 AM, Brian Quinlan <brian at sweetapp.com> wrote:
> I've updated the PEP and included it inline. The interesting changes start
> in the "Specification" section.
>
> Cheers,
> Brian
>
> PEP:               XXX
> Title:             futures - execute computations asynchronously
> Version:           $Revision$
> Last-Modified:     $Date$
> Author:            Brian Quinlan <brian at sweetapp.com>
> Status:            Draft
> Type:              Standards Track
> Content-Type:      text/x-rst
> Created:           16-Oct-2009
> Python-Version:    3.2
> Post-History:
>
> ========
> Abstract
> ========
>
> This PEP proposes a design for a package that facilitates the evaluation of
> callables using threads and processes.
>
> ==========
> Motivation
> ==========
>
> Python currently has powerful primitives to construct multi-threaded and
> multi-process applications, but parallelizing simple operations requires a
> lot of work, i.e. explicitly launching processes/threads, constructing a
> work/results queue, and waiting for completion or some other termination
> condition (e.g. failure, timeout). It is also difficult to design an
> application with a global process/thread limit when each component invents
> its own parallel execution strategy.
>
> =============
> Specification
> =============
>
> Check Prime Example
> -------------------
>
> ::
>
>    import futures
>    import math
>
>    PRIMES = [
>        112272535095293,
>        112582705942171,
>        112272535095293,
>        115280095190773,
>        115797848077099,
>        1099726899285419]
>
>    def is_prime(n):
>        if n < 2:
>            return False
>        if n == 2:
>            return True
>        if n % 2 == 0:
>            return False
>
>        sqrt_n = int(math.floor(math.sqrt(n)))
>        for i in range(3, sqrt_n + 1, 2):
>            if n % i == 0:
>                return False
>        return True
>
>    with futures.ProcessPoolExecutor() as executor:
>        for number, is_prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
>            print('%d is prime: %s' % (number, is_prime))
>
> Web Crawl Example
> -----------------
>
> ::
>
>    import futures
>    import urllib.request
>
>    URLS = ['http://www.foxnews.com/',
>            'http://www.cnn.com/',
>            'http://europe.wsj.com/',
>            'http://www.bbc.co.uk/',
>            'http://some-made-up-domain.com/']
>
>    def load_url(url, timeout):
>        return urllib.request.urlopen(url, timeout=timeout).read()
>
>    with futures.ThreadPoolExecutor(max_threads=5) as executor:
>        future_to_url = dict((executor.submit(load_url, url, 60), url)
>                             for url in URLS)
>
>    for future in futures.as_completed(future_to_url):
>        url = future_to_url[future]
>        if future.exception() is not None:
>            print('%r generated an exception: %s' %
>                  (url, future.exception()))
>        else:
>            print('%r page is %d bytes' % (url, len(future.result())))
>
> Interface
> ---------
>
> The proposed package provides two core classes: `Executor` and `Future`.
> An `Executor` receives asynchronous work requests (in terms of a callable
> and
> its arguments) and returns a `Future` to represent the execution of that
> work request.
>
> Executor
> ''''''''
>
> `Executor` is an abstract class that provides methods to execute calls
> asynchronously.
>
> `submit(fn, *args, **kwargs)`
>
> Schedules the callable to be executed as fn(*\*args*, *\*\*kwargs*) and
> returns
> a `Future` instance representing the execution of the function.
>
> `map(func, *iterables, timeout=None)`
>
> Equivalent to map(*func*, *\*iterables*) but executed asynchronously and
> possibly out-of-order. The returned iterator raises a `TimeoutError` if
> `__next__()` is called and the result isn't available after *timeout*
> seconds
> from the original call to `run_to_results()`. If *timeout* is not specified
> or
> ``None`` then there is no limit to the wait time. If a call raises an
> exception
> then that exception will be raised when its value is retrieved from the
> iterator.
>
> `Executor.shutdown(wait=False)`
>
> Signal the executor that it should free any resources that it is using when
> the currently pending futures are done executing. Calls to
> `Executor.run_to_futures`, `Executor.run_to_results` and
> `Executor.map` made after shutdown will raise `RuntimeError`.
>
> If wait is `True` then the executor will not return until all the pending
> futures are done executing and the resources associated with the executor
> have been freed.
>
> ProcessPoolExecutor
> '''''''''''''''''''
>
> The `ProcessPoolExecutor` class is an `Executor` subclass that uses a pool
> of
> processes to execute calls asynchronously.
>
> `__init__(max_processes)`
>
> Executes calls asynchronously using a pool of at most *max_processes*
> processes. If *max_processes* is ``None`` or not given then as many worker
> processes will be created as the machine has processors.
>
> ThreadPoolExecutor
> ''''''''''''''''''
>
> The `ThreadPoolExecutor` class is an `Executor` subclass that uses a pool of
> threads to execute calls asynchronously.
>
> `__init__(max_threads)`
>
> Executes calls asynchronously using a pool of at most *max_threads* threads.
>
> Future Objects
> ''''''''''''''
>
> The `Future` class encapsulates the asynchronous execution of a function
> or method call. `Future` instances are returned by `Executor.submit`.
>
> `cancel()`
>
> Attempt to cancel the call. If the call is currently being executed then
> it cannot be cancelled and the method will return `False`, otherwise the
> call
> will be cancelled and the method will return `True`.
>
> `Future.cancelled()`
>
> Return `True` if the call was successfully cancelled.
>
> `Future.done()`
>
> Return `True` if the call was successfully cancelled or finished running.
>
> `result(timeout=None)`
>
> Return the value returned by the call. If the call hasn't yet completed then
> this method will wait up to *timeout* seconds. If the call hasn't completed
> in *timeout* seconds then a `TimeoutError` will be raised. If *timeout*
> is not specified or ``None`` then there is no limit to the wait time.
>
> If the future is cancelled before completing then `CancelledError` will
> be raised.
>
> If the call raised an exception then this method will raise the same
> exception.
>
> `exception(timeout=None)`
>
> Return the exception raised by the call. If the call hasn't yet completed
> then this method will wait up to *timeout* seconds. If the call hasn't
> completed in *timeout* seconds then a `TimeoutError` will be raised.
> If *timeout* is not specified or ``None`` then there is no limit to the wait
> time.
>
> If the future is cancelled before completing then `CancelledError` will
> be raised.
>
> If the call completed without raising then ``None`` is returned.
>
> `index`
>
> int indicating the index of the future in its `FutureList`.
>
> Module Functions
> ''''''''''''''''
>
> `wait(fs, timeout=None, return_when=ALL_COMPLETED)`
>
> Wait for the `Future` instances in the given sequence to complete. Returns a
> 2-tuple of sets. The first set contains the futures that completed (finished
> or were cancelled) before the wait completed. The second set contains
> uncompleted futures.
>
> This method should always be called using keyword arguments, which are:
>
> *fs* is the sequence of Future instances that should be waited on.
>
> *timeout* can be used to control the maximum number of seconds to wait
> before
> returning. If timeout is not specified or None then there is no limit to the
> wait time.
>
> *return_when* indicates when the method should return. It must be one of the
> following constants:
>
> =============================  ==================================================
>  Constant                      Description
> =============================  ==================================================
> `FIRST_COMPLETED`              The method will return when any call finishes.
> `FIRST_EXCEPTION`              The method will return when any call raises an
>                                exception or when all calls finish.
> `ALL_COMPLETED`                The method will return when all calls finish.
> `RETURN_IMMEDIATELY`           The method will return immediately.
> =============================  ==================================================
>
> `as_completed(fs, timeout=None)`
>
> Returns an iterator over the Future instances given by *fs* that yields
> futures
> as they complete (finished or were cancelled). Any futures that completed
> before `as_completed()` was called will be yielded first. The returned
> iterator
> raises a `TimeoutError` if `__next__()` is called and the result isn’t
> available
> after *timeout* seconds from the original call to `as_completed()`. If
> *timeout* is not specified or `None` then there is no limit to the wait
> time.
>
> =========
> Rationale
> =========
>
> The proposed design of this module was heavily influenced by the Java
> java.util.concurrent package [1]_. The conceptual basis of the module, as in
> Java, is the Future class, which represents the progress and result of an
> asynchronous computation. The Future class makes little commitment to the
> evaluation mode being used, e.g. it can be used to represent lazy or eager
> evaluation, for evaluation using threads, processes or remote procedure
> calls.
>
> Futures are created by concrete implementations of the Executor class
> (called ExecutorService in Java). The reference implementation provides
> classes that use either a process or a thread pool to eagerly evaluate
> computations.
>
> Futures have already been seen in Python as part of a popular Python
> cookbook recipe [2]_ and have been discussed on the Python-3000 mailing list
> [3]_.
>
> The proposed design is explicit, i.e. it requires that clients be aware that
> they are consuming Futures. It would be possible to design a module that
> would return proxy objects (in the style of `weakref`) that could be used
> transparently. It is possible to build a proxy implementation on top of
> the proposed explicit mechanism.
>
> The proposed design does not introduce any changes to Python language syntax
> or semantics. Special syntax could be introduced [4]_ to mark function and
> method calls as asynchronous. A proxy result would be returned while the
> operation is eagerly evaluated asynchronously, and execution would only
> block if the proxy object were used before the operation completed.
>
> ========================
> Reference Implementation
> ========================
>
> The reference implementation [5]_ contains a complete implementation of the
> proposed design. It has been tested on Linux and Mac OS X.
>
> ==========
> References
> ==========
>
> .. [1]
>
>   `java.util.concurrent` package documentation
>   `http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary.html`
>
> .. [2]
>
>   Python Cookbook recipe 84317, "Easy threading with Futures"
>   `http://code.activestate.com/recipes/84317/`
>
> .. [3]
>
>   `Python-3000` thread, "mechanism for handling asynchronous concurrency"
>   `http://mail.python.org/pipermail/python-3000/2006-April/000960.html`
>
>
> .. [4]
>
>   `Python-3000` thread, "Futures in Python 3000 (was Re: mechanism for
>   handling asynchronous concurrency)"
>   `http://mail.python.org/pipermail/python-3000/2006-April/000970.html`
>
> .. [5]
>
>   Reference `futures` implementation
>   `http://code.google.com/p/pythonfutures`
>
> =========
> Copyright
> =========
>
> This document has been placed in the public domain.
>
>
> ..
>   Local Variables:
>   mode: indented-text
>   indent-tabs-mode: nil
>   sentence-end-double-space: t
>   fill-column: 70
>   coding: utf-8
>   End:
-------------- next part --------------
A non-text attachment was scrubbed...
Name: deadlock_test.patch
Type: application/octet-stream
Size: 4826 bytes
Desc: not available
URL: <http://mail.python.org/pipermail/stdlib-sig/attachments/20100221/cb0fe590/attachment.obj>

