On Mon, 12 Mar 2007 15:44:41 -0400, "Rutt, Benjamin" <benjamin.rutt@gs.com> wrote:
Hi.
Anyone have any pointers as to how I can get some of my questions answered below? I had hoped to get some response. Did I not use the proper etiquette? Or is there some expert on the IPushProducer mechanism or the author of page
Sorry, your question was big and challenging to approach.
[snip]
When running the following code (my 2nd twisted program!), it works as I had hoped - it doesn't starve any clients that want to receive data back, even with a simultaneously active really long streaming server-to-client communication (i.e. one piggy client asking for millions of bytes). i.e. another client can get in and ask for just a few bytes while a large payload is being delivered to a different client. Which is great!
Here's a sample interaction from the client side:
$ telnet localhost 8007
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
x
2
xx
3
xxx
10
xxxxxxxxxx
99999
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[...lots of x's...]
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
bye
Connection closed by foreign host.
$
So I have 2 questions on my code:
1) am I doing anything wrong in setting up the plumbing?
2) does pauseProducing() get called by another thread whilst resumeProducing() is running? (I believe it must, otherwise my resumeProducing() would only be entered once). If so I should have an appropriate mutex around the read/write of self.pause, no?
Here is the code, and output from the server is at the end. Thanks -- Benjamin
#!/usr/bin/env python
import os, os.path, sys, re, commands, pickle, tempfile, getopt, datetime
import socket, string, random, time, traceback, shutil, popen2

from zope.interface import implements
from twisted.internet import protocol, defer, interfaces, error, reactor
from twisted.internet.protocol import Protocol, Factory
from twisted.protocols.basic import LineReceiver

class NonStarvingXGiver:
    implements(interfaces.IPushProducer)
    def __init__(self, howmany, consumer):
        self.howmany = howmany
        self.sent_already = 0
        self.paused = False
        self.consumer = consumer
    def beginSendingXs(self):
        self.deferred = deferred = defer.Deferred()
        self.consumer.registerProducer(self, False)
        return deferred
    def pauseProducing(self):
        print 'pauseProducing: invoked'
        self.paused = True
    def resumeProducing(self):
        print 'resumeProducing: invoked'
        self.paused = False
        maxchunksz = 1024
This loop:
        while not self.paused and self.howmany > self.sent_already:
            chunksz = min(maxchunksz, self.howmany - self.sent_already)
            self.consumer.write('x' * chunksz)
            self.sent_already += chunksz
is a bit atypical, I think. The reason it is eventually stopping is that your code is being invoked re-entrantly by the consumer as soon as it decides its buffer is full. I'm not sure the loop is /wrong/, but it is a bit surprising. You don't need a mutex here, since it's single threaded, but you do need to be aware that your code can be re-entered within a single thread. Does that answer your questions?
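To make the re-entrancy concrete, here is a minimal sketch that runs without Twisted. ToyConsumer is a hypothetical stand-in for a transport: its buffer_limit, drain() and events names are made up for illustration, and it is not how Twisted's transports are implemented. It only mimics the one behaviour that matters here, which is that write() may call back into the producer's pauseProducing() before write() returns.

```python
class ToyConsumer:
    """Hypothetical stand-in for a transport (NOT Twisted's implementation)."""

    def __init__(self, buffer_limit):
        self.buffer_limit = buffer_limit  # assumed high-water mark, in bytes
        self.buffered = 0
        self.producer = None
        self.events = []  # call trace, kept so the order can be inspected

    def registerProducer(self, producer):
        self.producer = producer

    def write(self, data):
        self.buffered += len(data)
        self.events.append("write %d" % len(data))
        if self.buffered > self.buffer_limit:
            # Same thread, same call stack: we are still inside the
            # producer's resumeProducing() loop when this runs.
            self.events.append("pause (re-entrant)")
            self.producer.pauseProducing()

    def drain(self):
        # Pretend the socket accepted everything, then ask for more data.
        self.buffered = 0
        self.events.append("drain")
        self.producer.resumeProducing()


class XGiver:
    """Same loop shape as the producer in the question, minus Twisted."""

    def __init__(self, howmany, consumer, chunk=1024):
        self.howmany = howmany
        self.sent = 0
        self.paused = False
        self.consumer = consumer
        self.chunk = chunk

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        # The flag can flip to True *during* consumer.write() below,
        # which is the only reason this loop stops early.
        while not self.paused and self.sent < self.howmany:
            n = min(self.chunk, self.howmany - self.sent)
            self.consumer.write("x" * n)
            self.sent += n


consumer = ToyConsumer(buffer_limit=2048)
giver = XGiver(5000, consumer)
consumer.registerProducer(giver)

giver.resumeProducing()            # runs until the consumer pauses it
while giver.sent < giver.howmany:  # crude stand-in for the event loop
    consumer.drain()

for event in consumer.events:
    print(event)
```

No second thread is involved anywhere in this sketch, so there is nothing for a mutex to protect. The thing to watch for is that any state the loop relies on may have changed by the time consumer.write() returns.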
[snip]
Jean-Paul