[Tutor] Threading synchronization (was: Unit testing)
Terry Carroll
carroll at tjc.com
Thu Jun 29 16:45:42 CEST 2006
On Thu, 29 Jun 2006, Tino Dai wrote:
> Between the producer and consumer threads, does the consumer end of the
> queue sit there and wait for something to come down the queue...
Yes. The following call:
workunit = self.inQ.get(True)
means, try to take something off the queue, and if the queue is empty,
wait until something is put onto the queue by another thread. That forced
waiting ("blocking") is the behavior that I think you want.
Another ("non-blocking") form (that I didn't use) could have been:
workunit = self.inQ.get(False)
But this one does *not* wait if the queue is empty. It either returns
immediately, if there's something in the queue to get, or it raises the
Empty exception if there isn't. (Note that a bare self.inQ.get() still
blocks: the block argument defaults to True.)
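For what it's worth, here's the non-blocking form in action. (I'm using
the Python 3 spelling, where the module is queue rather than Queue; in
2.x, substitute Queue.Queue and Queue.Empty.)

```python
import queue   # "Queue" in Python 2

q = queue.Queue()
q.put("spam")

# Non-blocking get: returns at once, or raises queue.Empty.
item = q.get_nowait()        # equivalent to q.get(False)
print(item)                  # spam

try:
    q.get_nowait()           # nothing left now
except queue.Empty:
    print("queue was empty")
```

get_nowait() is just a convenience spelling of get(False); either works.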
If you use the non-blocking form, I think you're back to requiring your
semaphores. The blocking form is particularly convenient for the type of
application you're talking about, I think. (I've never actually had
occasion to use the non-blocking form, but then, my needs are usually
pretty simple).
There's also a temporarily blocking form, e.g.:
workunit = self.inQ.get(True, 10)
This tries to get something from the queue, and either returns something
or, if nothing's on the queue, waits for up to 10 (in this example)
seconds. If something shows up in that time, it wakes up and returns the
element off the queue; if the queue is still empty when the timeout
expires, it raises Empty.
For our purposes, this has the same problems as the non-blocking form, and
I'd avoid it.
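A quick sketch of the timeout behavior (again in the Python 3 spelling;
use Queue.Empty in 2.x), with a short timeout so it finishes quickly:

```python
import queue
import time

q = queue.Queue()            # deliberately left empty

start = time.time()
try:
    q.get(True, 0.5)         # block for at most half a second
except queue.Empty:
    elapsed = time.time() - start
    print("gave up after about %.1f seconds" % elapsed)
```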
By the way, do not try to use a combination of the non-blocking form and
Queue.empty(), to determine if something's there to get, and get only if
it's non-empty. Queue.empty() is not reliable for synchronization
purposes.
I would consider using Queue.empty() only if, for example, I had some sort
of monitoring thread that periodically checked whether there was anything
in the queue, solely for the purpose of providing tuning data (e.g., "90%
of the time this queue is non-empty, so maybe I should consider having
more consumer threads on it"). Even there, I'd probably use qsize()
instead (which is also unreliable for synchronization purposes).
You're best off just pretending that Queue.empty(), Queue.full() and
Queue.qsize() don't exist, for synchronization purposes.
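To illustrate the race: the empty()-then-get() pattern can fail because
another thread may drain the queue between the two calls. The
try/except version below (Python 3 spelling) has no such window:

```python
import queue

q = queue.Queue()

# Racy -- don't do this in threaded code:
#     if not q.empty():
#         item = q.get()    # another thread may have emptied q by now

# Race-free: just attempt the get and handle Empty.
try:
    item = q.get_nowait()
except queue.Empty:
    item = None              # nothing to do right now

print(item)                  # None here, since the queue started empty
```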
> or does the consumer wake up after a randompause()?
The randompause() is *only* to insert random pauses into the execution, so
the threads have a little bit of a chance of executing out of order, in
essence simulating the threads' work taking some amount of unpredictable
time. In practice, it doesn't usually have any effect. You wouldn't use
the randompause() in your code, that's just for simulation purposes.
> Right now, I have the semaphores as gatekeepers to each one of the
> threads. And until something is in the queue, the thread's semaphore
> will wait for the semaphore to be released by the previous thread.
Right. That's an unnecessary complication, requiring that you manage the
inter-thread communication. If you instead use blocking queue reads, you
won't have to do that. The blocking will make sure that each thread wakes
up and does work only when there's work for it.
By the way, I'm not a professional programmer (I'm a lawyer, who programs
pretty much as a hobby), and it shows in this code. My first cut had some
errors because I really don't get namespaces very well. I adjusted it to
get it to work, but not very well; very kludgily. Here's a more
appropriate version (not that it's a paragon of good code now, but it's
not as, um, eccentric, as last night's).
I've also changed it to get rid of the id variable being carried in the
thread instance. Instead, I use setName to name a thread, and getName to
access that name. This is better, because then a meaningful thread name
will show up in any exception messages you get (e.g., "Exception in thread
first:")
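A small sketch of the naming idea. (Current Python exposes the name as
the thread's name attribute; setName/getName, which I use below, are the
older spellings and still work.)

```python
import threading

def work():
    raise RuntimeError("boom")        # any uncaught error will do

t = threading.Thread(target=work, name="first")   # older code: t.setName("first")
t.start()
t.join()

# The traceback the interpreter prints for the failed thread begins
# "Exception in thread first:", so you can tell at a glance which one died.
print(t.name)                         # first
```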
import time, Queue, threading, random

class processor(threading.Thread):
    def __init__(self, id, inQ=None, outQ=None, inmessage=None):
        """
        id is thread Id: "first" for initial producer,
            "last" for final consumer
        inQ is input Q, or None for first producer
        outQ is output Q, or None for final consumer
        """
        self.inQ = inQ
        self.outQ = outQ
        if id == "first":
            self.l = list(inmessage)   # the work to be produced
        if id == "last":
            self.message = ""          # accumulates the consumed work
        threading.Thread.__init__(self)
        self.setName(id)

    def run(self):
        threadname = self.getName()
        if threadname == "first":
            self.producer()
        elif threadname == "last":
            self.consumer()
        else:
            self.hybrid()

    def producer(self):
        while True:
            self.randompause()
            try:
                workunit = self.l.pop(0)
            except IndexError:
                # out of work: tell the next thread to shut down
                self.outQ.put("stop")
                self.endmessage()
                return
            self.statusmessage(workunit)
            self.outQ.put(workunit)

    def consumer(self):
        while True:
            self.randompause()
            workunit = self.inQ.get(True)    # block until work arrives
            if workunit == "stop":
                print "final message:", self.message
                self.endmessage()
                return
            else:
                self.statusmessage(workunit)
                self.message = self.message + workunit

    def hybrid(self):
        # middle threads: consume from inQ, pass along to outQ
        while True:
            self.randompause()
            workunit = self.inQ.get(True)    # block until work arrives
            if workunit == "stop":
                self.outQ.put(workunit)      # propagate the shutdown signal
                self.endmessage()
                return
            else:
                self.statusmessage(workunit)
                self.outQ.put(workunit)

    def randompause(self):
        time.sleep(random.randint(1, 5))

    def statusmessage(self, workunit):
        print "%s thread %s, workunit %s" % (time.asctime(),
                                             self.getName(), workunit)

    def endmessage(self):
        print "%s thread %s ending" % (time.asctime(), self.getName())

if __name__ == "__main__":
    q_ab = Queue.Queue()
    pa = processor(id="first", inmessage="spam", outQ=q_ab)
    q_bc = Queue.Queue()
    pb = processor(id="second", inQ=q_ab, outQ=q_bc)
    q_cd = Queue.Queue()
    pc = processor(id="third", inQ=q_bc, outQ=q_cd)
    q_de = Queue.Queue()
    pd = processor(id="fourth", inQ=q_cd, outQ=q_de)
    pe = processor(id="last", inQ=q_de)
    pa.start()
    pb.start()
    pc.start()
    pd.start()
    pe.start()