[Patches] [ python-Patches-1455676 ] Simplify using Queues with consumer threads
SourceForge.net
noreply at sourceforge.net
Fri Mar 24 21:44:07 CET 2006
Patches item #1455676, was opened at 2006-03-21 16:36
Message generated for change (Comment added) made by rhettinger
You can respond by visiting:
https://sourceforge.net/tracker/?func=detail&atid=305470&aid=1455676&group_id=5470
Please note that this message will contain a full copy of the comment thread,
including the initial issue submission, for this request,
not just the latest update.
>Category: None
>Group: Python 2.5
>Status: Closed
Resolution: Accepted
Priority: 5
Submitted By: Raymond Hettinger (rhettinger)
Assigned to: Raymond Hettinger (rhettinger)
Summary: Simplify using Queues with consumer threads
Initial Comment:
When Queues are used to communicate between producer
and consumer threads, there is often a need to
determine when all of the enqueued tasks have been
completed.
With this small patch, determining when all work is
done is as simple as adding q.task_done() to each
consumer thread and q.join() to the main thread.
Without the patch, the next best approach is to count
the number of puts, create a second queue filled by
the consumer when a task is done, and for the main
thread to call successive blocking gets on the result
queue until all of the puts have been accounted for:
    def worker():
        while 1:
            task = tasks_in.get()
            do_work(task)
            tasks_out.put(None)

    tasks_in = Queue()
    tasks_out = Queue()
    for i in range(num_worker_threads):
        Thread(target=worker).start()

    n = 0
    for elem in source():
        n += 1
        tasks_in.put(elem)

    # block until tasks are done
    for i in range(n):
        tasks_out.get()
That approach is not complicated but it does entail
more lines of code and tracking some auxiliary data.
This becomes cumbersome and error-prone when an app
has multiple occurrences of q.put() and q.get().
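For readers who want to run the two-queue approach, here is a minimal self-contained sketch. It assumes Python 3 (the `queue` module rather than the Python 2 `Queue` module used in this thread), and the function name `run_with_result_queue`, the doubling stand-in for do_work(), and the daemon threads are illustrative choices, not part of the patch. Unlike the snippet above, the worker puts its result rather than None on the second queue so the outcome can be checked.

```python
import queue
import threading


def run_with_result_queue(items, num_worker_threads=3):
    """Count the puts by hand and wait on a second queue for completions."""
    tasks_in = queue.Queue()
    tasks_out = queue.Queue()

    def worker():
        while True:
            task = tasks_in.get()
            # Stand-in for do_work(task); the result doubles as the
            # "task finished" token that the main thread waits for.
            tasks_out.put(task * 2)

    for _ in range(num_worker_threads):
        # Daemon threads (an assumption here) let the program exit
        # even though the workers loop forever.
        threading.Thread(target=worker, daemon=True).start()

    n = 0
    for elem in items:
        n += 1
        tasks_in.put(elem)

    # Block until one token has arrived for every put.
    return sorted(tasks_out.get() for _ in range(n))
```

The auxiliary counter n and the second queue are exactly the bookkeeping that has to be repeated wherever puts and gets occur.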
The patch essentially encapsulates this approach into
two methods, making it effortless to use and easy to
graft on to existing uses of Queue. So, the above
code simplifies to:
    def worker():
        while 1:
            task = q.get()
            do_work(task)
            q.task_done()

    q = Queue()
    for i in range(num_worker_threads):
        Thread(target=worker).start()

    for elem in source():
        q.put(elem)

    # block until tasks are done
    q.join()
The put counting is automatic, there is no need for a
separate queue object, and the code expresses its
intent clearly. It is also easy to inspect for
accuracy: each get() is followed by a task_done().
The ease of inspection remains even when there are
multiple gets and puts scattered through the code (a
situation which would become complicated for the
two-Queue approach).
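A runnable sketch of the task_done()/join() pattern follows. It assumes Python 3, where these two methods are available on `queue.Queue`; the function name `process_all`, the squaring stand-in for do_work(), the results list, and the daemon threads are hypothetical additions for illustration only.

```python
import queue
import threading


def process_all(items, num_worker_threads=3):
    """Let the queue itself track outstanding work via task_done()/join()."""
    q = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            task = q.get()
            try:
                with results_lock:
                    results.append(task * task)  # stand-in for do_work(task)
            finally:
                # One task_done() per get(), even if the work raises.
                q.task_done()

    for _ in range(num_worker_threads):
        threading.Thread(target=worker, daemon=True).start()

    for elem in items:
        q.put(elem)

    # Block until every put() has been matched by a task_done().
    q.join()
    return sorted(results)
```

Placing task_done() in a finally clause is a design choice of this sketch: it keeps join() from blocking forever if the work function raises.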
If accepted, will add docs with an example.
Besides being a fast, lean, elegant solution, the
other reason to accept the patch is that the
underlying problem appears again and again, requiring
some measure of invention to solve it each time.
There are a number of approaches but none as simple,
fast, or as broadly applicable as having the queue
itself track items loaded and items completed.
----------------------------------------------------------------------
>Comment By: Raymond Hettinger (rhettinger)
Date: 2006-03-24 15:44
Message:
Logged In: YES
user_id=80475
Committed as revision 43298.
----------------------------------------------------------------------
Comment By: Tim Peters (tim_one)
Date: 2006-03-23 19:17
Message:
Logged In: YES
user_id=31435
I marked this as Accepted, but there are some things I'd
like to see changed:
- A Condition is best named after the predicate it
represents. So, e.g., instead of the generic "waiter", a
better name would be "all_tasks_done". When you eventually
.notify() the Condition, you're notifying its wait()er that
"all tasks (may be) done", and "all tasks (may be) done" is
what the wait()er is waiting _for_. "all_tasks_done.wait()"
makes that much clearer than "waiter.wait()".
- A Condition.wait() can be interrupted by (at least)
KeyboardInterrupt, so the acquire/release around a
Condition.wait() call should always be in a try/finally (so
that the Condition is release()d no matter what). All other
Condition.wait()s in Queue do protect themselves this way.
I don't see a need for try/finally around other uses, except
possibly that:
- Given the intended semantics, it would be good to raise an
exception if .unfinished_tasks becomes negative; i.e., make
it a detected programmer error if task_done() is called "too
often" (although again the Condition needs to be release()d
no matter what, and a try/finally may be expedient toward
that end).
- Since any number of threads _may_ be waiting in
Queue.join(), yes, .notifyAll() is better. The other
conditions in Queue don't do that because there's a key
difference: at most one thread waiting on not_full or
not_empty can make progress when one of those is "signaled",
so it would be wasteful to wake up more than one thread
waiting on those. In contrast, all threads waiting on
.waiter can make progress when all tasks are in fact done.
You can do that with a notifyAll() in task_done(), or by
adding a notify() near the end of join() (then all threads
waiting on this condition will get notified in domino
fashion). The notifyAll() way is "purer".
- It's inevitable that someone will ask Queue.join() to grow
an optional timeout argument. OK by me if that waits ;-).
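The review points above can be sketched together in a small stand-alone class. This is not the committed code: `TaskTracker` and `task_added` are hypothetical names, the sketch assumes Python 3, and it uses the `with` statement as the modern equivalent of the acquire/try/finally/release idiom discussed here. It shows a Condition named after its predicate, a lock that is released no matter what, an exception when task_done() is called too often, and notify_all() so that every thread blocked in join() wakes up.

```python
import threading


class TaskTracker:
    """Hypothetical helper that counts unfinished tasks, as the review suggests."""

    def __init__(self):
        self.mutex = threading.Lock()
        # Named after the predicate waiters are waiting for.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_added(self):
        # Would be called from put() in a real queue.
        with self.all_tasks_done:
            self.unfinished_tasks += 1

    def task_done(self):
        # "with" releases the lock even when the ValueError is raised.
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished < 0:
                raise ValueError("task_done() called too many times")
            if unfinished == 0:
                # Any number of threads may be blocked in join().
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        # The lock is also released if wait() is interrupted,
        # for example by KeyboardInterrupt.
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
```

The counter is only updated after the negative check, so a surplus task_done() call leaves the tracker in a consistent state.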
----------------------------------------------------------------------
Comment By: Raymond Hettinger (rhettinger)
Date: 2006-03-22 01:02
Message:
Logged In: YES
user_id=80475
Thanks. There are two particular areas for extra
attention.
First, should the waiter acquire/release pairs be in a
try/finally (iow, is there some behavior in notify() or
release() that potentially needs to be trapped)?
Second, should the notify() in task_done() really be a
notifyAll() (iow, does it make sense that multiple joins
may be pending)?
Thanks again.
----------------------------------------------------------------------
Comment By: Tim Peters (tim_one)
Date: 2006-03-21 20:42
Message:
Logged In: YES
user_id=31435
Yup, I'll try to make time tomorrow (can't today).
_Offhand_ it sounds like a nice addition to me.
----------------------------------------------------------------------
Comment By: Raymond Hettinger (rhettinger)
Date: 2006-03-21 17:27
Message:
Logged In: YES
user_id=80475
Tim, do you have a chance to look at this?
----------------------------------------------------------------------