[Twisted-Python] Factory question
Hello everyone,

I have a small question. I have a service that sometimes needs to send data (without having received any beforehand) and sometimes needs to receive data. Which is better:

1) create a factory that inherits from both ServerFactory and ClientFactory, so it can both listen and send data
2) create a factory that inherits from ServerFactory only and uses a single-use client (ClientCreator, as shown in the writing clients howto) when it needs to send data

Thank you!
Gabriel
On Wed, Feb 27, 2008 at 3:32 AM, Gabriel Rossetti <mailing_lists@evotex.ch> wrote:
I'm not sure if there's a single right way to do it, but I wouldn't bother inheriting from both ClientFactory and ServerFactory. I think you're on the right track with 2, though.

In most use cases you shouldn't have to create custom factories. Just write the protocol to support bidirectional communication, and to create the server:

    f = Factory()
    f.protocol = YourBidirectionalProtocol

Regarding the client, how you implement it depends on whether the server is establishing a new connection or reusing the existing one. If you're establishing the connection (as in a cluster app with known peers), just use ClientCreator. If you're reusing the existing connection, then you might not have to do anything, unless you have some state to set up, which could be done by overriding connectionMade on your Protocol.

Finally, take everything I've stated above with a grain of salt.

--
d.p.s
Drew Smathers wrote:
I had taken route 1 up until now (I'm thinking about switching...).

Maybe there's something I haven't quite gotten: when ClientA initially connects to the server, the factory creates an instance of the protocol, correct? Now ClientA sends some data to the server, which processes it and sends something back. After that, the TCP session ends, the client disconnects, and the protocol instance dies. Is this correct, or does it live on and get reused somehow?

I ask because the clients initially send data to the server (registration), and the server will thereafter send data to the clients. This means the client has to connect to the server initially through a port using reactor.connectTCP() and listen on a port (which the server now knows, since the client registered itself) using reactor.listenTCP(). I think I have to use reactor.connectTCP() instead of ClientCreator since the connection has to happen at the beginning and a transport needs to exist before I can send anything. Well... now that I think about it, I could have the factory register the client... that would mean I don't have to inherit from the client factory... (I've now switched to solution 2, see the last part of this email).
Thanks, I find it fairly hard to get used to Twisted. I wanted to buy the book, but it was written in 2005 and I'm not sure if it's still valid with today's version.

BTW, any idea why I'm getting this type of behavior (one server, 3 distinct connections from clients):

    Daemon listening for connections...
    daemon proto instance 1
    Connection from 127.0.0.1:57821! <-- ok, Client1
    ....
    daemon proto instance 2
    Connection from 127.0.0.1:57821! <-- Client1 again????? why?
    Connection from 127.0.0.1:57823! <-- ok, Client2
    ....
    daemon proto instance 3
    Connection from 127.0.0.1:57821! <-- ok, Client1 again????? why?
    Connection from 127.0.0.1:57823! <-- ok, Client2 again????? why?
    Connection from 127.0.0.1:57824! <-- ok, Client3

Oh, and by the time I finished writing this email, I had switched to solution 2, but I still get the behavior above.
On Thu, Feb 28, 2008 at 9:42 AM, Gabriel Rossetti <mailing_lists@evotex.ch> wrote:
Yes.
The protocol instance does not get reused.
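To make the lifecycle concrete, here is a minimal sketch of the relationship being described: one long-lived factory, one fresh protocol object per connection, and shared state kept on the factory. The classes `ToyFactory` and `ToyProtocol` are hypothetical stand-ins written for this illustration; they only imitate the shape of a factory and protocol and are not Twisted classes.

```python
# Toy model only: these classes imitate the factory/protocol relationship
# and are NOT Twisted classes.

class ToyProtocol:
    """Per-connection object; a new one is created for every connection."""

    def __init__(self, factory, peer):
        self.factory = factory
        self.peer = peer
        self.received = []  # per-connection state, gone once the connection ends

    def data_received(self, data):
        self.received.append(data)
        # Anything that must outlive this connection goes on the factory.
        self.factory.total_messages += 1


class ToyFactory:
    """Long-lived object; one per listening port."""

    def __init__(self):
        self.total_messages = 0  # survives individual connections

    def build_protocol(self, peer):
        # Called once per incoming connection: always returns a new instance.
        return ToyProtocol(self, peer)


factory = ToyFactory()

first = factory.build_protocol(("127.0.0.1", 57821))
first.data_received("hello")

second = factory.build_protocol(("127.0.0.1", 57823))
second.data_received("world")

print(first is second)         # False
print(second.received)         # ['world']
print(factory.total_messages)  # 2
```

The second connection never sees what the first one received, while the counter on the factory keeps growing across both.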
This might be a bad idea, depending on the locality of your servers and clients. Why not just use the established connection? If the *client* is listening on a port then it isn't just a client; it's a server, or a peer in a clustered system.
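One way to picture "just use the established connection" is a registry that maps each message type to the connection of the service that registered it, so that routing becomes a write on a connection that is already open. The sketch below is a toy model under that assumption: `FakeTransport` and `Router` are hypothetical names invented here, and `FakeTransport` merely records what would have been written to a real transport.

```python
# Toy model, not Twisted: FakeTransport stands in for a live TCP connection.

class FakeTransport:
    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


class Router:
    """Holds the shared registry, the way a long-lived factory could."""

    def __init__(self):
        self.routes = {}  # message type -> transport of the registered service

    def register(self, message_types, transport):
        # Refuse the whole registration if any type is already taken.
        for message_type in message_types:
            if message_type in self.routes:
                raise ValueError("already registered: %s" % message_type)
        for message_type in message_types:
            self.routes[message_type] = transport

    def unregister(self, transport):
        # To be called when that service's connection is lost.
        self.routes = {
            key: value for key, value in self.routes.items()
            if value is not transport
        }

    def route(self, message_type, payload):
        transport = self.routes.get(message_type)
        if transport is None:
            return False
        transport.write(payload)  # reuse the open connection
        return True


router = Router()
service2 = FakeTransport()
router.register(["resize"], service2)

delivered = router.route("resize", "<body>image data</body>")
print(delivered, service2.written)

router.unregister(service2)
late = router.route("resize", "<body>too late</body>")
print(late)
```

With this shape the services never need to listen on a port of their own; the cost is that a service is only reachable while its connection to the daemon stays open.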
Ok.
The book is not up to date.
> BTW, any idea why I'm getting this type of behavior (one server, 3 distinct connections from clients):
Without seeing your code, no.
--
d.p.s
Drew Smathers wrote:
Ok
Ok, so every time data is exchanged (a new TCP/IP session), a new protocol instance is created. Any persistent state must therefore be stored in the factory, if I understand correctly.

The services transmit messages to the daemon, which routes/relays them to the correct service, sort of like a microkernel. So sometimes the services initiate the communication and sometimes the central server does. This makes the services both servers and clients.

Imagine this: service1 has some data that needs to be processed by service2 (which will in turn send it to another service). It sends it to the central server, which sends it to service2. Service2 does whatever it has to do, and then sends the result to the central server to route to service_n. So the established connection is usually useless, except maybe for sending some sort of ACK.
I guess they are a sort of peer in a clustered system; they may or may not reside on the same machine as the central server.
ok, it's a bit long.....

-------------------------"Central server" aka Daemon---------------------------------------------------------------------------

class MdfXmlStreamFactory(XmlStreamFactoryMixin):
    """
    The factory class used by the daemon and services to create protocol instances
    """

    def __init__(self, proto, *args, **kwargs):
        """
        Constructor

        @param proto: the protocol to use
        @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        @param args: misc args
        @type args: C{tuple}
        @param kwargs: misc keyword args
        @type kwargs: C{dict}
        """
        XmlStreamFactoryMixin.__init__(self)
        self.args = args
        self.kwargs = kwargs
        self.protocol = proto

    def buildProtocol(self, addr):
        """
        Builds the protocol and

        @param addr: The address (protocol, IP, port) of the connection
        @type addr: L{IPv4Address<twisted.internet.address._ServerFactoryIPv4Address>}
        @return: an instance of the built protocol
        """
        #self.resetDelay()
        xs = self.protocol(*self.args, **self.kwargs)
        xs.factory = self
        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, xs.connected) # stream connect event or xml start event???
        for event, fn in self.bootstraps:
            xs.addObserver(event, fn)
        return xs


class MdfXmlStreamServerFactory(MdfXmlStreamFactory, ServerFactory):
    """
    The factory class used by the daemon to create protocol instances
    """

    # The registered services
    _services = {}

    def __init__(self, proto, *args, **kwargs):
        """
        Constructor

        @param proto: the protocol to use
        @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        @param args: misc args
        @type args: C{tuple}
        @param kwargs: misc keyword args
        @type kwargs: C{dict}
        """
        MdfXmlStreamFactory.__init__(self, proto, *args, **kwargs)


class Daemon(xmlstream.XmlStream):
    """
    The daemon is the implementation of a microkernel type inter-service
    communication (ISC) routing daemon.

    Here is how it works :
     - Services announce their presence to the daemon by giving their name,
       version, ip, port and a list of message-types that they accept
     - The daemon listens for messages from the attached services, when one
       is received, it routes the message to the correct service

    @todo: add unique id generation/verification
    """

    # Holds the real method
    __dataReceived = xmlstream.XmlStream.dataReceived

    # The registered services
    #__services = {}

    cnt = 1

    def __init__(self, *args, **kwargs):
        """
        Constructor

        @param args: non-keyword args
        @type args: C{tuple}
        @param kwargs: keyword args
        @type kwargs: C{dict}
        """
        xmlstream.XmlStream.__init__(self)
        self.__routeTo = None
        self.__lastMsgType = None
        self.__lastMsgId = None
        self.inst = Daemon.cnt
        Daemon.cnt += 1
        print "daemon proto instance %d" % self.inst

    def connectionMade(self):
        xmlstream.XmlStream.connectionMade(self)

    def dataReceived(self, data):
        """
        Called every time data is received

        @param data: the data received
        @type data: C{object} (anything)
        """
        self.__dataReceived(data)

    def connectionLost(self, reason):
        """
        Called when the connection is shut down, restores the dataReceived method

        @param reason: the reason why the connection was lost
        @type reason: C{str}
        """
        self.__dataReceived = xmlstream.XmlStream.dataReceived
        self.__routeTo = None
        xmlstream.XmlStream.connectionLost(self, reason)

    def __onHeader(self, element):
        """
        Analyse a header and set the data's recipient

        @param element: the header element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
        """
        print "got header from %s:%s : %s" % (str(self.transport.getPeer().host), str(self.transport.getPeer().port), element.toXml())
        self.__lastMsgId = element.getAttribute("id")
        self.__lastMsgType = element.getAttribute("type")
        if(self.__lastMsgType != constants._REG_MSG_TYPE):
            self.__routeTo = self.factory._services[self.__lastMsgType]
            self.__dataReceived = self.__routeDataReceived

    def __onReg(self, element):
        """
        Register a service

        @param element: the registration element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
        """
        print "got reg from %s:%s : %s" % (str(self.transport.getPeer().host), str(self.transport.getPeer().port), element.toXml())
        name = str(xpath.XPathQuery("/body/reg/name").queryForNodes(element)[0])
        version = str(xpath.XPathQuery("/body/reg/version").queryForNodes(element)[0])
        #address = str(xpath.XPathQuery("/body/reg/address").queryForNodes(element)[0])
        port = int(str(xpath.XPathQuery("/body/reg/port").queryForNodes(element)[0]))
        msgs = [ str(m) for m in xpath.XPathQuery("/body/reg/message_type").queryForNodes(element) ]
        address = self.transport.getPeer().host
        #port = self.transport.getPeer().port
        serv = ServiceReg(name, version, msgs, address, port)
        self.__registerService(serv)

    def connected(self, xs):
        """
        Called when a client connects using an XML stream

        @param xs: the current xml stream
        @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        """
        print 'Connection from %s:%s!' % (str(self.transport.getPeer().host), str(self.transport.getPeer().port))
        xs.addObserver("/header", self.__onHeader)
        xs.addObserver("/body/reg", self.__onReg)

    def __routeDataReceived(self, data):
        """
        Pushes the messages to the correct service

        @param data: the data received
        @type data: C{object} (anything)
        """
        print "route '%s' to : %s" % (str(data), self.__routeTo)
        utils.sendMessage(self.__routeTo.ip, self.__routeTo.port, data)
        #self.send(data)

    def __registerService(self, service):
        """
        Register a service

        @param service: the service to register
        @type service: L{ServiceReg}
        @raise ServiceMessageConflictError: if another service already has a
            message registered that the current service is trying to register
        @todo: what is to be done with the exception once raised???? Finish status message...
        """
        def foundConflict(self, msgTypes):
            """
            Check if there is a conflict with message types to be registered by this service

            @param service: the service messages to check for conflicts
            @type service: C{str}
            @return: the conflicting service type or None if no conflict is found
            """
            for mt in msgTypes:
                if(self.factory._services.has_key(mt)):
                    return mt
            return None

        print "Registering service : ", str(service)
        try:
            #
            # Check if another service already registered a message type that
            # the current service is trying to register
            #
            conflict = foundConflict(self, service.acceptedMsgs)
            if(conflict != None):
                raise ServiceMessageConflictError(conflict)
            #
            # Register the message types and this service
            #
            for msgType in service.acceptedMsgs:
                self.factory._services[msgType] = service
        except ServiceMessageConflictError, reason:
            status = utils.createConfirmationMsgBody(constants._MSG_FAILURE_TYPE, self.__lastMsgId, str(reason))
        else:
            status = utils.createConfirmationMsgBody(constants._MSG_SUCCESS_TYPE, self.__lastMsgId)
        #
        # Send registration confirmation (succeeded or failed)
        #
        msgRoot = utils.createMessage(constants._REG_MSG_CONFIRM_TYPE, constants._CONF_MSG_ID, constants._DAEMON_SERVICE_NAME, constants._MSG_SPEC_VERSION, constants._STATUS_DATA_TYPE, status)
        print "Sending confirmation message to %s : %s" % (self.transport.getPeer().host, msgRoot.toXml())
        #self.send(msgRoot)
        utils.sendMessage(self.transport.getPeer().host, service.port, msgRoot)


if(__name__ == "__main__"):
    reactor.listenTCP(4321, MdfXmlStreamServerFactory(Daemon))
    print "Listening for connections..."
    reactor.run()

----------------------------------------"Service"-----------------------------------------------------------------------------------------------------

class MdfXmlStreamClientFactory(MdfXmlStreamServerFactory):
    """
    The factory class used by the services to create protocol instances

    @attention: this class might disappear, I have to see if it's useful to keep it or not
    """

    __daemonAddrs = None
    __daemonPort = None

    def __init__(self, proto, *args, **kwargs):
        """
        Constructor

        @param proto: the protocol to use
        @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        @param args: misc args
        @type args: C{tuple}
        @param kwargs: misc keyword args
        @type kwargs: C{dict}
        """
        MdfXmlStreamServerFactory.__init__(self, proto, *args, **kwargs)
        self._serviceInfo = ServiceReg(kwargs["name"], kwargs["version"], list(kwargs.get("messageTypes", [])))
        MdfXmlStreamClientFactory.__daemonAddrs = kwargs["address"]
        MdfXmlStreamClientFactory.__daemonPort = kwargs["port"]

    def register(self, port):
        """
        Register the service

        @param port: the service's port
        @type port: C{int}
        """
        self._serviceInfo.port = port
        msgBodyData = utils.createRegMsgBody(self._serviceInfo.name, self._serviceInfo.version, str(self._serviceInfo.port), self._serviceInfo.acceptedMsgs)
        msgRoot = utils.createMessage(constants._REG_MSG_TYPE, constants._REG_MSG_ID, self._serviceInfo.name, constants._MSG_SPEC_VERSION, constants._REG_DATA_TYPE, msgBodyData)
        utils.sendMessage(MdfXmlStreamClientFactory.__daemonAddrs, MdfXmlStreamClientFactory.__daemonPort, msgRoot)


class BaseService(xmlstream.XmlStream):
    """
    The service is the implementation of a microkernel type inter-service
    communication (ISC) endpoint.

    Here is how it works :
     - Services announce their presence to the daemon by giving their name,
       version, ip, port and a list of message-types that they accept
     - The daemon listens for messages from the attached services, when one
       is received, it routes the message to the correct service

    @todo: add unique id generation/verification
    """

    def __init__(self, *args, **kwargs):
        """
        Constructor

        @param args: non-keyword args
        @type args: C{tuple}
        @param kwargs: keyword args
        @type kwargs: C{dict}
        """
        xmlstream.XmlStream.__init__(self)
        self._msgSrc = None
        self._msgDest = None
        self._msgBodyData = None
        self._registered = False

    def _onHeader(self, element):
        """
        Analyse a header and save the source and destination

        @param element: the header element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
        @todo: add msg spec version verification
        @todo: add id verification???
        """
        print "got header from %s:%s : %s" % (str(self.transport.getPeer().host), str(self.transport.getPeer().port), element.toXml())
        self._msgSrc = xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("source")
        self._msgDest = xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("destination")

    def _onBody(self, element):
        """
        Get the body act accordingly

        @param element: the body element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
        @todo: add data type verification
        @todo: call data action callbacks
        """
        self._msgBodyData = xpath.XPathQuery("/body").queryForNodes(element)[0].toXml()
        print "_onbody : ", self._msgBodyData

    def _onConfirm(self, element):
        """
        Get the confirmation message and act accordingly

        @param element: the confirmation element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
        @todo: add data type verification
        @todo: call data action callbacks
        """
        status = str(xpath.XPathQuery("/body/conf/status").queryForNodes(element)[0])
        id = int(str(xpath.XPathQuery("/body/conf/id").queryForNodes(element)[0]))
        msg = str(xpath.XPathQuery("/body/conf/msg").queryForNodes(element)[0])
        if(id == constants._REG_MSG_ID):
            if(status == constants._MSG_SUCCESS_TYPE):
                self._registered = True

    def connected(self, xs):
        """
        Called when a client connects using an XML stream

        @param xs: the current xml stream
        @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        @todo: add data action callbacks
        """
        print 'Connection from %s:%s!' % (str(self.transport.getPeer().host), str(self.transport.getPeer().port))
        xs.addObserver("/header", self._onHeader)
        xs.addObserver("/body/conf", self._onConfirm)
        xs.addObserver("/body", self._onBody)


def start(daemonAddrs, daemonPort, serviceRef, serviceName, serviceVersion, serviceMessageTypes):
    """
    Start the daemon

    @param daemonPort: the port the daemon listens on
    @type daemonPort: C{int}
    @param serviceRef: a reference to the service's class
    @type serviceRef: a subclass of L{BaseService<service.BaseService>}
    @param serviceName: the service's name
    @type serviceName: C{str}
    @param serviceVersion: the service's version
    @type serviceVersion: C{str}
    @param serviceMessageTypes: the list of messages the service registers
    @type serviceMessageTypes: C{str list}
    """
    f = MdfXmlStreamClientFactory(serviceRef, address=daemonAddrs, port=daemonPort, name=serviceName, version=serviceVersion, messageTypes=serviceMessageTypes)
    port = reactor.listenTCP(0, f).getHost().port
    f.register(port)
    print port
    reactor.run()


if(__name__ == "__main__"):
    start("localhost", 4321, BaseService, "service_base", "1.0", ["all"])
Thank you, Gabriel
Gabriel Rossetti wrote:

traced it down to the following code in :

    self.addBootstrap(xmlstream.STREAM_START_EVENT, xs._connected)
    self.addBootstrap(xmlstream.STREAM_END_EVENT, xs._disconnected)
    for event, fn in self.bootstraps:
        xs.addObserver(event, fn)

Since the original XmlStreamFactory/XmlStreamFactoryMixin was designed to be an XML client and not a server, it has no notion of differentiating protocol instances (since there is only one).

Gabriel
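The effect described above can be reproduced without Twisted. The sketch below is a toy model, assuming only what the thread shows: a factory keeps one shared list of bootstraps, each new stream's bound method is appended to that list, and the whole list is then attached to the new stream. `ToyStream`, `LeakyFactory` and `FixedFactory` are hypothetical names made up for this illustration, and `FixedFactory` shows one possible remedy (attach per-instance callbacks directly to the instance), not necessarily the one used in the real code.

```python
# Toy model of the pattern discussed in this thread; not Twisted code.

class ToyStream:
    def __init__(self, name):
        self.name = name
        self.observers = []

    def add_observer(self, event, fn):
        self.observers.append((event, fn))

    def connected(self, log):
        # Stands in for the "Connection from ...!" print statement.
        log.append(self.name)

    def fire(self, event, log):
        for observed_event, fn in self.observers:
            if observed_event == event:
                fn(log)


class LeakyFactory:
    """Appends every instance's bound method to one shared list."""

    def __init__(self):
        self.bootstraps = []
        self.count = 0

    def build_protocol(self):
        self.count += 1
        xs = ToyStream("instance %d" % self.count)
        # The shared list now also remembers all earlier instances.
        self.bootstraps.append(("connected", xs.connected))
        for event, fn in self.bootstraps:
            xs.add_observer(event, fn)
        return xs


class FixedFactory:
    """Attaches the per-instance callback to that instance only."""

    def __init__(self):
        self.count = 0

    def build_protocol(self):
        self.count += 1
        xs = ToyStream("instance %d" % self.count)
        xs.add_observer("connected", xs.connected)
        return xs


def third_connection_log(factory):
    # Build three streams, then fire the event on the third one only.
    streams = [factory.build_protocol() for _ in range(3)]
    log = []
    streams[-1].fire("connected", log)
    return log


leaky_log = third_connection_log(LeakyFactory())
fixed_log = third_connection_log(FixedFactory())
print(leaky_log)  # ['instance 1', 'instance 2', 'instance 3']
print(fixed_log)  # ['instance 3']
```

The leaky variant reproduces the shape of the log posted earlier: the third connection also triggers the callbacks of the first two instances, each of which reports its own peer.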
![](https://secure.gravatar.com/avatar/d6304567ada7ac5e8c6f4e5902270831.jpg?s=120&d=mm&r=g)
On Wed, Feb 27, 2008 at 3:32 AM, Gabriel Rossetti <mailing_lists@evotex.ch> wrote:
I'm not sure of there's a single right way to do it, but I wouldn't bother inheriting from both ClientFactory and ServerFactory. I think you're on the write track with 2, though. In most use cases you shouldn't have to create custom factories. Just write the protocol to support bidirectional comm and to create the server: f = Factory() f.protocol = YourBidirectionalProtocol Regarding the client, how you implement it depends on whether or not the server is establishing the connection vs. reusing the existing connection. If you're establishing the connection (like in a cluster app with known peers), just use ClientCreator. If you're reusing the existing connection, then you might not have to anything, unless you have some state to set up which could be done by overriding connectionMade on your Protocol. Finally, take everything I've stated above with a grain of salt. -- \\\\\/\"/\\\\\\\\\\\ \\\\/ // //\/\\\\\\\ \\\/ \\// /\ \/\\\\ \\/ /\/ / /\/ /\ \\\ \/ / /\/ /\ /\\\ \\ / /\\\ /\\\ \\\\\/\ \/\\\\\/\\\\\/\\\\\\ d.p.s
![](https://secure.gravatar.com/avatar/41103198ecac8dc95025c4417de593ad.jpg?s=120&d=mm&r=g)
Drew Smathers wrote:
I had taken route 1 up until now (I'm thinking about switching...) Maybe there's something I haven't quite gotten, when ClientA initially connects to the server, the factory creates an instance of the protocol, correct? Now ClientA sends some data to the server, which processes it and sends something back. After that, the TCP session ends, and the client disconnects, and the protocols instance dies. Is this correct or does it live on and get reused somehow? I ask this because since initially the clients send data to the server (registration), the server will there after send data to the clients. This makes the client have to connect to the server initially though a port using reactor.connectTCP() and listen to a port (that the server now knows since the client registered itself) using reactor.listenTCP(). I think I have to use reactor.connectTCP() instead of ClientCreator since the connection has to happen at the beginning and a transport needs to exist before I can send anything. Well...now that I think about it, I could have the factory register the client...... that would make me not have to inherit from the Client factory.... (I've now switched to solution 2, see last part of this email).
Thanks, I find it fairly hard to get used to Twisted, I wanted to buy the book, but it was written in 2005 and I'm not sure if it's still valid with today's version. BTW, any idea why I'm getting this type of behavior (one server, 3 distinct connections from clients) : Daemon listening for connections... daemon proto instance 1 Connection from 127.0.0.1:57821! <-- ok, Client1 .... daemon proto instance 2 Connection from 127.0.0.1:57821! <-- Client1 again????? why? Connection from 127.0.0.1:57823! <-- ok, Client2 .... daemon proto instance 3 Connection from 127.0.0.1:57821! <-- ok, Client1 again????? why? Connection from 127.0.0.1:57823! <-- ok, Client2 again????? why? Connection from 127.0.0.1:57824! <-- ok, Client3 Oh, and by the time I finished writing this email, I've switched to solution 2, but I still get the behavior above.
![](https://secure.gravatar.com/avatar/d6304567ada7ac5e8c6f4e5902270831.jpg?s=120&d=mm&r=g)
On Thu, Feb 28, 2008 at 9:42 AM, Gabriel Rossetti <mailing_lists@evotex.ch> wrote:
Yes.
The protocol instance does not get reused.
This is might be a bad idea - depending on the locality of your servers and clients. Why not just use the established connection? If the *client* is listening on a port then it isn't just a client - it's a server, or a peer in a clustered system.
Ok.
The book it is not up to date.
BTW, any idea why I'm getting this type of behavior (one server, 3 distinct connections from clients) :
Without seeing your code, no.
-- \\\\\/\"/\\\\\\\\\\\ \\\\/ // //\/\\\\\\\ \\\/ \\// /\ \/\\\\ \\/ /\/ / /\/ /\ \\\ \/ / /\/ /\ /\\\ \\ / /\\\ /\\\ \\\\\/\ \/\\\\\/\\\\\/\\\\\\ d.p.s
![](https://secure.gravatar.com/avatar/41103198ecac8dc95025c4417de593ad.jpg?s=120&d=mm&r=g)
Drew Smathers wrote:
Ok
Ok, so every time there is data exchanged (new tcp/ip session) then a new protocol instance is created. Any persistence/state data must therefore be stored in the factory if I understand correctly. transmit messages to the daemon which routes/relays them to the correct service, sort of like a micro-kernel. So, sometimes the services initiate the communication process and sometimes they don't, the central server does. This makes the services be servers and clients. Imagine this : service1 has some data that needs to be processed by service2 (which will in turn send it to another service), is sends it to the central server, which sends it to service2. Service2 does whatever it has to do, and then sends it to the central server to route to service_n. So the established connection is usually useless, except for sending some sort of ACK maybe.
I guess it's/they a sort of peer(s) in a clustered system, it/they may reside on the same machine as the central server or not.
ok, it's a bit long..... -------------------------"Central server" aka Daemon--------------------------------------------------------------------------- class MdfXmlStreamFactory(XmlStreamFactoryMixin): """ The factory class used by the daemon and services to create protocol instances """ def __init__(self, proto, *args, **kwargs): """ Constructor @param proto: the protocol to use @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>} @param args: misc args @type args: C{tuple} @param kwargs: misc keyword args @type kwargs: C{dict} """ XmlStreamFactoryMixin.__init__(self) self.args = args self.kwargs = kwargs self.protocol = proto def buildProtocol(self, addr): """ Builds the protocol and @param addr: The address (protocol, IP, port) of the connection @type addr: L{IPv4Address<twisted.internet.address._ServerFactoryIPv4Address>} @return: an instance of the built protocol """ #self.resetDelay() xs = self.protocol(*self.args, **self.kwargs) xs.factory = self self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, xs.connected) # stream connect event or xml start event??? for event, fn in self.bootstraps: xs.addObserver(event, fn) return xs class MdfXmlStreamServerFactory(MdfXmlStreamFactory, ServerFactory): """ The factory class used by the daemon to create protocol instances """ # The registered services _services = {} def __init__(self, proto, *args, **kwargs): """ Constructor @param proto: the protocol to use @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>} @param args: misc args @type args: C{tuple} @param kwargs: misc keyword args @type kwargs: C{dict} """ MdfXmlStreamFactory.__init__(self, proto, *args, **kwargs) class Daemon(xmlstream.XmlStream): """ The daemon is the implementation of a microkernel type inter-service communication (ISC) routing daemon. 
Here is how it works : - Services announce their presence to the daemon by giving their name, version, ip, port and a list of message-types that they accept - The daemon listens for messages from the attached services, when one is received, it routes the message to the correct service @todo: add unique id generation/verification """ # Holds the real method __dataReceived = xmlstream.XmlStream.dataReceived # The registered services #__services = {} cnt = 1 def __init__(self, *args, **kwargs): """ Constructor @param args: non-keyword args @type args: C{tuple} @param kwargs: keyword args @type kwargs: C{dict} """ xmlstream.XmlStream.__init__(self) self.__routeTo = None self.__lastMsgType = None self.__lastMsgId = None self.inst = Daemon.cnt Daemon.cnt += 1 print "daemon proto instance %d" % self.inst def connectionMade(self): xmlstream.XmlStream.connectionMade(self) def dataReceived(self, data): """ Called everytime data is received @param data: the data received @type data: C{object} (anything) """ self.__dataReceived(data) def connectionLost(self, reason): """ Called when the connection is shut down, restores the dataReceived method @param reason: the reason why the connection was lost @type reason: C{str} """ self.__dataReceived = xmlstream.XmlStream.dataReceived self.__routeTo = None xmlstream.XmlStream.connectionLost(self, reason) def __onHeader(self, element): """ Analyse a header and set the data's recipiant @param element: the header element (XML) @type element: L{Element<twisted.words.xish.domish.Element>} """ print "got header from %s:%s : %s" % (str(self.transport.getPeer().host), str(self.transport.getPeer().port), element.toXml()) self.__lastMsgId = element.getAttribute("id") self.__lastMsgType = element.getAttribute("type") if(self.__lastMsgType != constants._REG_MSG_TYPE): self.__routeTo = self.factory._services[type] self.__dataReceived = __routeDataReceived def __onReg(self, element): """ Register a service @param element: the registeration element 
(XML) @type element: L{Element<twisted.words.xish.domish.Element>} """ print "got reg from %s:%s : %s" % (str(self.transport.getPeer().host), str(self.transport.getPeer().port), element.toXml()) name = str(xpath.XPathQuery("/body/reg/name").queryForNodes(element)[0]) version = str(xpath.XPathQuery("/body/reg/version").queryForNodes(element)[0]) #address = str(xpath.XPathQuery("/body/reg/address").queryForNodes(element)[0]) port = int(str(xpath.XPathQuery("/body/reg/port").queryForNodes(element)[0])) msgs = [ str(m) for m in xpath.XPathQuery("/body/reg/message_type").queryForNodes(element) ] address = self.transport.getPeer().host #port = self.transport.getPeer().port serv = ServiceReg(name, version, msgs, address, port) self.__registerService(serv) def connected(self, xs): """ Called when a client connects using an XML stream @param xs: the current xml stream @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>} """ print 'Connection from %s:%s!' % (str(self.transport.getPeer().host), str(self.transport.getPeer().port)) xs.addObserver("/header", self.__onHeader) xs.addObserver("/body/reg", self.__onReg) def __routeDataReceived(self, data): """ Pushes the messages to the correct service @param data: the data received @type data: C{object} (anything) """ print "route '%s' to : %s" % (str(data), self.__routeTo) utils.sendMessage(self.__routeTo.ip, self.__routeTo.port, data) #self.send(data) def __registerService(self, service): """ Register a service @param service: the service to register @type service: L{ServiceReg} @raise ServiceMessageConflictError: if another service already has a message registered that the current service is trying to register @todo: what is to be done with the exception once raised???? Finish status message... 
""" def foundConflict(self, msgTypes): """ Check if there is a conflict with message types to be registered by this service @param service: the service messages to check for conflicts @type service: C{str} @return: the conflicting service type or None if no conflict is found """ for mt in msgTypes: if(self.factory._services.has_key(mt)): return mt return None print "Registering service : ", str(service) try: # # Check if another service already registered a message type that # the current service is trying to register # conflict = foundConflict(self, service.acceptedMsgs) if(conflict != None): raise ServiceMessageConflictError(conflict) # # Regrister the message types and this service # for msgType in service.acceptedMsgs: self.factory._services[msgType] = service except ServiceMessageConflictError, reason: status = utils.createConfirmationMsgBody(constants._MSG_FAILURE_TYPE, self.__lastMsgId, str(reason)) else: status = utils.createConfirmationMsgBody(constants._MSG_SUCCESS_TYPE, self.__lastMsgId) # # Send registeration confirmation (succeeded or failed) # msgRoot = utils.createMessage(constants._REG_MSG_CONFIRM_TYPE, constants._CONF_MSG_ID, constants._DAEMON_SERVICE_NAME, constants._MSG_SPEC_VERSION, constants._STATUS_DATA_TYPE, status) print "Sending confirmation message to %s : %s" % (self.transport.getPeer().host, msgRoot.toXml()) #self.send(msgRoot) utils.sendMessage(self.transport.getPeer().host, service.port, msgRoot) if(__name__ == "__main__"): reactor.listenTCP(4321, MdfXmlStreamServerFactory(Daemon)) print "Listening for connections..." 
        reactor.run()

----------------------------------------"Service"-----------------------------------------------------------------------------------------------------

    class MdfXmlStreamClientFactory(MdfXmlStreamServerFactory):
        """
        The factory class used by the services to create protocol instances

        @attention: this class might disappear, I have to see if it's useful to keep it or not
        """

        __daemonAddrs = None
        __daemonPort = None

        def __init__(self, proto, *args, **kwargs):
            """
            Constructor

            @param proto: the protocol to use
            @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
            @param args: misc args
            @type args: C{tuple}
            @param kwargs: misc keyword args
            @type kwargs: C{dict}
            """
            MdfXmlStreamServerFactory.__init__(self, proto, *args, **kwargs)
            self._serviceInfo = ServiceReg(kwargs["name"], kwargs["version"], list(kwargs.get("messageTypes", [])))
            MdfXmlStreamClientFactory.__daemonAddrs = kwargs["address"]
            MdfXmlStreamClientFactory.__daemonPort = kwargs["port"]

        def register(self, port):
            """
            Register the service

            @param port: the service's port
            @type port: C{int}
            """
            self._serviceInfo.port = port
            msgBodyData = utils.createRegMsgBody(self._serviceInfo.name, self._serviceInfo.version, str(self._serviceInfo.port), self._serviceInfo.acceptedMsgs)
            msgRoot = utils.createMessage(constants._REG_MSG_TYPE, constants._REG_MSG_ID, self._serviceInfo.name, constants._MSG_SPEC_VERSION, constants._REG_DATA_TYPE, msgBodyData)
            utils.sendMessage(MdfXmlStreamClientFactory.__daemonAddrs, MdfXmlStreamClientFactory.__daemonPort, msgRoot)

    class BaseService(xmlstream.XmlStream):
        """
        The service is the implementation of a microkernel type inter-service communication (ISC) endpoint.
        Here is how it works :
            - Services announce their presence to the daemon by giving their name, version, ip, port and a list of message-types that they accept
            - The daemon listens for messages from the attached services, when one is received, it routes the message to the correct service

        @todo: add unique id generation/verification
        """

        def __init__(self, *args, **kwargs):
            """
            Constructor

            @param args: non-keyword args
            @type args: C{tuple}
            @param kwargs: keyword args
            @type kwargs: C{dict}
            """
            xmlstream.XmlStream.__init__(self)
            self._msgSrc = None
            self._msgDest = None
            self._msgBodyData = None
            self._registered = False

        def _onHeader(self, element):
            """
            Analyse a header and save the source and destination

            @param element: the header element (XML)
            @type element: L{Element<twisted.words.xish.domish.Element>}

            @todo: add msg spec version verification
            @todo: add id verification???
            """
            print "got header from %s:%s : %s" % (str(self.transport.getPeer().host), str(self.transport.getPeer().port), element.toXml())
            self._msgSrc = xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("source")
            self._msgDest = xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("destination")

        def _onBody(self, element):
            """
            Get the body and act accordingly

            @param element: the body element (XML)
            @type element: L{Element<twisted.words.xish.domish.Element>}

            @todo: add data type verification
            @todo: call data action callbacks
            """
            self._msgBodyData = xpath.XPathQuery("/body").queryForNodes(element)[0].toXml()
            print "_onbody : ", self._msgBodyData

        def _onConfirm(self, element):
            """
            Get the confirmation message and act accordingly

            @param element: the confirmation element (XML)
            @type element: L{Element<twisted.words.xish.domish.Element>}

            @todo: add data type verification
            @todo: call data action callbacks
            """
            status = str(xpath.XPathQuery("/body/conf/status").queryForNodes(element)[0])
            id = int(str(xpath.XPathQuery("/body/conf/id").queryForNodes(element)[0]))
            msg = str(xpath.XPathQuery("/body/conf/msg").queryForNodes(element)[0])
            if(id == constants._REG_MSG_ID):
                if(status == constants._MSG_SUCCESS_TYPE):
                    self._registered = True

        def connected(self, xs):
            """
            Called when a client connects using an XML stream

            @param xs: the current xml stream
            @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}

            @todo: add data action callbacks
            """
            print 'Connection from %s:%s!' % (str(self.transport.getPeer().host), str(self.transport.getPeer().port))
            xs.addObserver("/header", self._onHeader)
            xs.addObserver("/body/conf", self._onConfirm)
            xs.addObserver("/body", self._onBody)

    def start(daemonAddrs, daemonPort, serviceRef, serviceName, serviceVersion, serviceMessageTypes):
        """
        Start the daemon

        @param daemonPort: the port the daemon listens on
        @type daemonPort: C{int}
        @param serviceRef: a reference to the service's class
        @type serviceRef: a subclass of L{BaseService<service.BaseService>}
        @param serviceName: the service's name
        @type serviceName: C{str}
        @param serviceVersion: the service's version
        @type serviceVersion: C{str}
        @param serviceMessageTypes: the list of messages the service registers
        @type serviceMessageTypes: C{str list}
        """
        f = MdfXmlStreamClientFactory(serviceRef, address=daemonAddrs, port=daemonPort, name=serviceName, version=serviceVersion, messageTypes=serviceMessageTypes)
        port = reactor.listenTCP(0, f).getHost().port
        f.register(port)
        print port
        reactor.run()

    if(__name__ == "__main__"):
        start("localhost", 4321, BaseService, "service_base", "1.0", ["all"])
Thank you, Gabriel
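For readers following the registration step in the daemon code, the conflict check can be reduced to a dict keyed by message type. The following is only a minimal sketch in plain Python under that assumption: `ServiceRegistry` and its method names are hypothetical and do not appear in the posted code, and the services are represented by plain strings rather than the post's `ServiceReg` objects.

```python
class ServiceMessageConflictError(Exception):
    """Raised when a message type is already claimed by another service."""


class ServiceRegistry(object):
    """Hypothetical helper: maps each message type to the service handling it."""

    def __init__(self):
        self._services = {}

    def find_conflict(self, msg_types):
        """Return the first message type that is already registered, or None."""
        for mt in msg_types:
            if mt in self._services:
                return mt
        return None

    def register(self, service, msg_types):
        """Register every message type, or raise before changing anything."""
        conflict = self.find_conflict(msg_types)
        if conflict is not None:
            raise ServiceMessageConflictError(conflict)
        for mt in msg_types:
            self._services[mt] = service

    def route(self, msg_type):
        """Return the service registered for msg_type, or None if unknown."""
        return self._services.get(msg_type)
```

Because the check runs before any entry is written, a failed registration leaves the table unchanged, which matches the all-or-nothing behaviour of the try/except in the daemon code.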
![](https://secure.gravatar.com/avatar/41103198ecac8dc95025c4417de593ad.jpg?s=120&d=mm&r=g)
Gabriel Rossetti wrote:
traced it down to the following code in :

    self.addBootstrap(xmlstream.STREAM_START_EVENT, xs._connected)
    self.addBootstrap(xmlstream.STREAM_END_EVENT, xs._disconnected)
    for event, fn in self.bootstraps:
        xs.addObserver(event, fn)

Since the original XmlStreamFactory/XmlStreamFactoryMixin was designed to be an XML client and not a server, it has no notion of differentiating protocol instances (since there is only one).

Gabriel
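One way to see why this pattern misbehaves on a server is to model it without Twisted. The sketch below is plain Python: `FakeStream`, `SharedBootstrapFactory` and `PerInstanceFactory` are hypothetical stand-ins, not Twisted classes, and it assumes the quoted lines run each time a protocol instance is built, which the post does not state explicitly.

```python
class FakeStream(object):
    """Stand-in for an XmlStream: keeps observers and dispatches events to them."""

    def __init__(self, name):
        self.name = name
        self.observers = []
        self.seen = []  # names of the streams whose events reached this instance

    def addObserver(self, event, fn):
        self.observers.append((event, fn))

    def dispatch(self, event):
        for ev, fn in self.observers:
            if ev == event:
                fn(self)

    def _connected(self, xs):
        # Record which stream the event actually came from.
        self.seen.append(xs.name)


class SharedBootstrapFactory(object):
    """Mimics the quoted pattern: bound methods pile up in a factory-wide list."""

    def __init__(self):
        self.bootstraps = []

    def addBootstrap(self, event, fn):
        self.bootstraps.append((event, fn))

    def buildProtocol(self, addr):
        xs = FakeStream(addr)
        # The bound method of THIS instance goes into the shared list...
        self.addBootstrap("start", xs._connected)
        # ...and every accumulated callback, old instances included, is attached.
        for event, fn in self.bootstraps:
            xs.addObserver(event, fn)
        return xs


class PerInstanceFactory(object):
    """Alternative: each stream only observes itself; nothing is shared."""

    def buildProtocol(self, addr):
        xs = FakeStream(addr)
        xs.addObserver("start", xs._connected)
        return xs
```

With the shared list, building streams "a" and then "b" and dispatching "start" on "b" also invokes the callback bound to "a", so the first connection's protocol reacts to the second connection's events. With the per-instance variant only "b" is notified.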
participants (2)
- Drew Smathers
- Gabriel Rossetti