[Twisted-Python] UDP and multiple access

Hi,

I am working on a little project to list the servers of the online game «Enemy Territory» and also to list/find a known player. I am trying to use Twisted to simplify the network access. I am new both to Twisted and Python, but as I am also a programmer, learning is quite easy.

The task has two steps:

1) one UDP request to the master server at idsoftware, which returns the list of the active (slave) servers (currently around 2020 hosts);
2) one UDP request to *each* slave server to obtain its characteristics (name, map being played, ...) and the list of players.

So far I have managed to perform step 1) and I can obtain the list of host:port pairs to query. If someone would like to take a look at my code, I have uploaded it [1].

My problem now is to send around 2000+ non-blocking requests and to collect the answers as they come back. Processing the data should not be a problem, but I don't really see how to perform the mass request. I don't clearly understand the Twisted terminology (protocols, factories, ...). I have seen in another script [2] a case where a single datagramReceived checks the host:port of each datagram to distinguish the answers, but that is a kind of library, and it only shows 2 requests as a test. I have no idea how to scale that up to a mass request.

Would someone be kind enough to guide me a little? For example, by giving me a snippet for multiple UDP requests where the hosts are in a list, and telling me whether datagramReceived will suit the response handling. I hope I am being clear enough. TIA.

Footnotes:
[1] http://sebastien.kirche.free.fr/python_stuff/master_query.py (comments welcome)
[2] http://sebastien.kirche.free.fr/python_stuff/twist_p2p.py (initially published in the twisted mailing-list)

Sébastien Kirche

Sebastien Kirche wrote:
Ah ha. This is extremely similar to what I've been doing recently (SNMP, with many, many agents; lots of concurrent UDP clients). Here's the setup I used (warning: comes with no guarantee, may end your marriage, etc. - specifically, read the warning at the bottom). This is very, very pseudo-code - my actual code has a lot of confusing, non-relevant stuff in it.

It uses a queue to buffer the receive events and ensure the UDP socket queue is emptied ASAP (the dequeue function exits quickly, but reschedules itself a short time in the future; this short time is enough to let select() run and the data be received - see the recent thread on this mailing list about "scalability with hundreds of clients" and callLater(0, ...) not doing what you might think). The same queue is used to start off each client's first transmit, meaning they'll be relatively well interspersed and you won't overload the socket *output* buffer either.

import time

from twisted.internet import protocol

class Timeout(Exception):
    pass

class Protocol(protocol.DatagramProtocol):
    def __init__(self):
        self._queue = []
        self.timeouts = []  # sorted list of (due, deferred)
        from twisted.internet import reactor
        self.reactor = reactor
        reactor.callLater(1, self.dotimeouts)

    def dotimeouts(self):
        now = time.time()
        while self.timeouts:
            due, deferred = self.timeouts[0]
            if due > now:
                break
            due, deferred = self.timeouts.pop(0)
            # Might have been called, don't timeout if so
            if not deferred.called:
                deferred.errback(Timeout())
        # Check again in a second
        self.reactor.callLater(1, self.dotimeouts)

    def queue(self, callable, pargs=(), kwargs=None):
        if not self._queue:
            # Then we won't have scheduled a dequeue either
            # WARNING: this number is important...
            self.reactor.callLater(0.001, self.dequeue)
        self._queue.append((callable, pargs, kwargs or {}))

    def dequeue(self):
        if not self._queue:
            # Shouldn't happen
            return
        callable, pargs, kwargs = self._queue.pop(0)
        callable(*pargs, **kwargs)
        if self._queue:
            # If we've more to dequeue, do so
            # WARNING: this number is also important...
            self.reactor.callLater(0.001, self.dequeue)

    def datagramReceived(self, data, addr):
        # Do stuff, then...
        pdu = self.parse(data)
        deferred = self.get_outstanding(pdu, addr)
        self.queue(deferred.callback, (pdu,))

    def query(self, host, op, args):
        # Do stuff, then
        pdu = self.encode(op, args)
        self.transport.write(pdu.bytes(), host)
        return self.set_outstanding(pdu, host)

class Client:
    def __init__(self, host, protocol):
        self.protocol = protocol
        self.host = host
        self.protocol.queue(self.step1)

    def step1(self):
        deferred = self.protocol.query(self.host, 'op', 'args')
        deferred.addCallbacks(self.step2, self.fail)

    def step2(self, pdu):
        for thing, value in pdu.items():
            # blah, blah
            pass
        deferred = self.protocol.query(self.host, 'op2', None)
        deferred.addCallbacks(self.step3, self.fail)

if __name__ == '__main__':
    import sys
    from twisted.internet import reactor
    proto = Protocol()
    reactor.listenUDP(0, proto)
    for hostname in sys.argv[1:]:
        if ':' in hostname:
            hostname, port = hostname.split(':')
        else:
            port = defaultport  # whatever your protocol's default is
        client = Client((hostname, int(port)), proto)
    reactor.run()

(parse, encode, get_outstanding and set_outstanding are elided - they build and decode the PDUs and track the outstanding deferred per host.)

Now, I make no claims this is the perfect Twisted app - it ain't. What it *does* show is the only way I've found (which may be entirely my lack of ability) to scalably send multiple hundreds of UDP PDUs without:

a) starving the Twisted main loop of CPU, meaning select() doesn't get run often enough, the UDP socket buffer overflows, replies get dropped and retransmits become necessary;

b) using a lot of sockets, which runs into problems with select() and poll() as well as the system fd limit (solvable with ulimit, I'll grant);

c) starving the clients that are slow responders of "CPU" (queue) time.

The warning, however: UDP, lacking flow control, is very easy to get wrong, and you can accidentally DDoS the clients you're trying to talk to. Specifically, the value in the reactor.callLater(0.001, ...) calls determines how often a "task" will be dequeued from the protocol, and therefore how many PDUs you'll send and how many receive events you'll process per second. Best to start with a) a small number of servers and b) a larger value (i.e. a lower rate).
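To put rough numbers on that: with one dequeued task per interval, the interval is a hard cap on the event rate, so it also bounds how long a full sweep of the server list takes. A back-of-the-envelope helper (illustrative only):

```python
def sweep_seconds(num_hosts, dequeue_interval):
    """Lower bound on the time to send one query per host,
    given one dequeued task per dequeue_interval seconds."""
    return num_hosts * dequeue_interval

# ~2000 ET servers at a 0.001s interval is about 2 seconds of sending,
# i.e. roughly 1000 PDUs/second on the wire; raising the interval to
# 0.01s throttles that to ~100 PDUs/second over ~20 seconds.
```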
You should also probably implement some form of variable per-host timeout to get some kind of rate control. However, I've had a great deal of difficulty making these points understood by other coders, for which I have two explanations: either I'm totally wrong, or it's a very subtle issue. Guess which I think it is :o)
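One way to do that variable per-host timeout, sketched (the constants are illustrative, not tuned): back a host's timeout off exponentially when it fails to answer, and relax it again on a good reply.

```python
class HostTimeouts:
    """Per-host adaptive timeout: doubles on timeout, halves on reply."""

    def __init__(self, base=1.0, maximum=30.0):
        self.base = base        # starting/minimum timeout in seconds
        self.maximum = maximum  # never wait longer than this
        self._current = {}      # host -> current timeout

    def interval(self, host):
        return self._current.get(host, self.base)

    def on_timeout(self, host):
        # No answer in time: be more patient (and less chatty) next round
        self._current[host] = min(self.interval(host) * 2, self.maximum)

    def on_reply(self, host):
        # Host is responsive again: tighten back toward the base
        self._current[host] = max(self.interval(host) / 2, self.base)
```

Feeding interval(host) into the (due, deferred) entries on the timeouts list means slow or dead hosts naturally get queried less aggressively than healthy ones.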

On 11 Oct 2004, Phil Mayers wrote:
Many, many thanks for that code, despite what you may think of it: I now have some code to study and a direction to look in. Before, I did not know where to go. I'll look at it carefully and do some experiments. If I get something working, I'll report it here, or ask a more precise question. Regards, Sébastien Kirche

participants (2)
- Phil Mayers
- Sebastien Kirche