concurrency in Python
Gary D. Duzan
gduzan at gte.com
Mon Jan 31 10:18:36 EST 2000
gvwilson at my-deja.com wrote:
>
> Hi. I'm interested in hearing from anyone who's ever built a
> concurrency toolkit for Python that used a model other than
> threads + locks --- futures, tuple spaces, active expressions,
> concurrent aggregates, actors, or anything else.
I've done some future-ish stuff in python, built on threads.
The futures aren't transparent since (as far as I can tell) it
isn't quite possible (or certainly not straightforward) to
create a generic conditional proxy object. It does the job for
me, though, using the __call__ method to get at the actual
value, when available. By default, our future objects require
some outside agent to resolve the value; until then accessors
simply block. Optionally, a function can be passed to the future
object which is launched as a thread, with the result of the
function becoming the value of the future and unblocking any
blocked accessors. So you can do things like:
================================================================
import future
a = future.future(get_some_numeric_value, arg1, arg2)
b = future.future(get_some_other_numeric_value, arg3, arg4)
# get_some_numeric_value() and get_some_other_numeric_value()
# are off happily executing some nasty computations in their
# own threads.
z = a() + b()
# The above will block until both results are available, then
# do the addition.
================================================================
It isn't rigorously tested or anything, but I'll attach
our implementation in the hope that someone will find it
interesting or even (*gasp*) useful.
Gary Duzan
GTE Laboratories
p.s. We mostly use the manual resolution, since the values
we are interested in are actually generated by external calls
to CORBA callback objects running in other threads.
-------------- next part --------------
import threading
class future:
def __init__(self, resolver=None, *args, **kwargs):
self._lock = threading.Lock()
self._resolved = threading.Condition(self._lock)
if resolver is not None:
apply(self.set_resolver, tuple([resolver] + list(args)), kwargs)
def set_resolver(self, resolver, *args, **kwargs):
self._lock.acquire()
if self.__dict__.has_key("_value"):
self._lock.release()
raise AlreadyResolved
if not self.__dict__.has_key("_resolver_thread"):
self._resolver_thread = threading.Thread(target=self.run_resolver,
args=(resolver,args,kwargs))
self._lock.release()
self._resolver_thread.start()
else:
self._lock.release()
raise DuplicateResolver
def run_resolver(self, resolver, args, kwargs):
try:
value = apply(resolver, args, kwargs)
except:
import sys
self.set_exception(sys.exc_type, sys.exc_value, sys.exc_traceback)
return
self.set_value(value)
def set_value(self, value):
self._lock.acquire()
self._value = value
self._resolved.notifyAll()
self._lock.release()
def __call__(self):
self._lock.acquire()
while not self.__dict__.has_key("_value") and not self.__dict__.has_key("_exc_type"):
self._resolved.wait()
self._lock.release()
if self.__dict__.has_key("_exc_type"):
raise self._exc_type, self._exc_value, self._exc_traceback
return self._value
def set_exception(self, exc_type, exc_value=None, exc_traceback=None):
self._lock.acquire()
self._exc_type = exc_type
self._exc_value = exc_value
self._exc_traceback = exc_traceback
self._resolved.notifyAll()
self._lock.release()
def invalidate(self):
self._lock.acquire()
self._exc_type = ResolveFailure
self._exc_value = None
self._exc_traceback = None
try:
del self._value
except AttributeError:
pass
self._resolved.notifyAll()
self._lock.release()
class DuplicateResolver:
"""Attempt to attach multiple resolvers to a future."""
pass
class AlreadyResolved:
"""Attept to attach a resolver to a future which already has a value."""
pass
class ResolveFailure:
"""Value cannot be resolved."""
pass
More information about the Python-list
mailing list