[Tutor] Threading synchronization (was: Unit testing

Terry Carroll carroll at tjc.com
Thu Jun 29 16:45:42 CEST 2006


On Thu, 29 Jun 2006, Tino Dai wrote:

> Between the producer and consumer threads, does the consumer end of the
> queue sit there and wait for something to come down the queue...

Yes.  The following call:

   workunit = self.inQ.get(True)

means, try to take something off the queue, and if the queue is empty,
wait until something is put onto the queue by another thread.  That forced
waiting ("blocking") is the behavior that I think you want.

Another ("non-blocking") form (that I didn't use) could have been:

   workunit = self.inQ.get()

But this one does *not* wait if the queue is empty.  It either returns 
immediately, if there's something in the queue to get; or it raises the 
Empty exception if there isn't. 

If you use the non-blocking form, I think you're back to requiring your 
semaphores.  The blocking form is particularly convenient for the type of 
application you're talking about, I think.  (I've never actually had 
occasion to use the non-blocking form, but then, my needs are usually 
pretty simple).

There's also a temporarily blocking form, e.g.:

  workunit = self.inQ.get(True, 10)

This tries to get something from the queue, and either returns something, 
or, if nothing's on the queue, waits for up to 10 (in this example) 
seconds.  If something shows up on that time, it wakes up and returns the 
element off the queue, or raises Empty if the queue is still empty.

For our purposes, this has the same problems as the non-blocking form, and 
I'd avoid it. 

By the way, do not try to use a combination of the non-blocking form and
Queue.empty(), to determine if something's there to get, and get only if
it's non-empty.  Queue.empty() is not reliable for synchronization
purposes.  

I would consider using Queue.empty() only if, for example, I had some sort
of monitoring thread that was periodically checking if there was anything
in the queue, solely for the purpose of providing tuning data.  (i.e., 90%
of the time, a queue is non-empty, so maybe I should consider having more
consumer threads on that queue)  Even there, I'd probably use qsize()  
instead (which is also unreliable for synch purposes).

You're best of just pretending that Queue.empty(), Queue.full() and 
Queue.qsize() don't exist, for synchronization purposes.

>  or is the consumer wake up after a randompause()?

The randompause() is *only* to insert random pauses into the execution, so 
the threads have a little bit of a chance of executing out of order, in 
essence simulating the threads' work taking some amount of unpredictable 
time.  In practice, it doesn't usually have any effect.  You wouldn't use 
the randompause() in your code, that's just for simulation purposes.

> Right now, I have the semaphores as gatekeepers to each one of the
> threads. And until something is in the queue, the thread's semaphore
> will wait for the semphore to be released by the previous thread.

Right.  That's an unnecessary complication, requiring that you manage the
inter-thread communication.  If you instead use blocking queue reads, you
won't have to do that.  The blocking will make sure that each thread wakes 
up and does work only when there's work for it.

By the way, I'm not a professional programmer (I'm a lawyer, who programs
pretty much as a hobby), and it shows in this code.  My first cut had some
errors because I really don't get namespaces very well.  I adjusted it to 
get it to work, but not very well; very kludgily.  Here's a more 
appropriate version (not that it's a paragon of good code now, but it's 
not as, um, eccentric, as last night's).

I've also changed it to get rid of the id variable being carried in the 
thread instance.  Instead, I use setName to name a thread, and getName to 
access that name.  This is better, because then a meaningful thread name 
will show up in any exception messages you get (e.g., "Exception in thread 
first:")

import time, Queue, threading, random

class processor(threading.Thread):
    def __init__(self, id, inQ=None, outQ=None, inmessage=None):
        """
        id is thread Id: "first" for initial producer,
          "last" for final consumer
        inQ is input Q, or None for first producer
        outQ is outputQ, or None for final consumer
        """
        self.inQ = inQ
        self.outQ = outQ
        if id == "first":
            self.l = list(inmessage)
        if id == "last":
            self.message=""
        threading.Thread.__init__(self)
        self.setName(id)


    def run(self):
        threadname = self.getName()
        if threadname == "first":
            self.producer()
        elif threadname == "last":
            self.consumer()
        else:
            self.hybrid()

    def producer(self):
        while True:
            self.randompause()
            try:
                workunit = self.l.pop(0)
            except IndexError:
                self.outQ.put("stop")
                self.endmessage()
                return
            self.statusmessage(workunit)
            self.outQ.put(workunit)

    def consumer(self):
        while True:
            self.randompause()
            workunit = self.inQ.get(True)
            if workunit == "stop":
                print "final message:", self.message
                self.endmessage()
                return
            else:
                self.statusmessage(workunit)
                self.message = self.message+workunit

    def hybrid(self):
        while True:
            self.randompause()
            workunit = self.inQ.get(True)
            if workunit == "stop":
                self.outQ.put(workunit)
                self.endmessage()
                return
            else:
                self.statusmessage(workunit)
                self.outQ.put(workunit)

    def randompause(self):
        time.sleep(random.randint(1,5))

    def statusmessage(self, workunit):
        print "%s thread %s, workunit %s" %(time.asctime(), 
self.getName(), workunit)

    def endmessage(self):
        print "%s thread %s ending" %(time.asctime(), self.getName())


if __name__ == "__main__":
    q_ab = Queue.Queue()
    pa = processor(id="first", inmessage="spam", outQ=q_ab)
    q_bc = Queue.Queue()
    pb = processor(id="second", inQ=q_ab, outQ=q_bc)
    q_cd = Queue.Queue()
    pc = processor(id="third", inQ=q_bc, outQ=q_cd)
    q_de = Queue.Queue()
    pd = processor(id="fourth", inQ=q_cd, outQ=q_de)
    pe = processor(id="last", inQ=q_de)
    pa.start()
    pb.start()
    pc.start()
    pd.start()
    pe.start()
    
    






More information about the Tutor mailing list