[Python-3000] Kill GIL?

Valentino Volonghi aka Dialtone dialtone at divmod.com
Mon Sep 18 09:50:26 CEST 2006


On Sun, 17 Sep 2006 18:18:32 -0700, Josiah Carlson <jcarlson at uci.edu> wrote:
>    import autorpc
>    caller = autorpc.init_processes(autorpc.num_processors())
>
>    import callables
>    caller.register_module(callables)
>
>    result = caller.fcn1(arg1, arg2, arg3)
>
>The point is not to compare API/etc., with threading, but to compare it
>with XMLRPC.  Because ultimately, what I would like to see, is a
>mechanic similar to XMLRPC; call a method on an instance, that is
>automatically executed perhaps in some other thread in some other
>process, or maybe even in the same thread on the same process (depending
>on load, etc.), and which returns the result in-place.

I've written something similar taking inspiration from axiom.batch (from divmod.org). And the result is the following code:

import sys, os

from twisted.internet import reactor
from twisted.protocols import amp
from twisted.internet import protocol
from twisted.python import log
from epsilon import process

# These are the Commands, they are needed to call remote methods safely.
class Sum(amp.Command):
    arguments = [('a', amp.Integer()),
                 ('b', amp.Integer())]
    response = [('total', amp.Integer())]

class StopReactor(amp.Command):
    arguments = [('delay', amp.Integer())]
    response = [('status', amp.String())]

# This is the class that tells the RPC exposed methods and their
# implementation
class JustSum(amp.AMP):
    def sum(self, a, b):
        total = a + b
        log.msg('Did a sum: %d + %d = %d' % (a, b, total))
        return {'total': total}
    Sum.responder(sum)

    def stop(self, delay):
        reactor.callLater(delay, reactor.stop)
        return {'status': 'scheduled'}
    StopReactor.responder(stop)

# Various stuff needed to use AMP over stdin/stdout/stderr with a child 
# process
class AMPConnector(protocol.ProcessProtocol):
    def __init__(self, proto, controller):
        self.amp = proto
        self.controller = controller

    def connectionMade(self):
        log.msg("Subprocess started.")
        self.amp.makeConnection(self)
        self.controller.childProcessCreated()

    # Transport
    disconnecting = False

    def write(self, data):
        self.transport.write(data)

    def writeSequence(self, data):
        self.transport.writeSequence(data)

    def loseConnection(self):
        self.transport.loseConnection()

    def getPeer(self):
        return ('omfg what are you talking about',)

    def getHost(self):
        return ('seriously it is a process this makes no sense',)

    def inConnectionLost(self):
        log.msg("Standard in closed")
        protocol.ProcessProtocol.inConnectionLost(self)

    def outConnectionLost(self):
        log.msg("Standard out closed")
        protocol.ProcessProtocol.outConnectionLost(self)

    def errConnectionLost(self):
        log.msg("Standard err closed")
        protocol.ProcessProtocol.errConnectionLost(self)

    def outReceived(self, data):
        self.amp.dataReceived(data)

    def errReceived(self, data):
        log.msg("Received stderr from subprocess: " + repr(data))

    def processEnded(self, status):
        log.msg("Process ended")
        self.amp.connectionLost(status)
        self.controller.childProcessTerminated(status)

# Here you write the code that uses the commands above.
class ProcessController(object):
    def childProcessCreated(self):
        def _cb(result):
            print result
            d = self.child.callRemote(StopReactor, delay=0)
            d.addErrback(lambda _: reactor.stop())
        def _eb(error):
            print error
        d = self.child.callRemote(Sum, a=4, b=5)
        d.addCallback(_cb)
        d.addErrback(_eb)
        
    def childProcessTerminated(self, status):
        print status

    def startProcess(self):
        executable = '/Library/Frameworks/Python.framework/Versions/2.4/bin/python'
        env = os.environ
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        self.child = JustSum()
        self.connector = AMPConnector(self.child, self)
        
        args = (
            executable,
            '/usr/bin/twistd',
            '--logfile=/Users/dialtone/Projects/python/twist/sub.log',
            '--pidfile=/Users/dialtone/Projects/python/twist/sub.pid',
            '-noy',
            '/Users/dialtone/Projects/python/twist/sub.tac')
        self.process = process.spawnProcess(self.connector, executable, args, env=env)

if __name__ == '__main__':
    p = ProcessController()
    reactor.callWhenRunning(p.startProcess)
    reactor.run()

If you exlude 'boilerplate' code you end up with a ProcessController class
and with the other 3 classes about RPC. Since the father process is using
self.child = JustSum() it will also expose the same API to the child which
will be able to call any of the methods.
The style above may not be immediately obvious to people not used to Twisted
but other than that it's not really hard to abstract a bit more to provide
an API similar to what you described.

HTH


More information about the Python-3000 mailing list