[Python-ideas] Add closing and iteration to threading.Queue

Vladimir Filipović hemflit at gmail.com
Sun Oct 21 13:58:05 EDT 2018


Hi!

I originally submitted this as a pull request. Raymond Hettinger
suggested it should be given a shakeout in python-ideas first.

https://github.com/python/cpython/pull/10018
https://bugs.python.org/issue35034

------

Briefly:

Add a close() method to Queue, which should simplify many common uses
of the class and reduce the space for some easy-to-make errors.

Also add an __iter__() method which in conjunction with close() would
further simplify some common use patterns.

------

At eye-watering length:

Apologies in advance for the length of this message. This isn't a PEP
in disguise, it's a proposal for a very small, simple and I dare
imagine uncontroversial feature. I'm new to contributing to Python and
after the BPO/github submission I didn't manage to come up with a
better way to present it than this.

The issue

Code using threading.Queue often needs to coordinate a "work is
finished as far as I care" state between the producing and consuming
side. Not "work" in the task_done() sense of completion of processing
of queue items, "work" in the simpler sense of just passing data
through the queue.

For example, a producer can be driving the communication by enqueuing
e.g. names of files that need to be processed, and once it's enqueued
the last filename, it can be useful to inform the consumers that no
further names will be coming, so after they've retrieved what's
in-flight currently, they don't need to bother waiting for any more.
Alternatively, a consumer can be driving the communication, and may
need to let the producers know "I'm not interested in any more, so you
can stop wasting resources on producing and enqueuing them".
Also, a third, coordinating component may need to let both sides know
that "Our collective work here is done. Start wrapping it up y'all,
but don't drop any items that are still in-flight."

In practice it's probably the exception, not the rule, when any piece
of code interacting with a Queue _doesn't_ have to either inform
another component that its interest in transferring the data has
ended, or watch for such information.

In the most common case of producer letting consumers know that it's
done, this is usually implemented (over and over again) with sentinel
objects, which is at best needlessly verbose and at worst error-prone.
A recipe for multiple consumers making sure nobody misses the sentinel
is not complicated, but neither is it obvious the first time one needs
to do it.
When a generic sentinel (None or similar) isn't adequate, some
component needs to create the sentinel object and communicate it to
the others, which complicates code, and especially complicates
interfaces between components that are not being developed together
(e.g. if one of them is part of a library and expects the library-user
code to talk to it through a Queue).

In the less common cases where the producers are the ones being
notified, there isn't even a typical solution - everything needs to be
cooked up from scratch using synchronization primitives.

------

A solution

Adding a close() method to the Queue that simply prohibits all further
put()'s (with other methods acting appropriately when the queue is
closed) would simplify a lot of this in a clean and safe way - for the
most obvious example, multi-consumer code would not have to juggle
sentinel objects.

Adding a further __iter__() method (that would block as necessary, and
stop its iteration once the queue is closed and exhausted) would
especially simplify many unsophisticated consumers.

This is a current fairly ordinary pattern:

# Producer:
while some_condition:
    q.put(generate_item())
q.put(sentinel)

# Consumer:
while True:
    item = q.get()
    if item == sentinel:
        q.put(sentinel)
        break
    process(item)

(This consumer could be simplified a little with an assignment
expression or an iter(q.get, sentinel), but one of those is super new
and the other seems little-known in spite of being nearly old enough
to vote.)

With the close() and __iter__(), this would become:

# Producer:
with closing(q):
    while some_condition:
        q.put(generate_item())

# Consumer:
for item in q:
    process(item)

Apart from it being shorter and less error-prone (e.g. if
generate_item raises), the implied interface for initializing the two
components is also simplified, because there's no sentinel to pass
around.

More complex consumers that couldn't take advantage of the __iter__()
would still benefit from being able to explicitly and readably find
out (via exception or querying) that the queue has been closed and
exhausted, without juggling the sentinel.

I honestly think this small addition would be an unqualified win. And
it would not change anything for code that doesn't want to use it.

