[Twisted-Python] Twisted scalability with hundreds of outbound (client) connections
I have an SNMP poller written in Python that I'd like to turn into a long-running server process. The obvious thing (it is obvious - I really need the support infrastructure) is to convert it to Twisted, since it provides easy support for turning the process into a server as well as a client (e.g. an XMLRPC query mapping onto an SNMP fetch; using DBAPI to update node status; all hooked together via deferreds, etc.). However, I've done a couple of proof-of-concept ports and am having scalability problems related to the way reactors work.

I'm not entirely certain I've got the problem nailed, since the reactor sources are... complex... but I think what happens is this: the framework is heavily oriented towards getting IO out as soon as possible, and since there's no "scheduling" of receive events, protocol instances talking to "fast" clients eat all the CPU, and the socket buffer overflows since each datagramReceived triggers another write(). Possibly an example would help:

```python
import sys
import time

from twisted.internet import reactor

class agent:
    def __init__(self, host):
        self.host = host

    def start(self):
        d = self.get_table('ifTable')
        d.addCallbacks(self.step2, self.die)

    def step2(self, ifTable):
        # Do some processing, then fetch more data
        d = self.get_table('ipAddrTable')
        d.addCallbacks(self.step3, self.die)

    def step3(self, ipAddrTable):
        # etc. etc.
        pass

def do_timeouts(reactor):
    # Scan the list of sent PDUs for ones past their deadline
    now = time.time()
    for pdu in pdus:
        if pdu.due < now:
            pdu.deferred.errback(Timeout())
    reactor.callLater(1, do_timeouts, reactor)

for host in sys.argv[1:]:
    a = agent(host)
    reactor.callLater(0, a.start)
reactor.callLater(1, do_timeouts, reactor)
reactor.run()
```

Now, I know there are scalability problems with callLater, but that's only being used for the startup because I can't think of a cleaner way of doing it - future activity is all triggered by deferreds. I'm handling the timeouts by running a once-per-second expiry scan, which is sufficient for my needs.

The *problem* is that the SNMP agents I'm talking to have an extremely wide range of performance, response times and quantity of data to retrieve, and the faster endpoints are consuming a disproportionate quantity of the CPU time; so much so, in fact, that the UDP socket receive queue limit is being exceeded (I'm using a single UDP socket for all the clients and breaking the responses back out based on SNMP message ID - this is to overcome FD limits). We're talking 750-1000 clients here, BTW.

What I really need is some way of making the reactor do a fair-queuing style of transmit/receive. I have some not-very-clear notion in my head of transport.write not actually doing the write, but just putting the output data onto a per-agent queue, and the reactor emptying those queues in a strictly round-robin fashion, *but* stopping immediately to service input data if it arrives. Hmm. I suppose I could implement that by subclassing the UDP transport:

```python
class queuedtransport:
    def write(self, data, host=None):
        if host not in self.host2queue:
            queue = max(self.host2queue.values()) + 1
            self.host2queue[host] = queue
        else:
            queue = self.host2queue[host]
        self.queues.setdefault(queue, []).append(data)

    def queue(self):
        if not self.queues:
            return reactor.callLater(1, self.queue)
        while True:
            self.pos = (self.pos + 1) % len(self.queues)
            if not self.queues[self.pos]:
                continue
            self._write(self.queues[self.pos].pop(0))
            break
```

Or something - I'm fairly sure that code wouldn't quite work, but something similar. Does anyone have any suggestions how to handle this?
My existing (home-grown, nasty) async framework can handle a lot of queries a second - upwards of several hundred - and it does a very good job of sharing out the load evenly, so that fast systems complete quickly but the slower ones still get enough time to a) not suffer PDU timeouts and b) not exhaust inbound socket buffers. The way my current code works is:

```python
def receive():
    r, w, x = select.select(fds, [], [])
    for rfd in r:
        while True:
            try:
                pdu = rfd.read()
                if not pdu:
                    break
                mark_answered(pdu)
            except:
                pass
    if r:
        return len(r)
    else:
        return None

while self.jobs:
    while True:
        if not receive():
            break
    job = job.nextptr
    if job.waiting:
        if job.waiting.answered:
            job.receive()
        elif job.waiting.due < now:
            try:
                job.waiting.retry()
            except:
                # Out of retries
                job.cancel()
```

You can probably see the way this works - on every loop, receive IO takes absolute precedence but goes into an input queue, and the input queue only has one "receive" event per IO-empty - *and* there's a pointer into the queue of jobs to ensure that they're serviced in a round-robin fashion if ready.

Sadly, the pseudo-code above makes it look much cleaner than the real code, and I want the other protocol / service support along with other things like the deferred pattern and such. I would welcome any suggestions.
Rather than doing queuing in the transport, do it in your DatagramProtocol code: instead of calling transport.write directly, add the data to a queue that schedules writes fairly across the different machines.
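A minimal sketch of this approach (not from the thread): queue writes per host inside the DatagramProtocol and drain them round-robin, one datagram per reactor turn. The class and method names (FairSNMP, queueWrite, flushOne) are illustrative, not part of any Twisted API.

```python
from twisted.internet import reactor
from twisted.internet.protocol import DatagramProtocol

class FairSNMP(DatagramProtocol):
    def __init__(self):
        self.queues = {}    # host -> list of (data, addr) pending writes
        self.order = []     # round-robin ordering of hosts
        self.flushing = False

    def queueWrite(self, data, addr):
        # Instead of calling transport.write() directly, park the datagram
        # on a per-host queue so one fast host cannot monopolise output.
        host = addr[0]
        if host not in self.queues:
            self.queues[host] = []
            self.order.append(host)
        self.queues[host].append((data, addr))
        if not self.flushing:
            self.flushing = True
            reactor.callLater(0, self.flushOne)

    def flushOne(self):
        # Send at most one datagram, then yield back to the reactor so
        # pending input can be serviced before the next write.
        for _ in range(len(self.order)):
            host = self.order.pop(0)
            self.order.append(host)
            pending = self.queues[host]
            if pending:
                data, addr = pending.pop(0)
                self.transport.write(data, addr)
                break
        if any(self.queues.values()):
            reactor.callLater(0, self.flushOne)
        else:
            self.flushing = False

    def datagramReceived(self, data, addr):
        # Decode replies and fire the matching deferreds here (omitted).
        pass
```

Note that, as discussed later in this thread, callLater(0) on the Twisted of that era could fire within the same iteration; Phil's eventual workaround uses a small non-zero delay instead.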
Itamar Shtull-Trauring wrote:
Rather than doing queuing in the transport, do it in your DatagramProtocol code: instead of calling transport.write directly, add the data to a queue that schedules writes fairly across the different machines.
If I do this, I'll need:

```python
def xmit(self):
    while True:
        txqueue = self.txcurrent
        self.txcurrent = txqueue.next
        if txqueue:
            self.write(txqueue.pop(0))
            break
    reactor.callLater(0, self.xmit)

def write(self, data, address):
    self.getqueue(address).append(data)

def rcv(self):
    while True:
        rxqueue = self.rxcurrent
        self.rxcurrent = rxqueue.next
        if rxqueue:
            pdu = rxqueue.pop(0)
            pdu.callback()
            break

def datagramReceived(self, data, addr):
    pdu, queue = self.parse(data, addr)
    queue.append(pdu)
    reactor.callLater(0, self.rcv)
```

I'm concerned about all those reactor.callLater calls. Since one of the main problems is the UDP socket queue overflowing, every time I xmit I have to get *out* of the protocol code ASAP and back into the select() loop; however, one of the problems with the reactors (problems for me, at any rate) is that they do pending calls and thread stuff before IO, which IMHO is not quite the right way round.

I'm also slightly concerned about the number of function calls involved in jumping in and out of the reactor that many times a second (several thousand, if I can get it to go as fast as my previous code), given how expensive they are under Python. It would certainly be quicker to implement this inside reactor.mainLoop.

Anyway, I'll give it a go, but I thought I saw stuff recently about scalability of callLater; and certainly I've had problems with a lot of callLaters (I used to use them for the timeouts before going for a simpler expiry scan) - maybe callLater will go faster in Python 2.4 with the C bisect.insort? Thanks for the tips.
On Sun, 2004-09-26 at 15:35, Phil Mayers wrote:
```python
def xmit(self):
    while True:
        txqueue = self.txcurrent
        self.txcurrent = txqueue.next
        if txqueue:
            self.write(txqueue.pop(0))
            break
    reactor.callLater(0, self.xmit)
```
This code confuses me. What is the "next" bit for?
I'm concerned about all those reactor.callLater calls. Since one of the main problems is the UDP socket queue overflowing, every time I xmit I have to get *out* of the protocol code ASAP and back into the select() loop; however, one of the problems with the reactors (problems for me, at any rate) is that they do pending calls and thread stuff before IO, which IMHO is not quite the right way round.
ABABAB vs. BABABA: these are indistinguishable once you've done half an iteration; ABABAB is just BABABA slightly time-shifted, so the ordering only affects the first and last iterations.
I'm also slightly concerned about the number of function calls involved in jumping in and out of the reactor that many times a second (several thousand, if I can get it to go as fast as my previous code), given how expensive they are under Python. It would certainly be quicker to implement this inside reactor.mainLoop.
Just have a single reactor.callLater(0, f), and f() then calls all the functions you want done in that iteration.
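Something like the following sketch of that batching pattern; the CallBatcher name and its methods are illustrative, not from the thread or from Twisted itself.

```python
from twisted.internet import reactor

class CallBatcher:
    def __init__(self):
        self.pending = []
        self.scheduled = False

    def add(self, func, *args):
        # Collect work; only one callLater is ever outstanding at a time.
        self.pending.append((func, args))
        if not self.scheduled:
            self.scheduled = True
            reactor.callLater(0, self._runAll)

    def _runAll(self):
        # A single reactor callback runs everything queued this iteration.
        self.scheduled = False
        calls, self.pending = self.pending, []
        for func, args in calls:
            func(*args)
```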
Itamar Shtull-Trauring wrote:
I'm also slightly concerned about the number of function calls involved in jumping in and out of the reactor that many times a second (several thousand, if I can get it to go as fast as my previous code), given how expensive they are under Python. It would certainly be quicker to implement this inside reactor.mainLoop.
Just have a single reactor.callLater(0, f), and f() then calls all the functions you want done in that iteration.
Ok, just a quick note to people - I solved this as suggested; however, reactor.callLater(0, func) does not work: because 0 always means *now*, you get the queuing problem.

```python
import sys

from twisted.internet import protocol, reactor

class SNMP(protocol.DatagramProtocol):
    def __init__(self):
        self.calls = []
        self.timeouts = []

    def datagramReceived(self, data, addr):
        pdu = self.decode(data)
        self.timeouts.remove(pdu.deferred)
        self.queue(pdu.deferred.callback, (pdu,))

    def queue(self, func, pargs):
        if not self.calls:
            # Schedule a dequeue at some later date
            reactor.callLater(0.001, self.dequeue)
        self.calls.append((func, pargs))

    def dequeue(self):
        if not self.calls:
            return
        func, pargs = self.calls.pop(0)
        # The problem is here - this function will almost certainly be a
        # protocol action that will generate another transmit PDU. With
        # many clients, the many transmits can overflow the input queue
        # while we're spinning inside code.
        func(*pargs)
        if self.calls:
            # To avoid the problem, wait "delta" (some small number)
            # rather than zero; this will ensure a select() happens
            # before the callLater fires
            reactor.callLater(0.001, self.dequeue)

class Agent:
    def __init__(self, host, proto):
        self.host = host
        self.proto = proto

    def start(self):
        d = self.proto.query(self.host, 'get', oid1, oid2)
        d.addCallbacks(self.step2, self.error)

    def step2(self, pdu):
        # Do some stuff
        d = self.proto.query(self.host, 'get', self.whatnow[pdu])
        d.addCallbacks(self.step3, self.error)

proto = SNMP()
for hostname in sys.argv[1:]:
    a = Agent(hostname, proto)
    # Start up one at a time, to avoid a startup surge
    proto.queue(a.start, ())

reactor.run()
```

I hope I'm explaining what's going on here - but if not, don't worry, the problem is more or less solved for me, thanks for the assistance. The only minor remaining niggle is that the static 0.001 value to callLater limits my theoretical max throughput to 1000 queries/sec. The only way to do without that parameter would be to execute a select() inside every function call in runUntilCurrent, I think. As it happens, 1000/sec is more than the box can do anyway, so it's not a problem at the moment!
On Sun, 03 Oct 2004 10:28:27 -0400, Itamar Shtull-Trauring <itamar@itamarst.org> wrote:
callLater(0, f) does not mean 'now', it means 'next iteration'.
But as you know, due to an implementation bug, it _behaves_ as if it meant "now" when called from a function that is being called from a callLater(). Hopefully someone will get 'round to fixing this before 2.0. Jp
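A tiny sketch of the distinction, under the intended semantics Jp describes (not the buggy behaviour): scheduling with a delay of 0 from inside a callLater callback should defer the new call to the next reactor iteration, after IO has been serviced, rather than running it straight away.

```python
from twisted.internet import reactor

def g():
    # Under the intended semantics, this runs on the *next* reactor
    # iteration, after select() has had a chance to service pending IO.
    print("g ran on the following iteration")
    reactor.stop()

def f():
    reactor.callLater(0, g)   # "next iteration", not "now"

reactor.callLater(0, f)
reactor.run()
```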
On Oct 3, 2004, at 3:54 PM, Itamar Shtull-Trauring wrote:
Do we have an open issue for this?
I believe it's fixed by http://www.twistedmatrix.com/bugs/issue707, although that issue isn't really about this bug. James
On Sun, 2004-10-03 at 16:12, James Y Knight wrote:
I believe it's fixed by http://www.twistedmatrix.com/bugs/issue707, although that issue isn't really about this bug.
Have we figured out what to do about heapq on 2.2?
On Oct 4, 2004, at 12:04 PM, Itamar Shtull-Trauring wrote:
On Sun, 2004-10-03 at 16:12, James Y Knight wrote:
I believe it's fixed by http://www.twistedmatrix.com/bugs/issue707, although that issue isn't really about this bug.
Have we figured out what to do about heapq on 2.2?
Include a copy of it in our compatibility library. See thread "Including code under PSF license for backwards compatibility?". James
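Presumably the shim would be the usual try/except import fallback; the bundled module path below is purely hypothetical, since the thread does not name it.

```python
try:
    import heapq  # available in the standard library from Python 2.3
except ImportError:
    # hypothetical location of a bundled pure-Python copy for Python 2.2
    from twisted.python import _heapq as heapq
```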
On Sun, 2004-10-03 at 09:26, Phil Mayers wrote:
I hope I'm explaining what's going on here - but if not, don't worry, the problem is more or less solved for me, thanks for the assistance. The only minor remaining niggle is that the static 0.001 value to callLater limits my theoretical max throughput to 1000 queries/sec.
Using twisted.internet.task.LoopingCall should allow you higher speeds, as it's smarter than simply calling callLater repeatedly.

-- Itamar Shtull-Trauring, http://itamarst.org
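For reference, a minimal LoopingCall usage sketch; drainQueue here stands in for the dequeue logic shown earlier in the thread and is not part of any Twisted API.

```python
from twisted.internet import task, reactor

def drainQueue():
    # Pop and run however many queued PDU callbacks you want per tick.
    pass

loop = task.LoopingCall(drainQueue)
loop.start(0.001)  # seconds between calls; LoopingCall reschedules itself

reactor.run()
```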
participants (4)

- exarkun@divmod.com
- Itamar Shtull-Trauring
- James Y Knight
- Phil Mayers