[Twisted-Python] [p2p-hackers] source code (in python, using twisted) to my reliable udp rpc protocol (fwd from angryhicKclown@netscape.net)

----- Forwarded message from angryhicKclown@netscape.net ----- From: angryhicKclown@netscape.net Date: Sat, 07 Aug 2004 14:23:26 -0400 To: p2p-hackers@zgp.org Subject: [p2p-hackers] source code (in python, using twisted) to my reliable udp rpc protocol X-Mailer: Atlas Mailer 2.0 Reply-To: "Peer-to-peer development." <p2p-hackers@zgp.org> I was paging thru the archives and realized I never posted it. Works for me, but I've never done a large-scale test of it. Perhaps someone would like to and share the results? It consists of several layers, a stack, if you will: Reliable datagram - handles acks, etc Datagram stream - allows one to send messages larger than the MTU CBOB - binary format for storing structured data, similar to xml, except it doesn't suck TinyRPC - RPC protocol build upon all of those layers Enjoy. Requires www.twistedmatrix.com. ---- cut here ---- # TinyRPC protocol # bsd license from twisted.internet import reactor, protocol, interfaces, defer from twisted.protocols import policies import socket import struct import binascii import cStringIO as StringIO import time CBOB_BOOL = 0 CBOB_INT = 1 CBOB_FLOAT = 2 CBOB_STR = 3 CBOB_USTR = 4 CBOB_TUPLE = 5 CBOB_LIST = 6 CBOB_DICT = 7 CBOB_OBJ = 8 CBOB_NONE = 9 def cbob_encode(obj, buf=StringIO.StringIO()): if isinstance(obj, bool): if obj: buf.write(struct.pack("! BB", CBOB_BOOL, 1)) else: buf.write(struct.pack("! BB", CBOB_BOOL, 0)) elif isinstance(obj, int): buf.write(struct.pack("! Bi", CBOB_INT, obj)) elif isinstance(obj, float): buf.write(struct.pack("! Bf", CBOB_FLOAT, obj)) elif isinstance(obj, str): buf.write(struct.pack("! BH", CBOB_STR, len(obj)) + obj) elif isinstance(obj, unicode): buf.write(struct.pack("! BH", CBOB_USTR, len(obj)) + obj) elif isinstance(obj, tuple): buf.write(struct.pack("! BB", CBOB_TUPLE, len(obj))) for o in obj: cbob_encode(o, buf) elif isinstance(obj, list): buf.write(struct.pack("! BH", CBOB_LIST, len(obj))) for o in obj: cbob_encode(o, buf) elif isinstance(obj, dict): buf.write(struct.pack("! BH", CBOB_DICT, len(obj))) for o in obj: cbob_encode(o, buf) cbob_encode(obj[o], buf) elif isinstance(obj, object): members = obj.__dict__.items() #inspect.getmembers(obj) buf.write(struct.pack("! BHB", CBOB_OBJ, len(members), len(obj.__class__.__name__))) buf.write(obj.__class__.__name__) for member in members: cbob_encode(member[0], buf) cbob_encode(member[1], buf) elif isinstance(obj, NoneType): buf.write(struct.pack("! B", CBOB_NONE)) else: raise "couldn't serialize " + str(obj) def cbob_decode(buf): d = buf.read(1) typ = struct.unpack("! B", d)[0] if typ == CBOB_BOOL: v = struct.unpack("! B", buf.read(1))[0] return v == 1 elif typ == CBOB_INT: return struct.unpack("! i", buf.read(4))[0] elif typ == CBOB_FLOAT: return struct.unpack("! f", buf.read(4))[0] elif typ == CBOB_STR: l = struct.unpack("! H", buf.read(2))[0] return buf.read(l) elif typ == CBOB_USTR: l = struct.unpack("! H", buf.read(2))[0] return unicode(buf.read(l)) elif typ == CBOB_TUPLE: l = struct.unpack("! B", buf.read(1))[0] v = () for i in range(0, l): v = v + (cbob_decode(buf),) return v elif typ == CBOB_LIST: l = struct.unpack("! H", buf.read(2))[0] v = [] for i in range(0, l): v.append(cbob_decode(buf)) return v elif typ == CBOB_DICT: l = struct.unpack("! H", buf.read(2))[0] v = {} for i in range(0, l): key,value = cbob_decode(buf),cbob_decode(buf) v[key] = value return v elif typ == CBOB_OBJ: membercount,clsnamesize = struct.unpack("! HB", buf.read(3)) clsname = buf.read(clsnamesize) v = globals()[clsname]() for i in range(0, membercount): name,value = cbob_decode(buf),cbob_decode(buf) setattr(v, name, value) return v elif typ == CBOB_NONE: return None else: raise "invalid typecode " + typ class DispatcherTransport: __implements__ = interfaces.IUDPConnectedTransport def __init__(self, dispatcher, addr, proto): self.dispatcher = dispatcher self.addr = addr self.protocol = proto def getHost(self): return self.dispatcher.getHost() def getPeer(self): return ("INET",) + self.addr def write(self, packet): return self.dispatcher.transport.write(packet, self.addr) def loseConnection(self): self.protocol.stopProtocol() del self.protocol class Dispatcher(protocol.DatagramProtocol): # TODO: call startFactory()? def __init__(self, factory): self.handlers = {} factory.dispatcher = self self.factory = factory def __getitem__(self, addr): if addr not in self.handlers: return self.open_new(addr) return self.handlers[addr] def datagramReceived(self, data, addr): self[addr].datagramReceived(data) def getHost(self): return self.transport.getHost() def open_new(self, addr): assert addr not in self.handlers, "Already connected" p = self.factory.buildProtocol(addr) p.transport = DispatcherTransport(self, addr, p) p.startProtocol() self.handlers[addr] = p return p def startProtocol(self): return self.factory.startFactory() def stopProtocol(self): return self.factory.stopFactory() class ReliableDatagramProtocol(protocol.ConnectedDatagramProtocol): "acks and stuff" NUM_RETRANSMITS = 2 #10 RETRANSMIT_INTERVAL = .200 MAX_LAST_RECEIVED = 50 OP_SEND = 0 OP_ACK = 128 # msb set def startProtocol(self): self.retransmits = {} # crc->(data,num retransmits, bits, delayedcall for retransmit, Deferred when packet is acked) self.last_received = [] def stopProtocol(self, reason): print "protocol stopped",reason def send(self, data, bits=0): crc = binascii.crc32(data) d = defer.Deferred() self.retransmits[crc] = (data, self.NUM_RETRANSMITS, bits, reactor.callLater(self.RETRANSMIT_INTERVAL, self.retransmit, crc), d) self.send_packet(self.OP_SEND, bits, data) return d def retransmit(self, crc): t = self.retransmits[crc] self.send_packet(self.OP_SEND, t[2], t[0]) if t[1] > 0: self.retransmits[crc] = (t[0], t[1] - 1, t[2], reactor.callLater(self.RETRANSMIT_INTERVAL, self.retransmit, crc), t[4]) else: self.packetLost(retransmits[crc][0]) del self.retransmits[crc] def send_packet(self, op, bits, data): self.transport.write(struct.pack("! B", op | bits) + data) def send_ack(self, crc): if len(self.last_received) >= self.MAX_LAST_RECEIVED: del self.last_received[0] self.last_received.append(crc) self.transport.write(struct.pack("! Bi", self.OP_ACK, crc)) def got_ack(self, crc): self.retransmits[crc][3].cancel() d = self.retransmits[crc][4] del self.retransmits[crc] d.callback(crc) def datagramReceived(self, data): opbyte, = struct.unpack("! B", data[0]) op = opbyte & 128 data = data[1:] if op == self.OP_SEND: crc = binascii.crc32(data) if crc not in self.last_received: self.dataReceived(data, opbyte) self.send_ack(crc) else: self.got_ack(struct.unpack("! i", data)[0]) def dataReceived(self, data, bits): print "received",data,"from",self.transport.getPeer() def packetLost(self): self.transport.loseConnection() class ReliableDatagramStreamMessageProtocol(ReliableDatagramProtocol): "Sends large messages (> mtu size) and sends them in a buffered block..." MTU = 512 OP_BEGINMSG = 64 # 2nd msb set OP_DATA = 0 def startProtocol(self): ReliableDatagramProtocol.startProtocol(self) self.outmessagequeue = [] # data len, stream, bits self.inmessage = None self.packetcount = 0 self.numpackets = 0 def sendMessage(self, data, bits=0): self.outmessagequeue.append((len(data), StringIO.StringIO(data), bits)) if len(self.outmessagequeue) == 1: self.send_first_block() def send_first_block(self, dummy=""): length,stream,bits = self.outmessagequeue[0] length = length - self.MTU + 4 if length < 0: length = 0 length = length + 1 self.send(struct.pack("! I", length / self.MTU + length % self.MTU) + stream.read(self.MTU - 4), self.OP_BEGINMSG | bits).addCallback(self.send_next_block) def send_next_block(self, dummy=""): length,stream,bits = self.outmessagequeue[0] d = stream.read(self.MTU) if len(d) > 0: self.send(d, self.OP_DATA | bits).addCallback(self.send_next_block) else: stream.close() del self.outmessagequeue[0] if len(self.outmessagequeue) > 0: self.send_next_block() def dataReceived(self, data, bits): if bits & 64 == self.OP_DATA: self.packetcount = self.packetcount + 1 self.inmessage.write(data) else: self.inmessage = StringIO.StringIO() self.packetcount = 1 mtu = len(data) # the entire packet size is the size of the mtu self.numpackets = struct.unpack("! I", data[:4])[0] self.inmessage.write(data[4:]) if self.packetcount >= self.numpackets: self.messageReceived(self.inmessage.getvalue(), bits) self.inmessage = StringIO.StringIO() def messageReceived(self, message, bits): print "got message", message class DatagramRPCProtocol(ReliableDatagramStreamMessageProtocol): OP_CALL = 32 OP_RETURN = 0 MAX_CALLID = 512 def startProtocol(self): ReliableDatagramStreamMessageProtocol.startProtocol(self) if hasattr(self.factory, "buildRemote"): self.remotes = self.factory.buildRemote() self.calls = {} self.callid = 0 def messageReceived(self, message, bits): message = StringIO.StringIO(message) if bits & 32 == self.OP_CALL: t, objname, methodname, args, kwargs = cbob_decode(message) r = getattr(self.remotes[objname], methodname)(*args,**kwargs) #print "calling " + objname + "." + methodname if isinstance(r, defer.Deferred): r.addCallback(lambda result: self.send_result(t, result)) else: self.send_result(t, r) else: t,r = cbob_decode(message) self.calls[t].callback(r) def send_result(self, t, r): sio = StringIO.StringIO() cbob_encode((t, r), sio) self.sendMessage(sio.getvalue(), self.OP_RETURN) def callRemote(self, objname, methodname, *args, **kwargs): t = self.callid if self.callid == self.MAX_CALLID: self.callid = 0 else: self.callid = self.callid + 1 self.calls[t] = defer.Deferred() sio = StringIO.StringIO() cbob_encode((t, objname, methodname, args, kwargs), sio) self.sendMessage(sio.getvalue(), self.OP_CALL) return self.calls[t] #class LazyRMIProtocol(DatagramRPCProtocol): # raise "Todo: implement" class TestObject: def hello(self, name): return "Hello, " + name.first + " " + name.last class Name: first = "" last = "" def test(): reactor.callLater(0, test2) reactor.run() def test2(): f = protocol.ServerFactory() #f.protocol = ReliableDatagramProtocol #f.protocol = ReliableDatagramStreamMessageProtocol f.protocol = DatagramRPCProtocol f.buildRemote = lambda: {"myObj" : TestObject()} d = Dispatcher(f) d2 = Dispatcher(f) p = d[socket.gethostbyname("localhost"), 8889] p2 = d[socket.gethostbyname("localhost"), 8888] reactor.listenUDP(8888, d) reactor.listenUDP(8889, d2) reactor.callLater(0, test3, p, p2) def test3(p, p2): name = Name() name.first = "great" name.last = "world" p.callRemote("myObj", "hello", name).addCallback(test4) def test4(r): print "result of call", r if __name__ == "__main__": test() __________________________________________________________________ Switch to Netscape Internet Service. As low as $9.95 a month -- Sign up today at http://isp.netscape.com/register Netscape. Just the Net You Need. New! Netscape Toolbar for Internet Explorer Search from anywhere on the Web and block those annoying pop-ups. Download now at http://channels.netscape.com/ns/search/install.jsp _______________________________________________ p2p-hackers mailing list p2p-hackers@zgp.org http://zgp.org/mailman/listinfo/p2p-hackers _______________________________________________ Here is a web page listing P2P Conferences: http://www.neurogrid.net/twiki/bin/view/Main/PeerToPeerConferences ----- End forwarded message ----- -- Eugen* Leitl <a href="http://leitl.org">leitl</a> ______________________________________________________________ ICBM: 48.07078, 11.61144 http://www.leitl.org 8B29F6BE: 099D 78BA 2FD3 B014 B08A 7779 75B0 2443 8B29 F6BE http://moleculardevices.org http://nanomachines.net
participants (1)
-
Eugen Leitl