I'd like to propose an improvement to `concurrent.futures`. The library's
ThreadPoolExecutor and ProcessPoolExecutor are excellent tools, but there
is currently no mechanism for configuring which type of executor you want.
Also, there is no duck-typed class that behaves like an executor, but does
its processing in serial. Often a developer will want to run a task in
parallel, but depending on the environment they may want to disable
threading or process execution. To address this I use a utility called a
`SerialExecutor` which shares an API with
ThreadPoolExecutor/ProcessPoolExecutor but executes tasks sequentially
in the same Python thread:
```python
import concurrent.futures
class SerialFuture(concurrent.futures.Future):
    """
    Non-threading / multiprocessing version of Future for drop-in
    compatibility with concurrent.futures.

    The wrapped function is executed lazily: it runs (at most once) the
    first time the result is requested, not at construction time.

    Args:
        func (callable): the function to execute
        *args: positional arguments passed to ``func``
        **kw: keyword arguments passed to ``func``
    """
    def __init__(self, func, *args, **kw):
        super(SerialFuture, self).__init__()
        self.func = func
        self.args = args
        self.kw = kw
        # Number of times the wrapped function has run (0 or 1); prevents
        # re-executing the function on repeated result() calls.
        self._run_count = 0
        # Fake being finished so Future.result() skips waiting on the
        # condition and calls the (overridden) __get_result immediately.
        self._state = concurrent.futures._base.FINISHED

    def _run(self):
        # Execute the wrapped function once and record its result.
        result = self.func(*self.args, **self.kw)
        self.set_result(result)
        self._run_count += 1

    def set_result(self, result):
        """
        Overrides the implementation to revert to pre Python 3.8 behavior.

        Python 3.8+ raises InvalidStateError when set_result is called on
        a future whose state is already FINISHED, which this class sets
        up-front in __init__.
        """
        with self._condition:
            self._result = result
            self._state = concurrent.futures._base.FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def _Future__get_result(self):
        # Overrides the name-mangled private Future.__get_result method so
        # the deferred function is executed on first access to the result.
        if not self._run_count:
            self._run()
        return self._result
class SerialExecutor(object):
    """
    Implements the concurrent.futures API around a single-threaded backend.

    All submitted work is executed serially in the calling thread, which
    is useful for debugging or for environments where threads / processes
    are undesirable.

    Example:
        >>> with SerialExecutor() as executor:
        ...     futures = [executor.submit(lambda x: x + 1, i)
        ...                for i in range(100)]
        ...     for f in concurrent.futures.as_completed(futures):
        ...         assert f.result() > 0
        ...     for i, f in enumerate(futures):
        ...         assert i + 1 == f.result()
    """
    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, tb):
        # Mirror the real executor API by shutting down on context exit
        # (a no-op here, but keeps the contract consistent).
        self.shutdown()

    def submit(self, func, *args, **kw):
        """Schedule ``func(*args, **kw)``; returns a SerialFuture."""
        return SerialFuture(func, *args, **kw)

    def shutdown(self, wait=True):
        """
        No-op. Accepts ``wait`` for signature compatibility with
        concurrent.futures.Executor.shutdown.
        """
        pass
```
In order to make it easy to choose the type of parallel (or serial) backend
with minimal code changes I use the following "Executor" wrapper class
(although if this was integrated into concurrent.futures the name would
need to change to something better):
```python
class Executor(object):
    """
    Wrapper around a specific executor.

    Abstracts Serial, Thread, and Process executors via arguments.

    Args:
        mode (str, default='thread'): either thread, serial, or process
        max_workers (int, default=0): number of workers. If 0, serial is
            forced.

    Raises:
        KeyError: if ``mode`` is not a recognized value.
    """
    def __init__(self, mode='thread', max_workers=0):
        from concurrent import futures
        # Zero workers forces serial execution regardless of mode.
        if max_workers == 0 or mode == 'serial':
            self.backend = SerialExecutor()
        else:
            # Dispatch table for the pooled backends; an unknown mode
            # raises KeyError(mode), matching the documented contract.
            pool_types = {
                'thread': futures.ThreadPoolExecutor,
                'process': futures.ProcessPoolExecutor,
            }
            self.backend = pool_types[mode](max_workers=max_workers)

    def __enter__(self):
        # Delegate context management to the wrapped backend (note: this
        # yields the backend itself, not the wrapper).
        return self.backend.__enter__()

    def __exit__(self, ex_type, ex_value, tb):
        return self.backend.__exit__(ex_type, ex_value, tb)

    def submit(self, func, *args, **kw):
        """Schedule ``func(*args, **kw)`` on the wrapped backend."""
        return self.backend.submit(func, *args, **kw)

    def shutdown(self):
        """Shut down the wrapped backend executor."""
        return self.backend.shutdown()
```
So in summary, I'm proposing to add a SerialExecutor and SerialFuture class
as an alternative to the ThreadPool / ProcessPool executors, and I'm also
advocating for some sort of "ParametrizedExecutor", where the user can
construct it in "thread", "process", or "serial" mode.
--
-Jon