`multirun.py'

François Pinard pinard at iro.umontreal.ca
Fri Aug 10 20:08:42 EDT 2001


Hello, my friends.  I keep rewriting code to execute things in parallel,
without needing the full capabilities of the `threading' module, so I just
packaged the following little module, named `multirun.py'.  Criticize! :-)

-------------- next part --------------
# Handle simple case of uniform multi-processor function.
# Copyright © 2001 Progiciels Bourbeau-Pinard inc.
# François Pinard <pinard at iro.umontreal.ca>, 2001.

"""
This module offers a very simple API for the common case of threaded
execution of relatively independent requests, using many agent processes
all reading from a single common input queue.  The overall usage scheme is:

    import multirun
    multi = multirun.Multi(AGENTS, SIZE)
    ...
    multi.do(FUNCTION, ARG1, ARG2, ...)
    ...
    multi.wrapup()

AGENTS is the number of threads serving requests, while SIZE is the
maximum size of the queue of waiting requests.  There are usually many
calls to `multi.do', meant to be executed more or less in parallel.
If all agents are already busy at the time of a `multi.do' request, the
request is put into the waiting queue.  If the queue already holds SIZE
requests, `multi.do' blocks until there is some room in it.  Any freed
agent takes a request from the queue and calls:

    FUNCTION(ARG1, ARG2, ...)

FUNCTION is typically I/O bound, network bound, or a sleeping function.
If AGENTS is zero, `multi.do' executes FUNCTION immediately, in the
calling thread.
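
For instance, here is a mere sketch (the names are illustrative only)
dispatching a sleeping FUNCTION over three agents:

    import time
    import multirun

    def pause(delay):
        time.sleep(delay)

    multi = multirun.Multi(3, 10)
    for delay in 1, 2, 3, 2, 1:
        multi.do(pause, delay)
    multi.wrapup()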

Finally, `multi.wrapup()' waits for all agents to become free, then
destroys them.  In fact, it is fully equivalent to `multi.set_agents(0)'.
By using:

    multi.set_agents(AGENTS)

one can dynamically change the number of processing agents, which are
created or killed as required (killed only once they get free, of
course).  This call may also be used after a `multi.wrapup()' to revive
the service.
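
For example (just a sketch), one could temporarily reduce the crew to a
single agent around some delicate phase, then restore it afterwards:

    multi.set_agents(1)
    ... DELICATE PHASE ...
    multi.set_agents(AGENTS)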

A few other methods are provided for convenience.  Each `Multi' instance
has its own single, all-purpose user lock.  Sections of code within the
various FUNCTIONs which need to execute atomically may use:

    multi.lock()
    ... CRITICAL CODE SECTION...
    multi.unlock()

Of course, improper use of `multi.lock()' could trigger bad deadlocks.
Getting `multi' defined within the scope of the called function requires
a bit of care: for example, `multi' could be made global when created,
or passed as an extra argument to functions through the `multi.do' calls.
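
The second way might look like this sketch:

    def FUNCTION(ARG1, multi):
        multi.lock()
        ... CRITICAL CODE SECTION ...
        multi.unlock()

    multi.do(FUNCTION, ARG1, multi)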

Lastly, `multi.activity()' returns a number representing the current
activity of the `Multi' instance: the number of agents, whether working
or waiting, plus the number of requests waiting in the queue.
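
For instance (merely a sketch), the feeding loop might report the load
after each request:

    for ITEM in ITEMS:
        multi.do(FUNCTION, ITEM)
        print 'current activity:', multi.activity()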
"""

import Queue, threading

class Multi:

    def __init__(self, agents, length):
        self.agents = 0
        # All purpose, single lock, for user convenience.
        self.user_lock = threading.Lock()
        # To wait until some agent processor terminates.
        self.killer = threading.Semaphore(0)
        self.queue = Queue.Queue(length)
        self.set_agents(agents)

    def lock(self):
        self.user_lock.acquire()

    def unlock(self):
        self.user_lock.release()

    def activity(self):
        return self.queue.qsize() + self.agents

    def do(self, function, *arguments):
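        # With no agents at all, execute FUNCTION right away in the
        # calling thread; should it return a (FUNCTION, ARGUMENTS) pair,
        # that pair gets called in turn.  With agents, merely queue the
        # request for the next free agent.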
        if self.agents == 0:
            reply = apply(function, arguments)
            if reply is not None:
                function, arguments = reply
                apply(function, arguments)
        else:
            self.queue.put((function, arguments))

    def wrapup(self):
        self.set_agents(0)

    def set_agents(self, agents):
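        # Create new agents as needed.  To retire an agent, send a None
        # sentinel through the queue, then wait on the semaphore until
        # some agent acknowledges it and terminates.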
        while self.agents < agents:
            Agent(self.killer, self.queue)
            self.agents = self.agents + 1
        while self.agents > agents:
            self.queue.put(None)
            self.killer.acquire()
            self.agents = self.agents - 1

class Agent(threading.Thread):

    def __init__(self, killer, queue):
        threading.Thread.__init__(self)
        self.killer = killer
        self.queue = queue
        self.start()

    def run(self):
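        # Serve queued requests forever; a None request is the signal
        # for this agent to terminate.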
        while 1:
            request = self.queue.get()
            if request is None:
                self.killer.release()
                return
            function, arguments = request
            apply(function, arguments)

def test():

    def write(counter, multi):
        import sys
        multi.lock()
        sys.stdout.write('%.4d\n' % counter)
        multi.unlock()

    multi = Multi(50, 200)
    for counter in range(2000):
        multi.do(write, counter, multi)
    multi.wrapup()

if __name__ == '__main__':
    test()
-------------- next part --------------

-- 
François Pinard   http://www.iro.umontreal.ca/~pinard

