There are three kinds objects at the C level. - deferred - reactor - notifier None of these are directly exposed or accessible at the Python level. This levels the details of the implementation open for other flavors of python (e.g., jython, ironpython, pypy). class deferred(NotImplementedError): # There is one deferred per micro_thread object. as_exception = False def __init__(self, micro_thread): # keep a reference to the micro_thread object self.micro_thread = micro_thread # The queue maintains the callbacks in the order that they need to be # called (queue[0] first). This is the order of the add_callback # calls, which are the reverse order of the order that the C functions # were called on the C stack. # # When the callbacks are being called and popped off of the stack, one # of them may need to defer execution again before all of the # callbacks have been popped off the queue. In this case, the queue # has two levels: the new stuff (callbacks currently being gathered # as the C stack is unwound) and prior stuff (from the previous # deferred execution that hasn't be executed yet). # # The new stuff gets run before the prior stuff, so it is at the # front of queue, and the prior stuff is at the end of queue. # # queue_push marks the boundary between the new stuff and the prior # stuff. This is where the new stuff is being inserted so that the # prior stuff is always behind the new stuff. self.queue = [] self.queue_push = 0 self.last_caller = None def push(self, fn_name, fn, state): self.queue.insert(self.queue_push, (fn_name, fn, state)) self.queue_push += 1 def call(self, arg): fn_name, fn, state = self.queue.pop(0) return fn_name, fn(arg, state) def check_called_fn(self, called_name): # Returns False, ans if it sees a problem, or True, None if OK. if self.as_exception: return False, NULL if called_name != self.last_caller: self.error_fn_name = called_name self.as_exception = True # never set False again... self.message = \ "Deferred execution not yet implemented by: " + called_name self.args = (self.message,) # go back to prior callbacks and pass this deferred again as # an exception. ans = NULL # Signal exception with self still as the registered # exception. while self.queue_push > 0: name, ans = self.call(ans) self.queue_push -= 1 return False, ans return True, None def add_callback(self, caller_name, called_name, callback_fn, state): status, ans = self.check_called_fn(called_name) if not status: return ans self.last_caller = caller_name if callback_fn: self.push(caller_name, callback_fn, state) return NULL # self is still the registered exception def is_exception(self): return self.as_exception def callback(self, arg): while True: self.queue_push = 0 self.last_caller = None fn_name = None while not (arg == NULL and registered exception == self) and \ self.queue_push == 0 and \ self.queue: fn_name, arg = self.call(arg) if not self.queue: # The micro_thread is all done! self.micro_thread._final_answer(arg) # NULL for exception return if not (arg == NULL and registered exception == self): # So self.queue_push != 0. This means that the last callback # started to defer and then changed its mind! err = SystemError("abandoned callback") if arg == NULL: old_exception = registered exception err.__cause__ = old_exception register err as the current exception while self.queue_push > 0: name, ignore = self.call(NULL) # exception self.queue_push -= 1 if arg == NULL: re-register old_exception as the current exception else: status, arg = self.check_called_fn(fn_name) if status: return The os_thread.__init__ would do: self.reactor = reactor() mt = start_in_parallel(fn, *args, **kws) self.reactor.run() # when this returns the os_thread is done return mt.wait() # this should never defer NotifierList = [file_event_notifier] TimedWaitSeconds = 0.2 EventCheckingThreshold = 100 class reactor: def __init__(self): self.scheduled_deferreds = [] self.timers = {} self.next_timer = None self.notifiers = [n(self) for n in NotifierList] def get_notifier(self, name): for n in self.notifiers: if n.name() == name: return n raise KeyError("notifier name %s not registered" % name) def schedule(self, deferred, returned_obj): self.scheduled_deferreds.append((deferred, True, returned_obj)) def schedule_exception(self, deferred, exc_type, exc_value, exc_traceback): # Does this really need all three exc values, or just exc_value? self.scheduled_deferreds.append( (deferred, False, (exc_type, exc_value, exc_traceback))) def set_timer(self, notifier_obj, deferred_obj, seconds): expiration_time = time.time() + seconds self.timers[deferred_obj] = notifier_obj, expiration_time if not self.next_timer or \ expiration_time < self.timers[self.next_timer][0]: self.next_timer = deferred_obj def find_next_timer(self): self.next_timer = None nearest_time = infinity for deferred_obj, (notifier_obj, expiration_time) \ in self.timers.iteritems(): if expiration_time < nearest_time: self.next_timer = deferred nearest_time = expiration_time def clear_timer(self, notifier_obj, deferred_obj): del self.timers[deferred_obj] if self.next_timer == deferred_obj: self.find_next_timer() def check_time(self): while self.next_timer and \ time.time() >= self.timers[self.next_timer][1]: notifier_obj = self.timers[self.next_timer][0] notifier_obj.timeout(self.next_timer) self.find_next_timer() def run(self): # The parent os thread terminates when this returns. while True: # process up to EventCheckingThreshold scheduled deferreds: i = 0 while self.scheduled_deferreds and i < EventCheckingThreshold: self.check_time() deferred_obj, exc_flag, arg = self.scheduled_deferreds.pop(0) self.current_deferred = deferred_obj if exc_flag: register arg as current exception deferred_obj.callback(NULL) # exception clear registered exception else: deferred_obj.callback(arg) self.current_deferred = None i += 1 # check for events: if not self.scheduled_deferreds and not self.timers and \ len(self.notifiers) == 1: # nothing to do but wait! if self.notifiers[0].wait_forever() == 'empty': # nothing left to wait for! return # terminate the parent os_thread else: # first poll all notifiers, so that none of them get starved: empty_count = 0 # How many notifiers aren't waiting for # anything? for n in self.notifiers: if n.poll() == 'empty': empty_count += 1 else: non_empty_notifier = n if not self.scheduled_deferreds: # nothing turned up, so we need to go to sleep... if empty_count == len(self.notifiers): # nothing left to do! assert not self.timers return # terminate the parent os_thread if not self.next_timer and \ empty_count + 1 == len(self.notifiers): non_empty_notifier.wait_forever() else: timeout = min(max(0, self.timers[self.next_timer][1] - time.time()), TimedWaitSeconds) slept = False for n in self.notifiers: if slept: n.poll() elif n.timed_wait(timeout) == waited: slept = True class notifier: def __init__(self, reactor): self.reactor = reactor def name(self): return self.__class__.__name__ def timeout(self, deferred_obj): self.deregister(deferred_obj, TimedoutException) def defer(self, wait_reason, max_seconds = 0): # max_seconds of 0 means no time limit. deferred_obj = self.reactor.current_deferred self.register(deferred_obj, wait_reason, max_seconds) raise deferred_obj # start the process of deferring! class file_event_notifier(notifier): priority = 50 def __init__(self, reactor): super(notifier, self).__init__(reactor) self.registered_deferreds = {} def register(self, deferred, wait_reason, max_seconds): self.registered_deferreds[deferred] = wait_reason if max_seconds > 0: self.reactor.set_timer(self, deferred, max_seconds) def deregister(self, deferred_obj, returned_obj): del self.registered_deferreds[deferred_obj] self.reactor.schedule(deferred_obj, returned_obj) def poll(self): return self.timed_wait(0) def timed_wait(self, seconds): if not self.registered_deferreds: return 'empty' prepare and execute select.select based on self.registered_deferreds for fd in ready_fds: self.reactor.schedule(deferred_obj for fd, events for fd) return 'success' def wait_forever(self): return self.timed_wait(-1) You could also have signal_notifiers and gui_event_notifiers. # Here's pseudo-code for a modified C function receiving data from a socket. def modified_c_function(args): do stuff before recv... try: data = socket.recv() except "no data": try: # This raise the current_deferred as an exception. current_os_thread.reactor.get_notifier('file_event_notifier') \ .defer((socket.fileno(), 'input')) # So this line is never executed... except deferred, d: d.add_callback('modified_c_function', None, modified_c_function_cont, (socket, ...)) raise return modified_c_function_cont(None, (socket, ...)) def modified_c_function_cont(obj, state): data = socket.recv() do stuff after recv... return ans