[Twisted-Python] Alarm collection and display system?
![](https://secure.gravatar.com/avatar/800fd332a461d0669051f78e934d3e39.jpg?s=120&d=mm&r=g)
Hi, Our experiment has need for a pseudo real-time (prompt) networked alarm collection and display system. Before just diving in and writing something I thought I'd best try to see if anything is out there and Twisted certainly seems a fertile enough framework to have had at least one such system sprouted already. What we have is several, independent processes all of which can raise problems of varying degrees of severity. Most of the alarm producers are written in differing languages and frameworks and are scattered over different computers (some in different states). We want to centralize these alarms and then serve them to one or more consumers. Ideally alarms are "pushed" to consumers rather than needing to poll so as to minimize latency between production and consumption. But, because some consumers will be behind firewalls the consumers should initiate the connection. Anything already fit this bill or come close? If we do end up rolling our own, any suggestions to get us started in the right direction? Thanks, -Brett.
![](https://secure.gravatar.com/avatar/d6328babd9f9a98ecc905e1ccac2495e.jpg?s=120&d=mm&r=g)
Brett Viren wrote:
The obvious solution is just to roll your own and keep sockets always open from the consumers behind firewalls, and push notifications down those channels. I'm also working on something like this, a protocol called "Q2Q", but it's heavily in development now so I wouldn't suggest it if you have any deadlines.
![](https://secure.gravatar.com/avatar/b5d77f55ec76afdbb1750fd696040536.jpg?s=120&d=mm&r=g)
Hi Brett, Let me restate the problem (in an oversimplified way, sorry about that) - the ability for disparate systems scattered across the network to push messages to interested consumers distributed across the network in a (near) real-time fashion. Did I get it right ? If that is the case, then as Glyph suggested, keep your consumers available to get messages from the producers. As an aside, can you share how you plan to go about creating a distributed subscription service - I mean, how do the consumers provide information for producers to know about the subscription status. If you think it is not right to send to the list, can you atleast (if possible) mail me atleast. Thank You Gangadhar On 5/19/05, Glyph Lefkowitz <glyph@divmod.com> wrote:
![](https://secure.gravatar.com/avatar/800fd332a461d0669051f78e934d3e39.jpg?s=120&d=mm&r=g)
Gangadhar NPK <npk.gangadhar@gmail.com> writes:
Yep, exactly.
If that is the case, then as Glyph suggested, keep your consumers available to get messages from the producers.
Yes, sage advice.
The idea was to *not* distribute the subscription service, but rather to centralize it into a single, well known, server, (actually, a "proxy server" might be the more proper term). All producers deliver messages to this single proxy which then delivers them to any connected consumers. The producers don't know or care who the consumers are. Very simple (I hope). My first choice would be to use XML-RPC for the protocol. Since there are a plethora of XML-RPC implementations it should make it easier to glue in this protocol to the disparate producers. -Brett.
![](https://secure.gravatar.com/avatar/a1de9c949cc19cce09e2ca5e724069de.jpg?s=120&d=mm&r=g)
Hi, I've written a simple twisted server, which accepts client connections, and respond with some simple text depending on what the client sends. This is working fine if I use telnet to connect to the server, and enter the commands one at a time. I'd like to write a twisted client to simulate this, i.e., the client has a list of commands to send, it will send one at a time, (may be wait a little bit between the commands,) and print out the responses from the server. I couldn't seem to get beyond the first command, following code seems to send all the commands at the end, rather than one at a time, how can I add sleep in between? Thanks: from twisted.internet.protocol import ReconnectingClientFactory from twisted.protocols import basic from twisted.internet import reactor from sys import stdout import time class MyClientProtocol(basic.LineReceiver): def lineReceived(self,data): stdout.write("Server:" + data+"\n"), def connectionMade(self): stdout.write("connectionMade\n") self.transport.write("start:\r\n") self.transport.write("command1:\r\n") self.transport.write("command2:\r\n") self.transport.write("command3:\r\n") self.transport.write("end:\r\n") class MyClientFactory(ReconnectingClientFactory): def startedConnecting(self, connector): stdout.write("Started to connect\n") def buildProtocol(self,addr): stdout.write("Connected\nResetting reconnection delay") self.resetDelay() return MyClientProtocol() def clientConnectionLost(self, connector, reason): stdout.write("Lost connection, reason:" + reason) ReconnectingClientFactory.clientConnectionList(self, connector, reason) def clientConnectionFailed(self, connector,reason): stdout.write("Connection failed, reason:" + reason) ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) def main(): reactor.connectTCP("localhost",82828, MyClientFactory()) reactor.run() if __name__ == '__main__': main()
![](https://secure.gravatar.com/avatar/7ed9784cbb1ba1ef75454034b3a8e6a1.jpg?s=120&d=mm&r=g)
On Fri, 20 May 2005 16:04:50 -0700, theshz <theshz@gmail.com> wrote:
Try this version of connectionMade, along with this definition of lineReceived: lines = ["command1", "command2", "command3", "end"] def connectionMade(self): print "connectionMade" self.lines = self.lines[:] self.sendLine("start:") def lineReceived(self, line): print "Got a line:", repr(line) if self.lines: self.sendLine(self.lines.pop(0) + ":") Of course, there are other ways to do this. You could respond to timing events instead of network events: lines = ["command1", "command2", "command3", "end"] def connectionMade(self): print "connectionMade" self.lines = self.lines[:] self.sendCommand() def sendCommand(self): self.sendLine(self.lines.pop(0) + ":") if self.lines: reactor.callLater(3, self.sendCommand) Or you could respond to events from stdin, or from another connected protocol, or a GUI, or .... Hope this helps, Jp
![](https://secure.gravatar.com/avatar/a1de9c949cc19cce09e2ca5e724069de.jpg?s=120&d=mm&r=g)
Hi, from the documentation I could only find examples where the events are related more to Deferred, i.e., a method call that may take sometime. I'm a little confused about the difference between this and the "traditional" kind of event handling. Are they the same with just different terminology? By "traditional", I mean that in part of the code, I generate an event, say a "step 1 finished event", somewhere else there is a handler waiting for this event. The reactor is responsible for dispatching this event to that handler, which hopefull starts step 2. Or even more, like publish-subscribe: multiple handers can register for the same event. In other words, these events are generated internally, not necessarily by such delays like network, file access, or user input. Is this doable in Twisted? Thanks. Z.
![](https://secure.gravatar.com/avatar/b932b1e5a3e8299878e579f51f49b84a.jpg?s=120&d=mm&r=g)
On May 25, 2005, at 11:37 PM, theshz wrote:
Deferreds are just objects that have a list of (result, error) callback pairs. When the result or error is available, it's passed to the first appropriate callback. The result or error returned by that callback is sent to the next, etc. Its job is to pass that *single* result on to everything in its callback chain either one or zero times, and then it should be garbage collected because its job is done. The way this would works is like this (bare bones example without any error handling, etc.): ### from twisted.internet import defer, reactor def step1(): d = defer.Deferred() # In two seconds, call the callback with the result 42 reactor.callLater(2.0, d.callback, 7) return d def step2(resultOfStep1): d = defer.Deferred() # In two seconds, call the callback with the result of the # argument times 6 reactor.callLater(2.0, d.callback, resultOfStep1 * 6) return d def doAllSteps(): # step1 returns a deferred d = step1() # step 2 takes the result of step1 as an argument # so we can use it as the callback for the deferred, # since the callback always receives the result as the # first argument d = d.addCallback(step2) # the other thing to note is that step2 returns a deferred, which # will automatically be chained, so we can just return it here # as our deferred return d def main(): def printResultAndQuit(result): print "the answer is:", result reactor.stop() d = doAllSteps() d.addCallback(printResultAndQuit) # this should think for about 4 seconds, # print the answer to everything, and then # return. reactor.run() if __name__ == '__main__': main() ### It is an excellent primitive for building a notification system on top of, but it isn't one. As you can see in Twisted's source, there is rarely a need for an event dispatching system, so one doesn't really exist (there is one on the reactor for startup and shutdown events, but that's about it). In almost all cases the "problem" is solved by: (a) having some particular name for a method to be implemented in a subclass (e.g. subclass LineReceiver and implement lineReceived) (b) using a delegate that implements some method with a particular name (e.g. protocols telling their transport to lose connection, or transports notifying their protocol of a lost connection) (c) using deferreds (i.e. twisted.web.server.Request.notifyFinish) The only one that looks like traditional publish-subscribe is really the reactor's system events (twisted.internet.interfaces.IReactorCore) -bob
![](https://secure.gravatar.com/avatar/1dc353f57c9d41b76d9888386da76806.jpg?s=120&d=mm&r=g)
On 5/21/05, Jp Calderone <exarkun@divmod.com> wrote:
okey , I think it's hard to parse command line in twisted reactor event loopst . In my programs , I always 'import cmd' and in between command , use reacotr.run() to send a message ,and waiting for a replay when the replay comes back, the protocol() will terminate the connection . and back into cmd loop. Hope this helps, ZP
![](https://secure.gravatar.com/avatar/45ca0fac4d15daedf40d77c7bee58f5e.jpg?s=120&d=mm&r=g)
theshz <theshz <at> gmail.com> writes:
Why do you want to simulate this? You might be interested in the source code of imagination. There is some test/demo code in the repository you might find interesting. I'm working on a project that borrows heavily from this code but has its own parser grammer writtn with pyparsing.
![](https://secure.gravatar.com/avatar/b5d77f55ec76afdbb1750fd696040536.jpg?s=120&d=mm&r=g)
So this is a subscription model, with a single point aggregator right ? The *proxy* aggregates the messages from the producers, while the consumers subscribe with the proxy regarding the message(s) they are interested in and the messages are pushed to them by the proxy. On 5/21/05, Brett Viren <bv@bnl.gov> wrote:
![](https://secure.gravatar.com/avatar/d6328babd9f9a98ecc905e1ccac2495e.jpg?s=120&d=mm&r=g)
Brett Viren wrote:
The obvious solution is just to roll your own and keep sockets always open from the consumers behind firewalls, and push notifications down those channels. I'm also working on something like this, a protocol called "Q2Q", but it's heavily in development now so I wouldn't suggest it if you have any deadlines.
![](https://secure.gravatar.com/avatar/b5d77f55ec76afdbb1750fd696040536.jpg?s=120&d=mm&r=g)
Hi Brett, Let me restate the problem (in an oversimplified way, sorry about that) - the ability for disparate systems scattered across the network to push messages to interested consumers distributed across the network in a (near) real-time fashion. Did I get it right ? If that is the case, then as Glyph suggested, keep your consumers available to get messages from the producers. As an aside, can you share how you plan to go about creating a distributed subscription service - I mean, how do the consumers provide information for producers to know about the subscription status. If you think it is not right to send to the list, can you atleast (if possible) mail me atleast. Thank You Gangadhar On 5/19/05, Glyph Lefkowitz <glyph@divmod.com> wrote:
![](https://secure.gravatar.com/avatar/800fd332a461d0669051f78e934d3e39.jpg?s=120&d=mm&r=g)
Gangadhar NPK <npk.gangadhar@gmail.com> writes:
Yep, exactly.
If that is the case, then as Glyph suggested, keep your consumers available to get messages from the producers.
Yes, sage advice.
The idea was to *not* distribute the subscription service, but rather to centralize it into a single, well known, server, (actually, a "proxy server" might be the more proper term). All producers deliver messages to this single proxy which then delivers them to any connected consumers. The producers don't know or care who the consumers are. Very simple (I hope). My first choice would be to use XML-RPC for the protocol. Since there are a plethora of XML-RPC implementations it should make it easier to glue in this protocol to the disparate producers. -Brett.
![](https://secure.gravatar.com/avatar/a1de9c949cc19cce09e2ca5e724069de.jpg?s=120&d=mm&r=g)
Hi, I've written a simple twisted server, which accepts client connections, and respond with some simple text depending on what the client sends. This is working fine if I use telnet to connect to the server, and enter the commands one at a time. I'd like to write a twisted client to simulate this, i.e., the client has a list of commands to send, it will send one at a time, (may be wait a little bit between the commands,) and print out the responses from the server. I couldn't seem to get beyond the first command, following code seems to send all the commands at the end, rather than one at a time, how can I add sleep in between? Thanks: from twisted.internet.protocol import ReconnectingClientFactory from twisted.protocols import basic from twisted.internet import reactor from sys import stdout import time class MyClientProtocol(basic.LineReceiver): def lineReceived(self,data): stdout.write("Server:" + data+"\n"), def connectionMade(self): stdout.write("connectionMade\n") self.transport.write("start:\r\n") self.transport.write("command1:\r\n") self.transport.write("command2:\r\n") self.transport.write("command3:\r\n") self.transport.write("end:\r\n") class MyClientFactory(ReconnectingClientFactory): def startedConnecting(self, connector): stdout.write("Started to connect\n") def buildProtocol(self,addr): stdout.write("Connected\nResetting reconnection delay") self.resetDelay() return MyClientProtocol() def clientConnectionLost(self, connector, reason): stdout.write("Lost connection, reason:" + reason) ReconnectingClientFactory.clientConnectionList(self, connector, reason) def clientConnectionFailed(self, connector,reason): stdout.write("Connection failed, reason:" + reason) ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) def main(): reactor.connectTCP("localhost",82828, MyClientFactory()) reactor.run() if __name__ == '__main__': main()
![](https://secure.gravatar.com/avatar/7ed9784cbb1ba1ef75454034b3a8e6a1.jpg?s=120&d=mm&r=g)
On Fri, 20 May 2005 16:04:50 -0700, theshz <theshz@gmail.com> wrote:
Try this version of connectionMade, along with this definition of lineReceived: lines = ["command1", "command2", "command3", "end"] def connectionMade(self): print "connectionMade" self.lines = self.lines[:] self.sendLine("start:") def lineReceived(self, line): print "Got a line:", repr(line) if self.lines: self.sendLine(self.lines.pop(0) + ":") Of course, there are other ways to do this. You could respond to timing events instead of network events: lines = ["command1", "command2", "command3", "end"] def connectionMade(self): print "connectionMade" self.lines = self.lines[:] self.sendCommand() def sendCommand(self): self.sendLine(self.lines.pop(0) + ":") if self.lines: reactor.callLater(3, self.sendCommand) Or you could respond to events from stdin, or from another connected protocol, or a GUI, or .... Hope this helps, Jp
![](https://secure.gravatar.com/avatar/a1de9c949cc19cce09e2ca5e724069de.jpg?s=120&d=mm&r=g)
Hi, from the documentation I could only find examples where the events are related more to Deferred, i.e., a method call that may take sometime. I'm a little confused about the difference between this and the "traditional" kind of event handling. Are they the same with just different terminology? By "traditional", I mean that in part of the code, I generate an event, say a "step 1 finished event", somewhere else there is a handler waiting for this event. The reactor is responsible for dispatching this event to that handler, which hopefull starts step 2. Or even more, like publish-subscribe: multiple handers can register for the same event. In other words, these events are generated internally, not necessarily by such delays like network, file access, or user input. Is this doable in Twisted? Thanks. Z.
![](https://secure.gravatar.com/avatar/b932b1e5a3e8299878e579f51f49b84a.jpg?s=120&d=mm&r=g)
On May 25, 2005, at 11:37 PM, theshz wrote:
Deferreds are just objects that have a list of (result, error) callback pairs. When the result or error is available, it's passed to the first appropriate callback. The result or error returned by that callback is sent to the next, etc. Its job is to pass that *single* result on to everything in its callback chain either one or zero times, and then it should be garbage collected because its job is done. The way this would works is like this (bare bones example without any error handling, etc.): ### from twisted.internet import defer, reactor def step1(): d = defer.Deferred() # In two seconds, call the callback with the result 42 reactor.callLater(2.0, d.callback, 7) return d def step2(resultOfStep1): d = defer.Deferred() # In two seconds, call the callback with the result of the # argument times 6 reactor.callLater(2.0, d.callback, resultOfStep1 * 6) return d def doAllSteps(): # step1 returns a deferred d = step1() # step 2 takes the result of step1 as an argument # so we can use it as the callback for the deferred, # since the callback always receives the result as the # first argument d = d.addCallback(step2) # the other thing to note is that step2 returns a deferred, which # will automatically be chained, so we can just return it here # as our deferred return d def main(): def printResultAndQuit(result): print "the answer is:", result reactor.stop() d = doAllSteps() d.addCallback(printResultAndQuit) # this should think for about 4 seconds, # print the answer to everything, and then # return. reactor.run() if __name__ == '__main__': main() ### It is an excellent primitive for building a notification system on top of, but it isn't one. As you can see in Twisted's source, there is rarely a need for an event dispatching system, so one doesn't really exist (there is one on the reactor for startup and shutdown events, but that's about it). In almost all cases the "problem" is solved by: (a) having some particular name for a method to be implemented in a subclass (e.g. subclass LineReceiver and implement lineReceived) (b) using a delegate that implements some method with a particular name (e.g. protocols telling their transport to lose connection, or transports notifying their protocol of a lost connection) (c) using deferreds (i.e. twisted.web.server.Request.notifyFinish) The only one that looks like traditional publish-subscribe is really the reactor's system events (twisted.internet.interfaces.IReactorCore) -bob
![](https://secure.gravatar.com/avatar/1dc353f57c9d41b76d9888386da76806.jpg?s=120&d=mm&r=g)
On 5/21/05, Jp Calderone <exarkun@divmod.com> wrote:
okey , I think it's hard to parse command line in twisted reactor event loopst . In my programs , I always 'import cmd' and in between command , use reacotr.run() to send a message ,and waiting for a replay when the replay comes back, the protocol() will terminate the connection . and back into cmd loop. Hope this helps, ZP
![](https://secure.gravatar.com/avatar/45ca0fac4d15daedf40d77c7bee58f5e.jpg?s=120&d=mm&r=g)
theshz <theshz <at> gmail.com> writes:
Why do you want to simulate this? You might be interested in the source code of imagination. There is some test/demo code in the repository you might find interesting. I'm working on a project that borrows heavily from this code but has its own parser grammer writtn with pyparsing.
![](https://secure.gravatar.com/avatar/b5d77f55ec76afdbb1750fd696040536.jpg?s=120&d=mm&r=g)
So this is a subscription model, with a single point aggregator right ? The *proxy* aggregates the messages from the producers, while the consumers subscribe with the proxy regarding the message(s) they are interested in and the messages are pushed to them by the proxy. On 5/21/05, Brett Viren <bv@bnl.gov> wrote:
participants (8)
-
Bob Ippolito
-
Brett Viren
-
Duncan McGreggor
-
Gangadhar NPK
-
Glyph Lefkowitz
-
Jp Calderone
-
july july
-
theshz