raise exceptions in generators/functions from other tasks/functions
Dominic
nomail at nospam.no
Sat May 15 16:40:39 EDT 2004
Just in case someone is interested.
I came up with this solution since my previous
posting.
Maybe you've got some other ideas or criticism.
I'll listen ;-)
Ciao,
Dominic
P.S. Python is really powerful!
Use-Case:
def driveTask(handler):
    """Generator task simulating a drive loop that reacts to signals.

    Yields the string "current speed" once per scheduling step.  After the
    first step the task registers itself with *handler* for the ``NoPower``
    and ``WallHit`` signals; when the handler raises one of them inside this
    generator, a message is printed and the inner drive loop is restarted.

    handler -- object providing ``register(function, signals)``.
    """
    print(" enter drive")
    yield "current speed"
    while 1:
        try:
            # (re-)register on every restart so the signals are only
            # delivered while we are inside this try block
            handler.register(driveTask, (NoPower, WallHit))
            while 1:
                print(" adjust engine")
                yield "current speed"
                print(" steer")
        except WallHit:
            print("hit the wall :-(")
        except NoPower:
            print("need some fresh batteries :-)")
====================================================================
Full-Source code:
from types import GeneratorType
class TaskHandler(object):
    """Minimal cooperative scheduler running every task once per pass.

    A task is either a generator object, which is advanced by one step per
    pass, or a plain callable, which is called once per pass.
    """

    def __init__(self):
        # scheduled tasks, in the order they run within one pass
        self.tasks = []

    def addTask(self, task):
        """Append *task* (generator object or callable) to the schedule."""
        self.tasks.append(task)

    def removeTask(self, task):
        """Remove *task* from the schedule.

        Raises ValueError if *task* is not scheduled.
        """
        # tasks are stored by object, not by position, so remove by value
        self.tasks.remove(task)

    def loop(self):
        """Run one pass over all scheduled tasks.

        A generator task is dropped from the schedule when it is exhausted
        or when it yields a false value; plain callables are simply called.
        The pass works on a snapshot of the schedule, so tasks added or
        removed during the pass take effect on the next one.
        """
        # iterate over a copy: removing from the list being iterated would
        # make the loop skip the task that follows the removed one
        for t in list(self.tasks):
            if isinstance(t, GeneratorType):
                # generator task
                try:
                    alive = next(t)
                except StopIteration:
                    alive = False
                if not alive and t in self.tasks:
                    # generator is done, remove it
                    self.tasks.remove(t)
            else:
                # normal method/function
                t()
from sys import settrace
class SignalingTaskHandler(TaskHandler):
    """Task handler that delivers signals to registered functions as exceptions.

    Signals are exception classes.  ``signal()`` collects them; once a pass
    over the tasks adds no further signals, the collected set becomes the
    active set.  While the tasks run, a trace function installed with
    ``sys.settrace`` raises the first matching active signal inside every
    function or generator that registered for it.
    """

    def __init__(self):
        super(SignalingTaskHandler, self).__init__()
        # id(code object) -> signals the corresponding function accepts
        self.function2signal = {}
        # signals raised in registered functions during the current pass
        self.activeSignals = ()
        # signals collected since the last step
        self.newSignals = []
        # the scheduling loop is itself a generator, advanced by loop()
        self.loopTask = self.__loop()

    def register(self, aFunction, signals):
        """Declare that *aFunction* wants to receive the given *signals*.

        aFunction -- function or generator function; it is identified by
                     its code object.
        signals   -- container of exception classes.
        """
        self.function2signal[id(aFunction.__code__)] = signals

    def signal(self, aSignal):
        """Queue *aSignal* for the next step; duplicates are ignored."""
        if aSignal not in self.newSignals:
            self.newSignals.append(aSignal)

    def loop(self):
        """Run one pass over the tasks with signal delivery enabled."""
        next(self.loopTask)

    def __loop(self):
        # Endless generator; every iteration is one traced pass over the tasks.
        while 1:
            settrace(self.__intercept)
            tmp = len(self.newSignals)
            try:
                super(SignalingTaskHandler, self).loop()
            finally:
                # never leave the tracer installed outside of our own pass
                settrace(None)
            # if we've reached "steady state", we make a step
            # I think Esterel-language has similar signaling semantics
            if tmp == len(self.newSignals):
                self.__step()
            yield True

    def __step(self):
        # switch to new signal set, discard old ones
        self.activeSignals = tuple(self.newSignals)
        self.newSignals = []

    def __intercept(self, frame, event, arg):
        # Global trace function: hand out a local tracer for registered
        # functions only; everything else runs untraced.
        key = id(frame.f_code)
        if key not in self.function2signal:
            return None

        def _i(frame, event, arg):
            # we could return _i to trace inside function
            # and not only calls to it
            if self.activeSignals:
                signals = self.function2signal[key]
                # check if there is one signal to raise
                for s in self.activeSignals:
                    if s in signals:
                        raise s

        # every function gets its own tracer, otherwise
        # exceptions cannot be caught :-( Why?
        # NOTE(review): presumably an exception raised directly from this
        # global tracer surfaces before the traced function's body runs,
        # so the function's own try/except never sees it -- confirm
        # against the sys.settrace documentation.
        return _i
# A Use Case :
class WallHit(Exception):
    """Signal: the vehicle has hit a wall."""
class NoPower(Exception):
    """Signal: the vehicle has run out of power."""
def driveTask(handler):
    """Generator task simulating a drive loop that reacts to signals.

    Yields the string "current speed" once per scheduling step.  After the
    first step the task registers itself with *handler* for the ``NoPower``
    and ``WallHit`` signals; when the handler raises one of them inside this
    generator, a message is printed and the inner drive loop is restarted.

    handler -- object providing ``register(function, signals)``.
    """
    print(" enter drive")
    yield "current speed"
    while 1:
        try:
            # (re-)register on every restart so the signals are only
            # delivered while we are inside this try block
            handler.register(driveTask, (NoPower, WallHit))
            while 1:
                print(" adjust engine")
                yield "current speed"
                print(" steer")
        except WallHit:
            print("hit the wall :-(")
        except NoPower:
            print("need some fresh batteries :-)")
# number of times miscTask has run so far
counter = 0


def miscTask():
    """Plain-function task that emits the demo signals.

    Counts its own invocations in the module-level ``counter``.  It signals
    ``WallHit`` on the call that finds the counter at 4 and ``NoPower`` on
    the call that finds it at 5, both through the module-level ``handler``.
    """
    global counter
    if counter == 4:
        handler.signal(WallHit)
    if counter == 5:
        handler.signal(NoPower)
    counter += 1
# "special" tasks with signaling
# signal-aware handler; miscTask reaches it through this module-level name
handler = SignalingTaskHandler()
tmp = driveTask(handler)
handler.addTask(tmp)
# main loop
mainTaskHandler = TaskHandler()
# well, there could be a lot more
# order matters: miscTask queues its signals before the signaling handler
# runs its own pass within the same iteration
mainTaskHandler.addTask(miscTask)
mainTaskHandler.addTask(handler.loop)
for _ in range(10):
    mainTaskHandler.loop()
# enter drive
# adjust engine
# steer
# adjust engine
# steer
# adjust engine
# steer
# adjust engine
# hit the wall :-(
# adjust engine
# need some fresh batteries :-)
# adjust engine
# steer
# adjust engine
# steer
# adjust engine
# steer
# adjust engine
More information about the Python-list
mailing list