Roland Hedberg <roland.hedberg@adm.umu.se> writes:
If the packet is well formed and the server knows what to do with it, it should reply to the client and then perform the action.
My problem is how I would go about doing this. Conceptually I could imagine having a work queue where I would place the message and then, from the point of view of the client-server communication, just forget about it.
Has anyone done anything similar, or does anyone have an idea of how to do this?
I do this by subclassing twisted.web.xmlrpc.XMLRPC and handling the query in a work queue (appended below) that runs in its own thread. In my case the client doesn't care whether it sent me well-formed data, so immediately after putting the query in the work queue I return from the xmlrpc method and the client is free. I do some sanity checking of the query inside the queue. In your case you'd do the checking before stuffing the queue so you could inform the client.

Here is a sketch of the server chopped out from my code:

    from twisted.python.failure import Failure
    from twisted.web import xmlrpc

    # CommandQueue is appended below; "log" is not part of this excerpt.

    class DataSource(xmlrpc.XMLRPC):
        "The XML-RPC listener"

        def __init__(self, services, sem):
            xmlrpc.XMLRPC.__init__(self)
            xmlrpc.addIntrospection(self)
            self.data_cq = CommandQueue()
            self.sem = sem
            self.services = services

        def _handle_sem_release(self, x):
            # Attached with addBoth, so the semaphore is released on
            # success and on failure alike.
            #print "DataSource._handle_sem_release(%s)"%str(x)
            self.sem.release()
            if isinstance(x, Failure):
                # Failures tagged "DEBUG" pass through without being logged.
                if x.value.args and x.value.args[0] == "DEBUG":
                    return x
                log.error(x)
            return x

        def xmlrpc_method(self, idstr, values):
            "Accept callbacks from Export API"
            d = self.sem.acquire()
            # The callback returns the queue's Deferred, so the release
            # below waits until the queued work has finished.
            d.addCallback(lambda x: self.data_cq(self.services.method, idstr, values))
            d.addBoth(self._handle_sem_release)
            # Reply right away; the work proceeds in the queue's thread.
            return 0

All the real work is done in the "services.method" method. You'll note that I use a Semaphore class (appended below). This keeps other operations not shown here from being executed in the middle of handling the query.

BTW, the Semaphore and CommandQueue classes were developed with much help from this list. Thanks again!

-Brett.
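For the case in the original question, where the client should be told whether its packet was acceptable before the work starts, the order would be: check, enqueue, reply. The following is only a minimal sketch using the standard library rather than Twisted. The names `is_well_formed`, `handle_request`, and `results`, the packet layout, and the "accepted"/"rejected" replies are all hypothetical and chosen just to make the example observable.

```python
import queue
import threading

work_queue = queue.Queue()
results = []  # hypothetical sink so the worker's effect can be inspected


def is_well_formed(packet):
    # Hypothetical check: a dict with a string "id" and a list of "values".
    return (isinstance(packet, dict)
            and isinstance(packet.get("id"), str)
            and isinstance(packet.get("values"), list))


def handle_request(packet):
    """Validate first, then enqueue and reply without waiting for the work."""
    if not is_well_formed(packet):
        return "rejected"
    work_queue.put(packet)
    return "accepted"


def worker():
    """Process queued packets one at a time until the None sentinel arrives."""
    while True:
        packet = work_queue.get()
        try:
            if packet is None:
                break
            # Stand-in for the real action performed on the packet.
            results.append((packet["id"], sum(packet["values"])))
        finally:
            work_queue.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

replies = [
    handle_request({"id": "a", "values": [1, 2, 3]}),  # well formed
    handle_request({"id": 7}),                         # malformed, never queued
]

work_queue.put(None)   # ask the worker to stop
work_queue.join()      # wait until everything queued has been handled
thread.join()
```

The reply is produced before the worker touches the packet, so the client never waits on the action itself; only the validation cost sits on the request path.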
# ------------------------------------------------
import sys
from queue import Queue, Empty  # this module is named "Queue" on Python 2

from twisted.internet import defer
from twisted.python import failure


class Semaphore(object):
    """Asynchronous semaphore stolen from:
    http://twistedmatrix.com/pipermail/twisted-python/2003-August/005323.html
    """

    def __init__(self, value=1, verbose=None):
        self.queue = []
        self.value = value

    def acquire(self):
        d = defer.Deferred()
        if self.value:
            # A permit is free: take it and fire immediately.
            self.value -= 1
            d.callback(False)
        else:
            # No permit: the Deferred fires when release() reaches it.
            self.queue.append(d)
        return d

    def release(self):
        if self.queue:
            # Hand the permit straight to the oldest waiter.
            self.queue.pop(0).callback(True)
        else:
            self.value += 1


class CommandQueue:
    '''Queue up commands for serial calling.  The drain() method reads
    the internal queue; the constructor starts it in a reactor thread.'''

    all_queues = []

    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.stop = False
        CommandQueue.all_queues.append(self)
        from twisted.internet import reactor
        reactor.callInThread(self.drain)

    def __call__(self, meth, *a, **k):
        '''Call meth(*a,**k) when it reaches the front of the queue.
        Returns a Deferred that will pass the return of meth.'''
        deferred = defer.Deferred()
        deferred.addErrback(self._error)
        self.queue.put((deferred, meth, a, k))
        return deferred

    def _error(self, a):
        try:
            a.printTraceback(sys.stderr)
        except Exception:
            print(str(a))
        return a

    def drain(self):
        'Drain the command queue until CommandQueue.stop is True'
        from twisted.internet import reactor
        while not self.stop:
            try:
                # Wake up once a second so the stop flag is noticed.
                d, meth, a, k = self.queue.get(True, 1)
            except Empty:
                continue
            #print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
            try:
                res = meth(*a, **k)
            except Exception:
                # Failure() with no arguments captures the exception
                # currently being handled.
                res = failure.Failure()
            # Deferreds are not thread-safe, so fire it in the reactor thread.
            reactor.callFromThread(d.callback, res)
            #d.callback(meth(*a,**k))
            #print "callback done"
        #print "drain closing"
        return 0
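The hand-off logic in Semaphore can be tried out without a reactor. The sketch below is not the class above. It is a Twisted-free stand-in, with the hypothetical name `CallbackSemaphore`, in which `acquire()` takes a plain callable where the real class returns a Deferred. It is only meant to show that a second acquirer waits until `release()` passes the permit to it directly.

```python
from collections import deque


class CallbackSemaphore:
    """Stand-in for the Deferred-based Semaphore, using plain callables."""

    def __init__(self, value=1):
        self.value = value
        self.waiters = deque()

    def acquire(self, callback):
        if self.value:
            # A permit is free: take it and run the callback at once.
            self.value -= 1
            callback(False)
        else:
            # No permit: remember the callback until release() is called.
            self.waiters.append(callback)

    def release(self):
        if self.waiters:
            # Pass the permit directly to the oldest waiter.
            self.waiters.popleft()(True)
        else:
            self.value += 1


events = []
sem = CallbackSemaphore(1)

sem.acquire(lambda waited: events.append(("first", waited)))
sem.acquire(lambda waited: events.append(("second", waited)))
pending_before_release = len(sem.waiters)  # the second acquirer is parked

sem.release()  # the first holder is done; the second now runs
sem.release()  # the second holder is done; the permit is returned
```

The argument passed to the callback mirrors the original: False when the permit was granted immediately, True when the acquirer had to wait for a release.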