[Twisted-Python] Ideas on limiting/throttling spawnProcess

Hi,

I have an application that ingests data and calls reactor.spawnProcess() for each chunk of data (a "product"), passing the chunk as STDIN to a compiled binary and then harvesting the STDOUT. It has been working well, but when my data rates get too high the machine gets overloaded with spawned processes (I think) and starts running out of file descriptors (yes, I can raise the limit :) causing all sorts of pain. I'm wondering about a mechanism to throttle the number of spawned processes running at one time? It'd be nice to only have 10 of these spawned processes going at any one time. Thanks for your ideas :)

daryl

code snippet:

    from twisted.internet import protocol, reactor

    class SHEFIT(protocol.ProcessProtocol):
        def __init__(self, tp):
            self.tp = tp
            self.data = ""

        def connectionMade(self):
            # Feed the raw product to the child's stdin, then close it.
            self.transport.write(self.tp.raw)
            self.transport.closeStdin()

        def outReceived(self, data):
            self.data = self.data + data

        def errReceived(self, data):
            print "errReceived! with %d bytes!" % len(data)
            print data

        def outConnectionLost(self):
            really_process(self.tp, self.data)

    def got_product(tp):
        shef = SHEFIT(tp)
        reactor.spawnProcess(shef, "shefit", ["shefit"], {})

    def really_process(tp, data):
        print 'Do some work'

I think twisted.internet.defer.DeferredSemaphore is designed for this. Jean-Paul Calderone answers a similar question in detail:

http://stackoverflow.com/questions/2861858/queue-remote-calls-to-a-python-tw...

Donal McMullan
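A minimal sketch of that approach, adapted to the code above (run_shefit and the Deferred-returning SHEFIT variant are illustrative, not from either post):

    from twisted.internet import defer, protocol, reactor

    class SHEFIT(protocol.ProcessProtocol):
        def __init__(self, tp):
            self.tp = tp
            self.data = ""
            self.done = defer.Deferred()

        def connectionMade(self):
            self.transport.write(self.tp.raw)
            self.transport.closeStdin()

        def outReceived(self, data):
            self.data = self.data + data

        def processEnded(self, reason):
            # Fire only after the child has fully exited, so the
            # semaphore slot is not released while it is still running.
            self.done.callback((self.tp, self.data))

    def run_shefit(tp):
        shef = SHEFIT(tp)
        reactor.spawnProcess(shef, "shefit", ["shefit"], {})
        return shef.done

    # Allow at most 10 shefit children alive at any one time.
    sem = defer.DeferredSemaphore(10)

    def got_product(tp):
        d = sem.run(run_shefit, tp)
        d.addCallback(lambda result: really_process(*result))
        return d

sem.run() waits for one of the 10 tokens, calls run_shefit, and releases the token only when the returned Deferred fires, so no more than 10 children are ever alive at once.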

For a similar use case, we're using a combination of Cooperator with deferToProcess. I can explain more if anyone's interested.

On Sat, Oct 29, 2011 at 2:10 AM, Nicolas Toper <ntoper@gmail.com> wrote:
For a similar use case, we're using a combination of Cooperator with deferToProcess.
I can explain more if anyone's interested.
I would certainly be interested :) I'm having a difficult time understanding how the pieces fit together. Thank you!

daryl

    from twisted.internet import defer, reactor, task

    # Pool, deferToProcessPool, makeEmails, logger and _log_error are
    # from our own process-pool code (not shown here).

    def parallel(iterable, count, callable, *args, **named):
        # Copy/paste from http://jcalderone.livejournal.com/24285.html
        # http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough...
        coop = task.Cooperator()
        work = (callable(elem, *args, **named) for elem in iterable)
        return defer.DeferredList([coop.coiterate(work) for i in xrange(count)])

    def _localSend(messages):
        return deferToProcessPool(reactor, _p, makeEmails, messages, logger)

    _p = Pool(5)

    def _localDeliver(messages):
        # ... some part left out
        d = parallel(messages, 2, _localSend)
        d.addErrback(_log_error)
        return d

_localDeliver calls parallel: it launches at most 2 Deferreds executing _localSend(messages) at a time, and each of those is actually a deferToProcess call. This is a very, very powerful combination and exarkun really nailed it in his blog :)
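Applied back to the original problem, the same helper would cap the number of live shefit children; a sketch, reusing the illustrative run_shefit from the DeferredSemaphore example above (pending_products is an assumed iterable of products):

    # coiterate() pulls the next product off the shared generator only
    # when the previous run_shefit Deferred has fired, so at most 10
    # shefit processes are alive at once.
    d = parallel(pending_products, 10, run_shefit)
    d.addErrback(_log_error)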

Another alternative is to use turtl, which was written for a similar purpose.

http://pypi.python.org/pypi/turtl
https://bitbucket.org/adroll/turtl/
https://launchpad.net/turtl
http://twistedmatrix.com/pipermail/twisted-python/2011-August/024415.html

--
Valentino Volonghi
http://www.adroll.com
participants (5)
- Daryl Herzmann
- Daryl Herzmann
- Donal McMullan
- Nicolas Toper
- Valentino Volonghi