[Twisted-Python] XMLRPC server help needed
Hi!

I have an application, a sort of routing system, where nodes communicate using XML-RPC. It works, but to my mind it's too slow.

Changing to some protocol other than XML-RPC is not an option at present, so I don't want to know which other protocol would do it much better. I might eventually get there, but not at this point in time.

One reason for the slowness is that I'm not able to get the server to reply to the client when I would like it to. This is how it should work:

1) server gets a message from the client and checks that it is well formed
2) server determines if it knows what to do with the message
3) server performs the appropriate action on the message

If the message is not well formed, or if the server does not know what to do with the packet, it should reject it. So far everything works as planned.

If the packet is well formed and the server knows what to do with it, it should reply to the client and then perform the action.

My problem is how I would go about doing this. Conceptually I could imagine having a work queue where I would place the message and then, from the point of view of the client-server communication, just forget about it.

Anyone who has done anything similar or has an idea on how to do this?

-- Roland
Roland Hedberg <roland.hedberg@adm.umu.se> writes:
> If the packet is well formed and the server knows what to do with it, it should reply to the client and then perform the action.
> My problem is how I would go about doing this. Conceptually I could imagine having a work queue where I would place the message and then, from the point of view of the client-server communication, just forget about it.
> Anyone who has done anything similar or has an idea on how to do this?
I do this by subclassing twisted.web.xmlrpc.XMLRPC and handling the query in a work queue (appended below) that runs in its own thread. In my case the client doesn't care whether it sent me well-formed data or not, so immediately after putting the query in the work queue I return from the xmlrpc method and the client is free.

In my case, I do some sanity checking of the query inside the queue. In your case you'd do the checking before stuffing the queue, so you could inform the client.

Here is a sketch of the server chopped out from my code:

    from twisted.web import xmlrpc
    from twisted.python.failure import Failure

    # Semaphore and CommandQueue are the classes appended below.
    # "log" is not defined in this excerpt; it comes from the rest of my code.

    class DataSource(xmlrpc.XMLRPC):
        "The XML-RPC listener"

        def __init__(self, services, sem):
            xmlrpc.XMLRPC.__init__(self)
            xmlrpc.addIntrospection(self)
            self.data_cq = CommandQueue()
            self.sem = sem
            self.services = services

        def _handle_sem_release(self, x):
            # Runs on success and on failure, so the semaphore is always released.
            #print "DataSource._handle_sem_release(%s)"%str(x)
            self.sem.release()
            if isinstance(x, Failure):
                if x.value.args[:1] == ("DEBUG",):
                    return x
                log.error(x)
            return x

        def xmlrpc_method(self, idstr, values):
            "Accept callbacks from Export API"
            d = self.sem.acquire()
            # Queue the real work; the client is not kept waiting for it.
            d.addCallback(lambda x: self.data_cq(self.services.method, idstr, values))
            d.addBoth(self._handle_sem_release)
            return 0

All the real work is done in the "services.method" method.

You'll note that I use a Semaphore class (appended below). This is to keep other operations, not shown here, from being executed in the middle of handling the query.

BTW, the Semaphore and CommandQueue classes were developed with much help from this list. Thanks again!

-Brett.
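The "check first, queue it, reply at once" flow suggested above for Roland's case can be sketched with the Python standard library alone. This is only an illustration under stated assumptions, not code from either poster: it uses a plain thread and queue rather than Twisted, and every name in it (handle_request, KNOWN_ACTIONS, the message layout, the reply strings) is hypothetical.

```python
import queue
import threading

# Hypothetical names throughout; none of these come from the original posts.
work_queue = queue.Queue()
processed = []                       # stands in for the real side effects
KNOWN_ACTIONS = {"route", "store"}   # assumed set of actions the server understands


def is_well_formed(message):
    """Step 1: accept only dicts that carry an 'action' and a 'payload'."""
    return isinstance(message, dict) and "action" in message and "payload" in message


def handle_request(message):
    """Request path: validate, enqueue, and reply without doing the work."""
    if not is_well_formed(message):
        return "rejected: malformed"
    if message["action"] not in KNOWN_ACTIONS:   # step 2
        return "rejected: unknown action"
    work_queue.put(message)                      # step 3 happens later, in the worker
    return "accepted"


def worker():
    """Drain the queue serially; None is the shutdown sentinel."""
    while True:
        message = work_queue.get()
        if message is None:
            break
        processed.append((message["action"], message["payload"]))


thread = threading.Thread(target=worker)
thread.start()

replies = [
    handle_request({"action": "route", "payload": "a"}),
    handle_request({"action": "explode", "payload": "b"}),
    handle_request("not a dict"),
]

work_queue.put(None)   # let the worker finish the queued work, then stop
thread.join()
```

Only the first message reaches the worker; the other two are rejected in the request path, so the client learns about the problem right away.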
# ------------------------------------------------
import sys

try:
    from queue import Queue, Empty      # Python 3
except ImportError:
    from Queue import Queue, Empty      # Python 2

from twisted.internet import defer
from twisted.python import failure


class Semaphore(object):
    """Asynchronous semaphore stolen from:
    http://twistedmatrix.com/pipermail/twisted-python/2003-August/005323.html
    """

    def __init__(self, value=1, verbose=None):
        self.queue = []
        self.value = value

    def acquire(self):
        # Fires at once if a slot is free; otherwise waits for release().
        d = defer.Deferred()
        if self.value:
            self.value -= 1
            d.callback(False)
        else:
            self.queue.append(d)
        return d

    def release(self):
        # Hand the slot to the oldest waiter, or give it back.
        if self.queue:
            self.queue.pop(0).callback(True)
        else:
            self.value += 1


class CommandQueue:
    '''Queue up commands for serial calling.  One must call the
    drain() method to start reading the internal queue.  Most likely
    one wants to call this in a thread.'''

    all_queues = []

    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.stop = False
        CommandQueue.all_queues.append(self)
        from twisted.internet import reactor
        reactor.callInThread(self.drain)

    def __call__(self, meth, *a, **k):
        '''Call meth(*a,**k) when it reaches end of queue.  Returns a
        Deferred that will pass the return of meth.'''
        deferred = defer.Deferred()
        deferred.addErrback(self._error)
        self.queue.put((deferred, meth, a, k))
        return deferred

    def _error(self, a):
        try:
            a.printTraceback(sys.stderr)
        except Exception:
            sys.stdout.write(str(a) + "\n")
        return a

    def drain(self):
        'Drain the command queue until CommandQueue.stop is True'
        # Local import, as in __init__; the name is needed here too.
        from twisted.internet import reactor
        while not self.stop:
            try:
                d, meth, a, k = self.queue.get(True, 1)
            except Empty:
                continue
            #print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
            try:
                res = meth(*a, **k)
            except Exception:
                # Wrap the exception currently being handled.
                res = failure.Failure()
            # Deferreds must be fired from the reactor thread.
            reactor.callFromThread(d.callback, res)
            #d.callback(meth(*a,**k))
            #print "callback done"
        #print "drain closing"
        return 0
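For readers without Twisted at hand, the idea behind CommandQueue (one thread runs the queued calls in order and hands each result, or error, back to the caller) can be sketched with the standard library. This is a rough analogue, not Brett's class: it swaps Deferreds for concurrent.futures.Future objects, and SerialCommandQueue and its close() method are hypothetical names.

```python
import queue
import threading
from concurrent.futures import Future


class SerialCommandQueue:
    """Hypothetical stdlib analogue of CommandQueue: one worker thread,
    calls run in submission order, results come back through Futures."""

    _STOP = object()   # sentinel that tells the worker to exit

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._drain)
        self._thread.start()

    def __call__(self, func, *args, **kwargs):
        """Queue func(*args, **kwargs) and return a Future for its result."""
        future = Future()
        self._queue.put((future, func, args, kwargs))
        return future

    def close(self):
        """Finish the calls already queued, then stop the worker thread."""
        self._queue.put((self._STOP, None, (), {}))
        self._thread.join()

    def _drain(self):
        while True:
            future, func, args, kwargs = self._queue.get()
            if future is self._STOP:
                return
            try:
                future.set_result(func(*args, **kwargs))
            except Exception as exc:
                # Report the error to the caller instead of killing the worker.
                future.set_exception(exc)


commands = SerialCommandQueue()
ok = commands(divmod, 7, 2)
bad = commands(divmod, 1, 0)
commands.close()

quotient_and_remainder = ok.result()   # normal return value of the call
error = bad.exception()                # exception raised inside the worker
```

As in the Twisted version, a call that raises does not stop the queue; the failure travels back to whoever holds the returned object.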
participants (2)
- Brett Viren
- Roland Hedberg