[Twisted-Python] Understanding Deferreds/callback further

Hi guys, I am still having trouble writing programs that make the most of deferreds. I will describe the problem I am trying to solve and the solution I've tried.

The problem: I am trying to write a client that gets a job from a PB server, processes it, submits the result and gets another job. I need to be able to do this for multiple jobs (i.e. I need to be able to get 5 jobs and start 5 similar chains: get job --> work on job --> submit result --> get another job). This looks like a perfectly solvable problem using deferreds, since working on a job might take a while and callbacks seem to be the most appropriate tool.

So I have something like this in my main method:

    monitor = Monitor(server, port)
    monitor.connect()

    def connect(self):
        factory = pb.PBClientFactory()
        reactor.connectTCP(self.hostname, self.port, factory)
        return factory.login(self.credentials).addCallback(
            self._connected).addErrback(
            self._catch_failure)

Then in self._connected I would like to start the chain of getting jobs, processing them and submitting results:

    def _connected(self, remoteobj):
        self.remoteobj = remoteobj
        for worker in self.workers:
            self.get_jobs(worker).addCallback(
                self._got_job, worker).addCallback(
                self.get_jobs, worker).addErrback(
                self._catch_failure)

_got_job takes both a job and a worker and starts to process the job, which for now just returns a defer.succeed(). This is the part where I stumble on though, the exact implementation of how to make it so that one calls back the other. Any help would be appreciated. Please let me know also if you need more specific information.

Thanks,
Yi
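One way to make each step trigger the next is to have the last callback in the chain start the cycle over again, so that submitting a result leads straight into fetching the next job. The sketch below is only illustrative: the remote method names 'getJob' and 'submitResult', the worker.run() method and the _catch_failure errback are assumptions, not the actual API from the code above.

    class Monitor:
        # Minimal sketch, assuming self.remoteobj is the PB remote reference
        # and worker.run(job) returns a Deferred that fires with the result.
        def work_cycle(self, worker):
            d = self.remoteobj.callRemote('getJob')   # hypothetical remote method
            # worker.run returns a Deferred, so the chain pauses here until
            # the job finishes, then continues with its result.
            d.addCallback(lambda job: worker.run(job))
            d.addCallback(lambda result:
                          self.remoteobj.callRemote('submitResult', result))
            # Once the result has been submitted, start the cycle over.
            d.addCallback(lambda _: self.work_cycle(worker))
            d.addErrback(self._catch_failure)
            return d

Calling work_cycle() once per worker from _connected would give five independent chains, each of which keeps itself going until an error is caught by the errback.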

Yi Qiang wrote:
This is the part where I stumble on though, the exact implementation of how to make it so that one calls back the other.
One what calls back the other what? You'll have issues with error handling and dropping workers if you're not careful to chain the deferreds correctly. I would try something like this personally:

    class Worker:
        def gotJob(self, j):
            self.jobID = j.ID
            # do whatever; run the blocking work in a thread and chain off the
            # resulting Deferred (deferToThread, since callInThread returns None)
            deferToThread(DoesntHoldGIL, j).addCallbacks(
                self.doneJob, self.failedJob
            ).addBoth(self.noJob)

        def doneJob(self, result):
            pb.callRemote('jobDone', self.jobID, result)

        def failedJob(self, f):
            pb.callRemote('jobFailed', self.jobID, f.getErrorMessage())

        def noJob(self, v):
            # whether the job succeeded or failed, this worker is free again
            self.free = True

    workers = Pool(Worker)

    def checkForJobs():
        for worker in workers:
            if worker.free:
                worker.free = False
                job = pb.callRemote('getJob')
                job.addCallbacks(worker.gotJob, worker.noJob)

    # run every 10 seconds, start now
    tsk = task.LoopingCall(checkForJobs)
    tsk.start(10, True)

On 11/3/06, Phil Mayers <p.mayers@imperial.ac.uk> wrote:
Hi Phil,

Sorry it took me so long to get back to you. Thanks for your suggestion, but the thing I was trying to avoid was having the Workers communicate with the PB server themselves. There could potentially be many workers, and when I thought about the problem it seemed cleaner to design something where the client consists of a 'Monitor' and 'Workers'. The monitor connects to the server, gets jobs, distributes them to the workers and watches for workers that have finished their jobs. What do you think of this design? It is obviously a little more convoluted to program, but it might be easier to deal with in the future. I have little programming experience, so this is all speculation on my part. Any insight would be greatly appreciated.

Thanks,
Yi

Yi Qiang wrote:
To be honest, the problem still seems a little abstract to me, and I'm having a hard time figuring out exactly what you're trying to achieve. Surely the clients have to submit *some* form of completion status to the PB server? The "Monitor" portion of the clients seems superfluous - what does it achieve that having the worker submit the completion does not? Without a more concrete description I'm afraid I don't have much more useful to add.

On 11/8/06, Phil Mayers <p.mayers@imperial.ac.uk> wrote:
Ok. The thing I am trying to design/code is a job distribution system for a distributed computation model. There is a server which interacts with the job database (which is just a bsddb). The server accepts connections from clients and distributes jobs to them. The server also accepts connections from a second type of client, which submits jobs to the server.

> The "Monitor" portion of the clients seems superfluous - what does it
> achieve that having the worker submit the completion does not?
For the clients whose sole purpose is to get and process jobs, I initially envisioned two classes: a Monitor class and a Worker class. The Monitor's responsibility would be coordinating the Workers, since there can be multiple workers running on the same client. The monitor would handle tasks such as watching for activity from the workers (i.e. updating the output from workers as it comes in), and it also needs to be able to prematurely end a worker's task if the job is marked as unnecessary. The workers of course have to submit their results back to the server, but in the system I envisioned the monitor takes care of that by continuously polling the status of the workers. You're right that this system complicates things, since now I have to handle communication between the client and the server, and on the client side I need to implement communication protocols between the monitor and the worker. However, it does seem to buy me some flexibility. Also, this will keep the Worker class quite simple, since all it actually does is execute an external program to do its job.

> Without a more concrete description I'm afraid I don't have much more
> useful to add.
I hope this clears it up some more and am looking forward to your feedback. I think the most difficult problem I am facing right now is to decide on a good design. This is kind of vague in text and I hope to be able to show you some code soon, when I figure out how to publish a mercurial repository :) Thanks, Yi
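Since the Worker's only job is to run an external program, one option is to build it on Twisted's process support, which also gives the Monitor a handle for killing a job that has been marked unnecessary. This is a rough sketch under assumed names (job.command and job.args are hypothetical attributes), not Yi's actual code:

    from twisted.internet import defer, protocol, reactor

    class JobProcess(protocol.ProcessProtocol):
        # Collects stdout and fires a Deferred when the external program exits.
        def __init__(self):
            self.output = []
            self.done = defer.Deferred()

        def outReceived(self, data):
            self.output.append(data)

        def processEnded(self, reason):
            # A real implementation would errback here on a non-zero exit.
            self.done.callback(''.join(self.output))

    class Worker:
        def run(self, job):
            self.proto = JobProcess()
            reactor.spawnProcess(self.proto, job.command,
                                 [job.command] + job.args)
            return self.proto.done

        def cancel(self):
            # Lets the Monitor end the job prematurely.
            self.proto.transport.signalProcess('KILL')

The Monitor would hold one Worker per slot, attach callbacks to the Deferred returned by run(), and report results back to the server over its single PB connection.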

On 11/8/06, Phil Mayers <p.mayers@imperial.ac.uk> wrote:
The "Monitor" portion of the clients seems superfluous - what does it
achieve that having the worker submit the completion does not?
Also, another reason I wanted a monitor is that then I only have to establish one connection to the PB server, versus having each worker establish its own connection or passing around the remoteobj.

Yi

Yi Qiang wrote:
(there's no need to CC me, I'm on the list)

Ok fine, you want a (superfluous IMHO) instance of a monitor class. Do this:

    class Worker:
        def issueJob(self, job):
            # return a deferred (deferToThread, from twisted.internet.threads,
            # since callInThread does not return one)
            return deferToThread(nonGILnonBlocking, job)

    class Monitor:
        def connect(self):
            factory = pb.PBClientFactory()
            reactor.connectTCP(host, port, factory)
            factory.login(creds).addCallbacks(
                self.connected, self.connectfailed
            )

        def connected(self, remoteobj):
            self.remoteobj = remoteobj
            self.workerpool = SomeWorkerPool()
            for worker in self.workerpool:
                self.startWorker(worker)

        def startWorker(self, worker):
            self.remoteobj.callRemote('getJob').addCallback(
                self.startWorker2, worker
                # startWorker2 returns a deferred - the callback chain will be
                # paused here when that happens and continued when *that* deferred
                # finishes, with the callback value
            ).addCallbacks(
                self.workerDone, self.workerFailed,
                callbackArgs=(worker,), errbackArgs=(worker,)
            )

        def startWorker2(self, job, worker):
            deferred = worker.issueJob(job)
            return deferred

        def workerDone(self, result, worker):
            self.remoteobj.callRemote('jobDone', result)
            self.startWorker(worker)

        def workerFailed(self, failure, worker):
            self.remoteobj.callRemote('jobFailed', failure)
            self.startWorker(worker)

I *still* think you need a job ID number to key successes and failures at the server side.
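On the job ID point, one way to key successes and failures at the server is to carry the ID along the callback chain together with the worker. The sketch below is a variation on the code above; job.id and the extra arguments to 'jobDone'/'jobFailed' are assumptions, not a fixed API:

    class Monitor:
        # ...connect/connected as above...

        def startWorker(self, worker):
            self.remoteobj.callRemote('getJob').addCallback(
                self.startWorker2, worker)

        def startWorker2(self, job, worker):
            d = worker.issueJob(job)
            d.addCallbacks(self.workerDone, self.workerFailed,
                           callbackArgs=(worker, job.id),
                           errbackArgs=(worker, job.id))
            return d

        def workerDone(self, result, worker, jobID):
            self.remoteobj.callRemote('jobDone', jobID, result)
            self.startWorker(worker)

        def workerFailed(self, failure, worker, jobID):
            self.remoteobj.callRemote('jobFailed', jobID,
                                      failure.getErrorMessage())
            self.startWorker(worker)

This keeps the ID out of the Worker entirely, which fits the goal of keeping the Worker class as simple as possible.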
