import sys
import traceback

# NOTE: deferred, current_os_thread, NULL and MicroThreadExit refer to the
# C-level machinery described above; they are not Python-level names.

class micro_thread:
    # This has been generalized a bit to allow any number of _stdout and
    # _stdin pipes.
    #
    # Only set_stdins and wait are accessible at the Python level.
    def __init__(self, fn, args, kws, num_stdout=0, exception_handler=None):
        self.exception_handler = exception_handler
        self.deferred = deferred(self)
        self._stdout = [micro_pipe() for i in range(num_stdout)]
        self._stdin = []
        self.deferred.add_callback('micro_thread.__init__', None,
                                   lambda obj, state: fn(*args, **kws), None)
        current_os_thread.reactor.schedule(self.deferred, None)

    def set_stdins(self, *pipes):
        assert not self._stdin, \
               "Can only do set_stdins once on a micro_thread"
        self._stdin = pipes
        for pipe in pipes:
            pipe._stdin_thread(self)

    def _final_answer(self, answer):
        if self.exception_handler:
            if answer == NULL:  # exception
                self.exception_handler(*sys.exc_info())
            # else ignore answer
        elif hasattr(self, 'waiter'):
            if answer == NULL:  # exception
                current_os_thread.reactor.schedule_exception(self.waiter,
                                                             *sys.exc_info())
            else:
                current_os_thread.reactor.schedule(self.waiter, answer)
            del self.waiter
        else:
            self.answer = answer
        del self.deferred
        for pipe in self._stdout:
            pipe.close()
        for pipe in self._stdin:
            pipe.close()

    def wait(self):
        if hasattr(self, 'answer'):
            return self.answer
        self.waiter = current_os_thread.reactor.current_deferred
        self.waiter.add_callback('micro_thread.wait', None, None, None)
        raise self.waiter


class micro_pipe:
    # Only the iterator functions, close and throw are accessible at the
    # Python level.
    closed = False

    def __init__(self):
        pass

    def _stdin_thread(self, thread):
        assert not hasattr(self, 'stdin_thread'), \
               "Can't assign the same pipe to multiple micro_thread stdins"
        self.stdin_thread = thread

    def close(self):
        self.closed = True
        if hasattr(self, 'next_waiter'):
            current_os_thread.reactor.schedule_exception(self.next_waiter,
                                                         StopIteration)
            del self.next_waiter
        if hasattr(self, 'put_waiter'):
            current_os_thread.reactor.schedule_exception(self.put_waiter,
                                                         MicroThreadExit)
            del self.put_waiter

    def __iter__(self):
        return self

    def __next__(self):
        if hasattr(self, 'value'):
            ans = self.value
            del self.value
            if hasattr(self, 'put_waiter'):
                # Somebody is waiting for this value!
                current_os_thread.reactor.schedule(self.put_waiter, None)
                del self.put_waiter
            return ans
        if hasattr(self, 'iterable'):
            try:
                return next(self.iterable)
            except StopIteration:
                del self.iterable
                current_os_thread.reactor.schedule(self.put_waiter, None)
                del self.put_waiter
        if self.closed:
            raise StopIteration
        self.next_waiter = current_os_thread.reactor.current_deferred
        self.next_waiter.add_callback('micro_pipe.__next__', None, None, None)
        raise self.next_waiter

    def throw(self, type, value=None, traceback=None):
        if self.closed:
            if value is None:
                value = type()
            raise value.with_traceback(traceback)
        if hasattr(self, 'iterable'):
            if hasattr(self.iterable, 'throw'):
                return self.iterable.throw(type, value, traceback)
            del self.iterable
        if hasattr(self, 'put_waiter'):
            current_os_thread.reactor.schedule_exception(self.put_waiter,
                                                         type, value, traceback)
            del self.put_waiter
        else:
            self.exc = type, value, traceback
            if hasattr(self, 'value'):
                del self.value
        self.next_waiter = current_os_thread.reactor.current_deferred
        self.next_waiter.add_callback('micro_pipe.throw', None, None, None)
        raise self.next_waiter

    def _put(self, obj):
        if hasattr(self, 'exc'):
            # A throw has been done, re-raise it to the putter.
            type, value, traceback = self.exc
            del self.exc
            if value is None:
                value = type()
            raise value.with_traceback(traceback)
        elif hasattr(self, 'value'):
            # There is already a queued put value, we need to wait...
            self.put_waiter = current_os_thread.reactor.current_deferred
            self.put_waiter.add_callback('micro_pipe._put', None,
                                         self._put2, obj)
            raise self.put_waiter
        elif hasattr(self, 'next_waiter'):
            # Somebody is waiting for this value!
            current_os_thread.reactor.schedule(self.next_waiter, obj)
            del self.next_waiter
        else:
            self.value = obj

    def _put2(self, obj, state):
        if obj == NULL:
            # throw exception!
            raise       # re-raise current exception
        else:
            self._put(state)

    def _take_from(self, iterable):
        # Can I really do this?  This would mean that iterable.__next__ would
        # run in a different micro_thread (the reader, rather than the
        # sender)...
        self.iterable = iter(iterable)
        self.put_waiter = current_os_thread.reactor.current_deferred
        self.put_waiter.add_callback('micro_pipe._take_from', None, None, None)
        raise self.put_waiter


def start_and_forget(function, *args,
                     exception_handler=traceback.print_exception, **kws):
    micro_thread(function, args, kws, exception_handler=exception_handler)

def start_in_parallel(function, *args, **kws):
    return micro_thread(function, args, kws)

def generate(function, *args, **kws):
    return micro_thread(function, args, kws, num_stdout=1)._stdout[0]

def put(obj, port=0):
    current_os_thread.reactor.current_deferred.micro_thread._stdout[port] \
                     ._put(obj)

def take_from(iterable, port=0):
    current_os_thread.reactor.current_deferred.micro_thread._stdout[port] \
                     ._take_from(iterable)

def iter(o=None, sentinel=None):
    if o is None:
        o = 0
    if isinstance(o, int):
        o = current_os_thread.reactor.current_deferred.micro_thread._stdin[o]
    # from here, same as current iter builtin function

def next(iterator=None, default=None):
    if iterator is None:
        iterator = 0
    if isinstance(iterator, int):
        iterator = \
            current_os_thread.reactor.current_deferred.micro_thread._stdin \
                             [iterator]
    # from here, same as current next builtin function

class parallel_map:
    def __init__(self, fn, *iterables):
        self.threads = iter([start_in_parallel(fn, *args)
                             for args in zip(*iterables)])

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.threads).wait()
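
To show how these Python-level pieces are meant to fit together, here is a
small, hypothetical usage sketch.  It is not part of the reference code above
and assumes the C-level reactor machinery is running; the names used are only
those defined in this section.

# Hypothetical usage sketch -- assumes the reactor described above is running.

def squares(n):
    # Runs in its own micro_thread; put() writes to the _stdout[0] pipe that
    # generate() created for this thread.
    for i in range(n):
        put(i * i)

def sum_squares(n):
    # generate() returns the new thread's stdout pipe; iterating it may
    # suspend the reader until the producer does a put().
    total = 0
    for value in generate(squares, n):
        total += value
    return total

# parallel_map starts one micro_thread per argument tuple and yields each
# thread's wait() result in order, so this would produce [1, 2, 3].
results = list(parallel_map(lambda x: x + 1, range(3)))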