forked processes and testing
andrea crotti
andrea.crotti.0 at gmail.com
Wed Sep 12 06:20:02 EDT 2012
I wrote a decorator that takes a function, runs it in a forked process
and returns the PID:
    import sys
    from os import fork

    def on_forked_process(func):
        """Decorator that forks the process, runs the function and gives
        back control to the main process
        """
        def _on_forked_process(*args, **kwargs):
            pid = fork()
            if pid == 0:
                # child: run the function, then exit
                func(*args, **kwargs)
                sys.exit(0)
            else:
                # parent: return the child's PID
                return pid
        return _on_forked_process
It seems to work in general, but I'm not able to test it. For example, this fails:
    def test_on_forked_process(self):
        @utils.on_forked_process
        def _dummy_func():
            pass

        with self.assertRaises(SystemExit):
            retpid = _dummy_func()
            # pid of the son process should be always > 0
            self.assertTrue(retpid > 0)
and I'm not sure why. Apparently nose doesn't like the exit, even though
it's happening in an unrelated process.
Any ideas on how to make it testable or improve it?
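One possible direction, as a minimal sketch rather than a definitive fix: the SystemExit is raised only in the child, which carries its own copy of the test runner, so the parent never sees it and assertRaises has nothing to catch there. Assuming it is acceptable for the child to leave through os._exit (which raises no exception and skips cleanup handlers), the parent can check the exit code with os.waitpid instead. The helper name wait_for_exit_code is hypothetical and not part of my code above.

```python
import functools
import os


def on_forked_process(func):
    """Run func in a forked child and return the child's PID to the parent."""
    @functools.wraps(func)
    def _on_forked_process(*args, **kwargs):
        pid = os.fork()
        if pid == 0:
            # Child: exit code 0 on success, 1 if func raised.
            status = 1
            try:
                func(*args, **kwargs)
                status = 0
            finally:
                # os._exit raises no SystemExit, so the child never
                # unwinds back into the test runner it inherited.
                os._exit(status)
        # Parent: hand back the child's PID.
        return pid
    return _on_forked_process


def wait_for_exit_code(pid):
    """Hypothetical helper: block until the child exits, return its exit code.

    Returns None if the child was terminated by a signal instead.
    """
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) if os.WIFEXITED(status) else None
```

With something like this, the test could assert that the returned PID is > 0 and that wait_for_exit_code(pid) == 0, without expecting any exception in the parent. Note that an exception raised by func is swallowed in this sketch; only the exit code reports it.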
I probably won't use it in production, because there I should
use something smarter to control the various processes I need to run,
but for my integration tests it's quite useful, because then I can
kill the processes like this:
    except KeyboardInterrupt:
        from os import kill
        from signal import SIGTERM
        print("Killing sink and worker")
        kill(sink_pid, SIGTERM)
        kill(worker_pid, SIGTERM)
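The cleanup above sends SIGTERM but never waits for the children, so they may linger as zombies until the test process exits. A rough sketch of a variant that also reaps them, assuming Python 3 and that the PIDs are direct children of the calling process; terminate_children is a hypothetical name, not something from my code:

```python
import os
import signal


def terminate_children(pids):
    """Hypothetical helper: send SIGTERM to each child PID, then reap it.

    Returns a dict mapping each reaped PID to its raw wait status.
    """
    results = {}
    for pid in pids:
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # No such process any more; still try to reap below.
            pass
        try:
            _, status = os.waitpid(pid, 0)
        except ChildProcessError:
            # Not our child, or it was already reaped elsewhere.
            continue
        results[pid] = status
    return results
```

In the except KeyboardInterrupt branch this would be called as terminate_children([sink_pid, worker_pid]). The raw status can be inspected with os.WIFSIGNALED and os.WTERMSIG if the test wants to confirm how each process ended.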