
I have an SNMP poller written in Python that I'd like to turn into a long-running server process. The obvious thing to do (it is obvious - I really need the support infrastructure) is to convert it to Twisted, since Twisted makes it easy to run the process as a server as well as a client (e.g. an XMLRPC query mapping onto an SNMP fetch, or DBAPI updates of node status, all hooked in via deferreds).

However, I've done a couple of proof-of-concept ports and am having scalability problems related to the way reactors work. I'm not entirely certain I've nailed the problem, since the reactor sources are... complex... but I think what happens is this: the framework is heavily oriented towards getting IO out as soon as possible, and since there's no "scheduling" of receive events, protocol instances talking to "fast" clients eat all the CPU, and the socket buffer overflows because each datagramReceived triggers another write().

Possibly an example would help:

    import sys
    import time

    from twisted.internet import reactor

    # get_table(), die(), Timeout and the pdus list are defined elsewhere

    class agent:
        def __init__(self, host):
            self.host = host

        def start(self):
            d = self.get_table('ifTable')
            d.addCallbacks(self.step2, self.die)

        def step2(self, ifTable):
            # Do some processing, then more data fetch
            d = self.get_table('ipAddrTable')
            d.addCallbacks(self.step3, self.die)

        def step3(self, ipAddrTable):
            pass  # etc. etc.

    def do_timeouts(reactor):
        # pdus: the sent PDUs still awaiting a response
        now = time.time()
        for pdu in list(pdus):
            if pdu.due < now:
                pdus.remove(pdu)  # a Deferred may only be fired once
                pdu.deferred.errback(Timeout())
        reactor.callLater(1, do_timeouts, reactor)

    for host in sys.argv[1:]:
        a = agent(host)
        reactor.callLater(0, a.start)
    reactor.callLater(1, do_timeouts, reactor)
    reactor.run()

Now, I know there are scalability problems with callLater, but I'm only using it for startup because I can't think of a cleaner way of doing it; all later activity is triggered by deferreds. I handle timeouts with a once-per-second expiry sweep, which is sufficient for my needs.
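For the once-per-second expiry sweep, one way to keep each sweep cheap with around a thousand outstanding PDUs is to keep the deadlines in a heap, so a sweep only touches PDUs that have actually expired instead of scanning all of them. Below is a minimal, stdlib-only sketch; `PendingRequests` and its methods are hypothetical names, the request id is a plain counter rather than the SNMP message ID, and `errback` stands in for whatever fires the timeout (in the Twisted version, something that calls `pdu.deferred.errback(Timeout())`).

```python
import heapq
import itertools


class PendingRequests:
    """Hypothetical table of outstanding requests with heap-ordered deadlines."""

    def __init__(self):
        self._heap = []                  # (due, request_id) pairs, earliest first
        self._pending = {}               # request_id -> errback callable
        self._ids = itertools.count(1)   # simple counter, not an SNMP message ID

    def add(self, due, errback):
        """Register a sent request that must be answered before `due`."""
        request_id = next(self._ids)
        self._pending[request_id] = errback
        heapq.heappush(self._heap, (due, request_id))
        return request_id

    def answered(self, request_id):
        """Forget an answered request; return False if it was unknown or expired.

        Its heap entry is not searched for here; it is skipped lazily
        when it reaches the top of the heap.
        """
        return self._pending.pop(request_id, None) is not None

    def expire(self, now):
        """Fire the errback of every request due before `now`; return the count."""
        fired = 0
        while self._heap and self._heap[0][0] < now:
            due, request_id = heapq.heappop(self._heap)
            errback = self._pending.pop(request_id, None)
            if errback is not None:      # None means it was already answered
                errback(request_id)
                fired += 1
        return fired
```

Each sweep costs O(k log n) for k expired entries rather than O(n) for all of them. In Twisted, the sweep itself could be driven by `task.LoopingCall(lambda: table.expire(time.time())).start(1.0)` instead of re-arming `callLater` by hand.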
The *problem* is that the SNMP agents I'm talking to vary enormously in performance, response time and the quantity of data to retrieve, and the faster endpoints are consuming a disproportionate share of the CPU time; so much so, in fact, that the UDP socket receive queue limit is being exceeded. (I'm using a single UDP socket for all the clients and demultiplexing the responses by SNMP message ID, to get around FD limits.) We're talking 750-1000 clients here, BTW.

What I really need is some way of making the reactor do fair-queuing-style transmit/receive. I have a not-very-clear notion in my head of transport.write not actually doing the write, but just putting the output data onto a per-agent queue, and the reactor emptying those queues in strict round-robin fashion, *but* stopping immediately to service input data if it arrives. Hmm. I suppose I could implement that by subclassing the UDP transport:

    from twisted.internet import reactor

    class queuedtransport:
        def __init__(self):
            self.host2queue = {}   # host -> queue number
            self.queues = {}       # queue number -> list of pending datagrams
            self.pos = -1          # round-robin position

        def write(self, data, host=None):
            if host not in self.host2queue:
                # Number queues 0..n-1 so they line up with self.pos below
                self.host2queue[host] = len(self.host2queue)
            queue = self.host2queue[host]
            self.queues.setdefault(queue, []).append(data)

        def queue(self):
            if not any(self.queues.values()):
                # Nothing to send (this also avoids spinning forever below)
                return reactor.callLater(1, self.queue)
            while True:
                self.pos = (self.pos + 1) % len(self.queues)
                if not self.queues[self.pos]:
                    continue
                self._write(self.queues[self.pos].pop(0))  # the real send
                break

Or something along those lines; it's only a rough sketch.

Does anyone have any suggestions on how to handle this? My existing (home-grown, nasty) async framework can handle a lot of queries a second, upwards of several hundred, and it does a very good job of sharing out the load evenly, so that fast systems complete quickly but the slower ones still get enough time to a) not suffer PDU timeouts and b) not exhaust inbound socket buffers.
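The per-host queueing idea can be tried out without a reactor. The sketch below (hypothetical class name `RoundRobinQueue`) keeps one deque per host plus a rotation of the hosts that currently have data, so `pop_next()` never scans empty queues and a host with a long backlog cannot send twice in a row while others are waiting. It only models the queueing; the actual send, and interleaving it with input handling, is left to the caller.

```python
import collections


class RoundRobinQueue:
    """Per-host output queues drained one datagram at a time, round-robin.

    write() only enqueues; pop_next() hands out the next datagram from the
    next host in the rotation that has something waiting.
    """

    def __init__(self):
        self._queues = {}                   # host -> deque of datagrams
        self._ready = collections.deque()   # hosts with queued data, in turn order

    def write(self, data, host):
        queue = self._queues.get(host)
        if queue is None:
            queue = self._queues[host] = collections.deque()
        if not queue:
            # Invariant: a host is in _ready exactly when its queue is non-empty
            self._ready.append(host)
        queue.append(data)

    def pop_next(self):
        """Return (host, data) for the next host in turn, or None if all are empty."""
        if not self._ready:
            return None
        host = self._ready.popleft()
        queue = self._queues[host]
        data = queue.popleft()
        if queue:
            self._ready.append(host)        # still has data: go to the back
        return host, data

    def __len__(self):
        return sum(len(q) for q in self._queues.values())
```

In a Twisted port, some reactor-driven callback would presumably call `pop_next()` and pass the result to the real UDP transport's `write(data, addr)`; that wiring is an assumption here, not something the reactor does by itself.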
The way my current code works is:

    import select
    import socket
    import time

    def receive():
        # Poll with a zero timeout so the job loop below keeps running
        r, w, x = select.select(fds, [], [], 0)
        if not r:
            return None
        for rfd in r:
            # Drain each readable (non-blocking) socket completely
            while True:
                try:
                    pdu = rfd.recv(65535)
                except socket.error:
                    break          # nothing left to read
                if not pdu:
                    break
                mark_answered(pdu)
        return len(r)

    job = self.jobs           # assumed: head of a ring of jobs linked via nextptr
    while self.jobs:
        # Input takes absolute precedence: drain it before touching a job
        while receive():
            pass
        now = time.time()
        job = job.nextptr     # round-robin pointer into the job ring
        if job.waiting:
            if job.waiting.answered:
                job.receive()
            elif job.waiting.due < now:
                try:
                    job.waiting.retry()
                except Exception:
                    # Out of retries
                    job.cancel()

You can probably see how this works: on every loop, receive IO takes absolute precedence but only goes into an input queue, and just one job's "receive" event is processed each time the IO has been emptied; *and* there's a pointer into the queue of jobs that ensures they're serviced in round-robin fashion when ready. Sadly, the pseudo-code above makes it look much cleaner than the real code, and I want Twisted's other protocol/service support, the deferred pattern and so on.

I would welcome any suggestions.
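The scheduling policy of that loop (drain all pending input first, then advance exactly one job, round-robin) can be modelled without sockets. In this sketch `Job`, `service_one` and `poll_input` are hypothetical; `poll_input()` returns the jobs whose outstanding request was just answered, and retries are left out, so a job that times out is simply dropped.

```python
import collections


class Job:
    """Hypothetical job: a fixed number of request/response steps."""

    def __init__(self, name, steps, due):
        self.name = name
        self.steps_left = steps
        self.due = due          # deadline for the outstanding request
        self.answered = False

    def receive(self):
        # Consume the answer; a real job would send its next request and
        # set a new due time here.
        self.answered = False
        self.steps_left -= 1


def service_one(ring, poll_input, now):
    """Run one scheduler pass; return what happened, or None if nothing did."""
    # Input has absolute priority: record answers until no input is pending.
    while True:
        answered = poll_input()
        if not answered:
            break
        for job in answered:
            job.answered = True
    if not ring:
        return None
    job = ring.popleft()        # the job whose turn it is
    if job.answered:
        job.receive()
        if job.steps_left > 0:
            ring.append(job)    # back of the line, behind everyone else
        return job.name
    if job.due < now:
        return job.name + ":timeout"   # dropped: it is not re-queued
    ring.append(job)            # still waiting: keep its place in the rotation
    return None
```

Each job gets at most one step per rotation, so even when several answers arrive together, a fast job cannot starve the others or delay the timeout check on a slow one.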