------

I've got a sample implementation ready for Queue and its children.
(Link is at the start of this message. It includes documentation
updates too, in case those clarify anything further at this stage.)

If this is going in the right direction, I'm happy to do the same for
SimpleQueue, but I haven't done it yet. I'm still getting my bearings
around the C code base.

------

To immediately answer some of Raymond's initial comments at BPO:

This is completely independent from Queue's existing task-tracking
protocol. One is about controlling transport, and the other about
tracking the completion of processing after transport. I hesitate to
call them "orthogonal", but there is no functional overlap between
them at all.

I keep talking about "common cases" and such weaselly concepts above,
and really it's just conjecture based on my own limited experience and
informal conversations.
BUT, I've also done a survey of the standard library itself. There are
four packages that use threading.Queue: concurrent, idlelib, logging,
multiprocessing. All except idlelib have at least one piece that would
have benefited from a Queue.close() if it had been available when they
were being written.

I've now had a look at a handful of other languages too:
- Java and C# have nothing like this. They basically start from a
deque as a collection, and add synchronization features to it; C++ STL
doesn't even go that far. None of them do the Python thing where a
Queue is a dedicated communication tool that just uses a collection as
part of its implementation.
- Ruby (to my mild surprise) also does nothing like this.
- Clojure does just about the same thing as this proposal, yay.
- Go does approximately the same thing, just more finicky about
practical usage. The producer can say `close(q)` and the consumers can
say `for item := range q { process(item) }` which do exactly the same
thing as the proposed Python equivalents, but it has some harsh
limitations that are not applicable to Python. (Can't re-close a
closed channel, can't query whether a channel is closed outside of
retrieving an item).

------

To anticipate a couple more possible questions:

- What would this proposal do about multiple producers/consumers
needing to jointly decide _when_ to close the queue?

Explicitly nothing.

The queue's state is either closed or not, and it doesn't care who
closed it. It needs to interact correctly with multiple consumers and
multiple producers, but once any one piece of code closes it, the
correct interaction is acting like a closed queue for everybody.

When e.g. multiple producers need to arrange among themselves that the
queue be closed only after all of them are done producing, then it's
not the queue's job to coordinate _that_. They probably need a
Semaphore or a more complex coordinator in charge of the closing. Yes,
this too is a non-trivial-to-get-right situation, but trying to solve
this one inside the Queue class seems like bloat.

- Why not make the Queue a context manager while you're at it? Most
closeable classes do it.

I don't see that as a clear win in this case. Happy to add it if
there's consensus in its favour, of course.

I think `with closing(q):` is much more expressive than `with q:`
while still brief. The Queue is more versatile in use than files,
database cursors and many other resource-type classes, so the meaning
of a `with q:` would not be as immediately suggestive as with them. It
also often needs to be passed around between creation and initial use
(the whole point is that more than one piece of code has access to it)
so the common idiom `with Queue() as q:` would be a slightly less
natural fit than with resource-like classes.

- Should SimpleQueue do the same thing as Queue?

Yes, I would propose so and volunteer to implement it.
Some details would be adapted to SimpleQueue's existing promises
(put() can't fail) but I think this feature is very much in the spirit
of SimpleQueue.

- Should multiprocessing.Queue do the same thing?

I think so, though I'm not proposing it.

It already has a close() method, whose meaning is very similar but not
identical to (a subset of) the proposed threading.Queue.close's
meaning (with resource-management effects not relevant to
threading.Queue either way). I'd hope that this "not identical" part
would not cause any problems in practice (lots of things aren't
identical between those two Queues), but all hoping aside, maybe
people more experienced than me can evaluate if that's really the
case.

I also have no clear idea if the feature would be as valuable, and if
the implementation would be as easy and clean as they are with
threading.Queue.

- Should asyncio.Queue do the same thing?

Probably? I'm too unfamiliar with asyncio for my opinion on this to be
of value. So I'm not proposing it.

------


More information about the Python-ideas mailing list