Sorry,
I hit the send button too soon,
Mike
----------------------------------------------------------------
Michael Schneider
Senior Software Engineering Consultant
UGS PLM Solutions - an EDS Company
"The Greatest Performance Improvement Is the transitioning from a non-working state to the working state"
> -----Original Message-----
> From: Schneider, Michael
> Sent: Monday, March 01, 2004 11:28 AM
> To: 'twisted-python(a)twistedmatrix.com'
> Subject: tracking spawned jobs in a twisted application (code review
> request)
>
>
> Hello All,
>
> I have been pushing some code around.
>
> My problem is that I need to execute OS command line
> jobs with twisted.
>
> I started by morphing the twisted tutorial app into a
> job runner.
>
>
> My first path (It was wrong!!) was to morph the database connection
> pool to a compute pool using popen2.
>
> This group, suggested that I use reactor.spawn instead of popen2 .
>
> Attached is the resulting code.
>
> Basic Approach for Command protocol.processProtocol:
>
> 1) crate a JobPool Object to manage list of running jobs
> 2) Create Job Runner Object to wrap individual process info,
> and capture process output.
> (start time, stop time ...)
> 3) Create CmdProcProtcol to manage interaction with the reactor
>
>
> I morphed the twisted application: (Coded in comment section below)
> 1) Create JobPoolService
> 2) Create JobRunnerProtocol
> 3) Create XmlRpc Job Runner Interface that dispatches
> to job runner service.
>
>
>
> My questions are:
> - Is the the twisted way?
> - What are some alternative approaches?
> - Is there a better way?
>
> Thank you very much,
> Mike
>
> ----------------------------------------------------------------
> Michael Schneider
> Senior Software Engineering Consultant
> UGS PLM Solutions - an EDS Company
>
> "The Greatest Performance Improvement Is the transitioning
> from a non-working state to the working state"
>
# Twisted, the Framework of Your Internet
"""
Manage execution of multiple processes\
Usage Model:
1)Create Job runner Service in App
2)Create Job Runner Protocol in App
3) Create Xmlrpc to call
----- from service code in twisted app
m = RunTaskPool()
cmdLine="dir d:\"
def = m.runJob(cmdLine)
class JobRunnerService(service.Service):
__implements__ = service.Service.__implements__, IJobRunnerService
def __init__(self):
self.taskPool = None
def runJob(self, command):
print "In Twisted runJob, command = " + command
self.verifyRunTaskPool()
return self.taskPool.runJob(command)
def verifyRunTaskPool(self):
if(self.taskPool == None):
self.taskPool = RunTaskPool()
class JobRunnerProtocol(basic.LineReceiver):
def lineReceived(self, command):
d = self.factory.runJob(command)
d.addErrback(catchError)
def writeValue(value):
self.transport.write(value)
self.transport.write('\n\n')
self.transport.loseConnection()
d.addCallback(writeValue)
class JobStatusXR(xmlrpc.XMLRPC):
def __init__(self, service):
xmlrpc.XMLRPC.__init__(self)
self.service = service
def xmlrpc_runJob(self, command):
return self.service.runJob(command)
def xmlrpc_isAlive(self):
return 1
"""
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.spread import pb
from twisted.python import reflect, log
from twisted.internet import defer
from time import sleep
from time import time
import os
class CmdProcProtocol(protocol.ProcessProtocol):
"""Twisted Protocol Class to run command line in a sub process """
def __init__(self, parentJobRunner):
self._exitStatus = 0
self.parentJobRunner = parentJobRunner
self.data = ""
def connectionMade(self):
"""Called at the start of the execution"""
#print "connectionMade!"
pass
def outReceived(self, data):
"""Called when process writes output"""
#print "outReceived! with %d bytes!" % len(data)
self.data = self.data + data
#print self.data
def errReceived(self, data):
#print "errReceived! with %d bytes!" % len(data)
pass
def inConnectionLost(self):
#print "inConnectionLost! stdin is closed! (we probably did it)"
pass
def outConnectionLost(self):
"""Program has closed stdout (program terminated)"""
#print "outConnectionLost! The child closed their stdout!"
#print "# now is the time to examine what they wrote"
#print "dir:", self.data
pass
def errConnectionLost(self):
"""Program has closed stderr (program terminated)"""
#print "errConnectionLost! The child closed their stderr."
pass
def processEnded(self, status_object):
"""Child process exited"""
self._exitStatus = status_object.value.exitCode
print "processEnded, status %d" % self._exitStatus
print "quitting"
print self.data
#notify parent with data returned , and exit status
self.parentJobRunner.signalRunComplete(self._exitStatus, self.data)
def kill(self):
"""Kill this process"""
#os.kill(self.transport.pid, signal.SIGKILL)
class JobRunner:
"""
I am a lightweight wrapper for run job.
"""
def __init__(self, parentJobRunnerPool, jobRunDeferred):
#command to execute in Command Proc Protocol
self.command = ""
#set
self.parentJobRunnerPool = parentJobRunnerPool
self._startTime = -1
self._endTime = -1
self._timeLimit = -1
self._exitcode = None
self._log = "job not yet started"
self._jobRunDeferred = jobRunDeferred
def getLogCallback(self,*args, **kw):
"""Called by Process Run Protocol when child process execution is complete """
#Notify JobRunner Pool That this job has completed execution
self.parentJobRunnerPool.jobCompleteCallback(self)
#return output of job
return self._log
def getExitCode(self):
return self._exitcode
def getStartTime(self):
"""Get Time Job Started Executing """
return self._startTime
def getEndTime(self):
"""Get Time Job Stopped Executing """
return self._endTime
def getRunTime(self):
""" get time executing"""
if(self._startTime < 0):
#job has not started yet, return -1
return -1
if(self._endTime < 0):
#job is running, but not yet finished, return time spent so far
return (time() - self._startTime)
#job is complete, return run time
return (self._endTime - self._startTime)
def runJob(self, *args, **kw):
"""Execute Job: NOTE: this function executes in its own thread """
#set execution start time
self.command = (args[0])[0]
print "In Run Job daemon command : " + self.command
self._startTime = time()
print "Start Running Command : " + str(self.command) + " " + str(self.getStartTime())
self._log = "Job is Running: "
#setup reactor to run job
self.runCommand(self.command)
def runCommand(self, commandStr):
"""Create Command Line Protocol, and call spawnProcess"""
commandStr = 'cmd.exe /c ' + commandStr
pp= CmdProcProtocol(self)
reactor.spawnProcess(pp,
commandStr ,
env=os.environ)
def signalRunComplete(self, exitCode, stdoutString):
# Pass Control back to caller to self.runJob(..)
# by triggering defered callback,
# this callback will return this object
self._exitCode = exitCode
self._log = stdoutString
# set execution time
self._endTime = time()
print "Done Running Job " + self.command
reactor.callFromThread(self._jobRunDeferred.callback, self)
class RunTaskPool(pb.Referenceable):
running = 0 # true when the pool is operating
def __init__(self, *args, **kw ):
"""See RunTaskPool.__doc__
"""
self.args = args
self.kw = kw
self._jobRunnerJobs = []
self.jobs = {} # running Jobs, hashed on thread id
self.startID = reactor.callWhenRunning(self.start)
def start(self):
"""Start of execution
"""
if not self.running:
self.shutdownID = reactor.addSystemEventTrigger('during',
'shutdown',
self.finalClose)
self.running = 1
def jobCompleteCallback(self, jobRunner):
"""Callback triggered by JobRunner when execution of job is complete """
print "Deleting Job Object from running queue"
if jobRunner in self._jobRunnerJobs:
self._jobRunnerJobs.remove(jobRunner)
def runJob(self, *args, **kw ):
"""run job and return the result as a defered object
"""
print "MLS: runJob - setting up to run in thread "
d = defer.Deferred()
jobRunner = JobRunner(self,d)
#Add to list of JobRunnerJobs managed
self._jobRunnerJobs.append(jobRunner)
print "MLS: in _runJobRunner "
# run Job in thread pool thread
jobRunner.runJob(args, kw)
# return defered used by jobRunner
d.addCallback(jobRunner.getLogCallback)
return d
def close(self):
"""Close all pool connections and shutdown the pool."""
from twisted.internet import reactor
if self.shutdownID:
reactor.removeSystemEventTrigger(self.shutdownID)
self.shutdownID = None
if self.startID:
reactor.removeSystemEventTrigger(self.startID)
self.startID = None
self.finalClose()
def finalClose(self):
"""This should only be called by the shutdown trigger."""
self.running = 0
for job in self.jobs.values():
if self.noisy:
log.msg(' closing: %s%s' % (
self.kw or ''))
job.close()
self.jobs.clear()
def __getstate__(self):
return {'noisy': self.noisy,
'min': self.min,
'max': self.max,
'kw': self.kw}
def __setstate__(self, state):
self.__dict__ = state
apply(self.__init__, self.kw)
def startup():
for x in range(1,14):
d = m.runJob()
d.addCallback(printSelf)
def printSelf(jobRunner):
print "Job Runner Done"