[Python-ideas] Add closing and iteration to threading.Queue

Nathaniel Smith njs at pobox.com
Sun Oct 21 16:34:12 EDT 2018

Hi Vladimir,

It's great to see people revisiting these old stdlib tools. Closure
tracking is definitely a big point of awkwardness for Queues. In Trio we
started with a straight copy of threading.Queue, and this turned out to be
a major friction point for users. We just deprecated our version of Queue
and replaced it with a new design. Our new thing is probably more radical
than you want to get in the stdlib (we ended up splitting the object into
two pieces, a sender object and a receiver object), but you might find the
discussions interesting:


A more minimal proposal to add closure tracking to trio.Queue:

Follow-up issue with design questions we're still thinking about (also
links to earlier design discussions):

We only started shipping this last week, so we're still getting experience
with it.


On Sun, Oct 21, 2018, 10:59 Vladimir Filipović <hemflit at gmail.com> wrote:

> Hi!
> I originally submitted this as a pull request. Raymond Hettinger
> suggested it should be given a shakeout in python-ideas first.
> https://github.com/python/cpython/pull/10018
> https://bugs.python.org/issue35034
> ------
> Briefly:
> Add a close() method to Queue, which should simplify many common uses
> of the class and reduce the space for some easy-to-make errors.
> Also add an __iter__() method which in conjunction with close() would
> further simplify some common use patterns.
> ------
> At eye-watering length:
> Apologies in advance for the length of this message. This isn't a PEP
> in disguise, it's a proposal for a very small, simple and I dare
> imagine uncontroversial feature. I'm new to contributing to Python and
> after the BPO/github submission I didn't manage to come up with a
> better way to present it than this.
> The issue
> Code using threading.Queue often needs to coordinate a "work is
> finished as far as I care" state between the producing and consuming
> side. Not "work" in the task_done() sense of completion of processing
> of queue items, "work" in the simpler sense of just passing data
> through the queue.
> For example, a producer can be driving the communication by enqueuing
> e.g. names of files that need to be processed, and once it's enqueued
> the last filename, it can be useful to inform the consumers that no
> further names will be coming, so after they've retrieved what's
> in-flight currently, they don't need to bother waiting for any more.
> Alternatively, a consumer can be driving the communication, and may
> need to let the producers know "I'm not interested in any more, so you
> can stop wasting resources on producing and enqueuing them".
> Also, a third, coordinating component may need to let both sides know
> that "Our collective work here is done. Start wrapping it up y'all,
> but don't drop any items that are still in-flight."
> In practice it's probably the exception, not the rule, when any piece
> of code interacting with a Queue _doesn't_ have to either inform
> another component that its interest in transferring the data has
> ended, or watch for such information.
> In the most common case of producer letting consumers know that it's
> done, this is usually implemented (over and over again) with sentinel
> objects, which is at best needlessly verbose and at worst error-prone.
> A recipe for multiple consumers making sure nobody misses the sentinel
> is not complicated, but neither is it obvious the first time one needs
> to do it.
> When a generic sentinel (None or similar) isn't adequate, some
> component needs to create the sentinel object and communicate it to
> the others, which complicates code, and especially complicates
> interfaces between components that are not being developed together
> (e.g. if one of them is part of a library and expects the library-user
> code to talk to it through a Queue).
> In the less common cases where the producers are the ones being
> notified, there isn't even a typical solution - everything needs to be
> cooked up from scratch using synchronization primitives.
> ------
> A solution
> Adding a close() method to the Queue that simply prohibits all further
> put()'s (with other methods acting appropriately when the queue is
> closed) would simplify a lot of this in a clean and safe way - for the
> most obvious example, multi-consumer code would not have to juggle
> sentinel objects.
> Adding a further __iter__() method (that would block as necessary, and
> stop its iteration once the queue is closed and exhausted) would
> especially simplify many unsophisticated consumers.
> This is a current fairly ordinary pattern:
> # Producer:
> while some_condition:
>     q.put(generate_item())
> q.put(sentinel)
> # Consumer:
> while True:
>     item = q.get()
>     if item == sentinel:
>         q.put(sentinel)
>         break
>     process(item)
> (This consumer could be simplified a little with an assignment
> expression or an iter(q.get, sentinel), but one of those is super new
> and the other seems little-known in spite of being nearly old enough
> to vote.)
> With the close() and __iter__(), this would become:
> # Producer:
> with closing(q):
>     while some_condition:
>         q.put(generate_item())
> # Consumer:
> for item in q:
>     process(item)
> Apart from it being shorter and less error-prone (e.g. if
> generate_item raises), the implied interface for initializing the two
> components is also simplified, because there's no sentinel to pass
> around.
> More complex consumers that couldn't take advantage of the __iter__()
> would still benefit from being able to explicitly and readably find
> out (via exception or querying) that the queue has been closed and
> exhausted, without juggling the sentinel.
> I honestly think this small addition would be an unqualified win. And
> it would not change anything for code that doesn't want to use it.
> ------
> I've got a sample implementation ready for Queue and its children.
> (Link is at the start of this message. It includes documentation
> updates too, in case those clarify anything further at this stage.)
> If this is going in the right direction, I'm happy to do the same for
> SimpleQueue, but I haven't done it yet. I'm still getting my bearings
> around the C code base.
> ------
> To immediately answer some of Raymond's initial comments at BPO:
> This is completely independent from Queue's existing task-tracking
> protocol. One is about controlling transport, and the other about
> tracking the completion of processing after transport. I hesitate to
> call them "orthogonal", but there is no functional overlap between
> them at all.
> I keep talking about "common cases" and such weaselly concepts above,
> and really it's just conjecture based on my own limited experience and
> informal conversations.
> BUT, I've also done a survey of the standard library itself. There are
> four packages that use threading.Queue: concurrent, idlelib, logging,
> multiprocessing. All except idlelib have at least one piece that would
> have benefited from a Queue.close() if it had been available when they
> were being written.
> I've now had a look at a handful of other languages too:
> - Java and C# have nothing like this. They basically start from a
> deque as a collection, and add synchronization features to it; C++ STL
> doesn't even go that far. None of them do the Python thing where a
> Queue is a dedicated communication tool that just uses a collection as
> part of its implementation.
> - Ruby (to my mild surprise) also does nothing like this.
> - Clojure does just about the same thing as this proposal, yay.
> - Go does approximately the same thing, just more finicky about
> practical usage. The producer can say `close(q)` and the consumers can
> say `for item := range q { process(item) }` which do exactly the same
> thing as the proposed Python equivalents, but it has some harsh
> limitations that are not applicable to Python. (Can't re-close a
> closed channel, can't query whether a channel is closed outside of
> retrieving an item).
> ------
> To anticipate a couple more possible questions:
> - What would this proposal do about multiple producers/consumers
> needing to jointly decide _when_ to close the queue?
> Explicitly nothing.
> The queue's state is either closed or not, and it doesn't care who
> closed it. It needs to interact correctly with multiple consumers and
> multiple producers, but once any one piece of code closes it, the
> correct interaction is acting like a closed queue for everybody.
> When e.g. multiple producers need to arrange among themselves that the
> queue be closed only after all of them are done producing, then it's
> not the queue's job to coordinate _that_. They probably need a
> Semaphore or a more complex coordinator in charge of the closing. Yes,
> this too is a non-trivial-to-get-right situation, but trying to solve
> this one inside the Queue class seems like bloat.
> - Why not make the Queue a context manager while you're at it? Most
> closeable classes do it.
> I don't see that as a clear win in this case. Happy to add it if
> there's consensus in its favour, of course.
> I think `with closing(q):` is much more expressive than `with q:`
> while still brief. The Queue is more versatile in use than files,
> database cursors and many other resource-type classes, so the meaning
> of a `with q:` would not be as immediately suggestive as with them. It
> also often needs to be passed around between creation and initial use
> (the whole point is that more than one piece of code has access to it)
> so the common idiom `with Queue() as q:` would be a slightly less
> natural fit than with resource-like classes.
> - Should SimpleQueue do the same thing as Queue?
> Yes, I would propose so and volunteer to implement it.
> Some details would be adapted to SimpleQueue's existing promises
> (put() can't fail) but I think this feature is very much in the spirit
> of SimpleQueue.
> - Should multiprocessing.Queue do the same thing?
> I think so, though I'm not proposing it.
> It already has a close() method, whose meaning is very similar but not
> identical to (a subset of) the proposed threading.Queue.close's
> meaning (with resource-management effects not relevant to
> threading.Queue either way). I'd hope that this "not identical" part
> would not cause any problems in practice (lots of things aren't
> identical between those two Queues), but all hoping aside, maybe
> people more experienced than me can evaluate if that's really the
> case.
> I also have no clear idea if the feature would be as valuable, and if
> the implementation would be as easy and clean as they are with
> threading.Queue.
> - Should asyncio.Queue do the same thing?
> Probably? I'm too unfamiliar with asyncio for my opinion on this to be
> of value. So I'm not proposing it.
> ------
> _______________________________________________
> Python-ideas mailing list
> Python-ideas at python.org
> https://mail.python.org/mailman/listinfo/python-ideas
> Code of Conduct: http://python.org/psf/codeofconduct/
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/python-ideas/attachments/20181021/9577a7e2/attachment-0001.html>

More information about the Python-ideas mailing list