`__iter__` for queues?

Hello,

Is there a reason that queues don't have an `__iter__` method? I mean both `Queue.Queue` and `multiprocessing.Queue`. I had to write my own "iterate on queue" function for use in my project. Do you think it should be a built-in method on the Queue class?

Ram.

Georg Brandl wrote:
To me the 'obvious' meaning is to call get(block=True) and have it raise Empty (actually, StopIteration) when the queue is empty and the 'sender' has somehow signalled that no more items will be put into the queue (q.finished()?). This would also eliminate the need for a sentinel!

On 19Jan2010 23:01, MRAB <python@mrabarnett.plus.com> wrote:
| Georg Brandl wrote:
| >On 19.01.2010 21:20, cool-RR wrote:
| >>On Tue, Jan 19, 2010 at 11:10 PM, Simon Brunning <simon@brunningonline.net> wrote:
| >> 2010/1/19 cool-RR <cool-rr@cool-rr.com>:
| >> > Is there a reason that queues don't have an `__iter__` method? I
| >> > mean both `Queue.Queue` and `multiprocessing.Queue`.
| >>
| >> Could it be made threadsafe?
| >>
| >>For me, iterating on the queue means just calling `get` repeatedly until
| >>it's empty. Now that I think about it, maybe this is not the most
| >>obvious meaning? I'm not sure now.
| >
| >Your obvious queue iterator would call get(block=False) and stop on Empty.
| >The other obvious meaning is to call get(block=True) forever. IMO they
| >are both too "obvious" to make a call -- an explicit while loop is better.
| >
| To me the 'obvious' meaning is to call get(block=True) and have it raise
| Empty (actually, StopIteration) when the queue is empty and the 'sender'
| has somehow signalled that no more items will be put into the queue
| (q.finished()?). This would also eliminate the need for a sentinel!

Personally, I have long had an IterableQueue subclass. It uses a tunable sentinel (None by default), but it adds a .close() method and iterates until closed rather than until empty. This is because I want to write a handler like so:

    for item in Q:
        ...do stuff...

and have it block if the queue is empty.

So clearly there are two reasonable approaches to the end-of-iteration idea, and extending Queue to do iteration would probably have to choose one. So maybe two helper iteration methods might be the go: it's easy enough to write a generator to iterate-until-empty or iterate-until-sentinel.

Cheers,
--
Cameron Simpson <cs@zip.com.au> DoD#743
http://www.cskk.ezoshosting.com/cs/

... It beeped and said "Countdown initiated." Is that bad?
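A minimal sketch of the kind of IterableQueue Cameron describes, written against Python 3's `queue.Queue`. The class name comes from his message, but the code is a hypothetical reconstruction, not his implementation: close() simply enqueues the sentinel, and iteration relies on the two-argument form of the built-in `iter()`. It assumes a single consumer (a second consumer would block forever, as discussed later in the thread) and that real items never compare equal to the sentinel.

```python
import queue
import threading


class IterableQueue(queue.Queue):
    """Queue that iterates until closed rather than until empty (hypothetical sketch).

    close() enqueues a sentinel (None by default); iterating blocks while the
    queue is empty and stops when the sentinel is dequeued.
    """

    def __init__(self, maxsize=0, sentinel=None):
        super().__init__(maxsize)
        self.sentinel = sentinel

    def close(self):
        # Items put before close() are still delivered, in FIFO order.
        self.put(self.sentinel)

    def __iter__(self):
        # iter(callable, sentinel) calls self.get() (blocking) until the
        # returned value equals the sentinel; the sentinel is not yielded.
        return iter(self.get, self.sentinel)


def producer(q):
    for i in range(3):
        q.put(i)
    q.close()


q = IterableQueue()
worker = threading.Thread(target=producer, args=(q,))
worker.start()
received = list(q)  # blocks until the producer calls close()
worker.join()
print(received)  # [0, 1, 2]
```

Because the handler just writes `for item in q:`, it blocks on an empty queue exactly as Cameron wants, and the loop ends only when close() has been called.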

Cameron Simpson wrote:
Ah, yes, .close() would be better. :-) As for iterate-until-empty or iterate-until-sentinel, perhaps only a keyword argument is needed when the queue is created. Another possible feature (although this might be going too far) is a keyword argument for the number of 'producers' (default=1). Each can close the queue when it's finished, and StopIteration will be raised only when all have closed and the queue is empty.

On 1/19/2010 4:20 PM, cool-RR wrote:
Explicit .get()s release the queue between getting each item, so other code can also put and get. A mutable collection's iterator may or may not lock the collection or monitor mutations. Dict iteration (and, I presume, set iteration, but I see nothing in the doc) monitors mutation. List iteration does not. While modifying a list during iteration can have uses, it also leads to bugs and newbie confusion. On the other hand, one can explicitly iterate over a dict with

    while d:
        k, v = d.popitem()
        <go ahead and modify dict>

and the same with set.pop, and have no problem. So iterating with an iterator is not quite the same as repeated one-item fetches.

Terry Jan Reedy
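The three behaviours Terry contrasts can be checked directly. The snippet below shows dict iteration detecting a size change, list iteration silently skipping an element, and a `while d: d.popitem()` loop tolerating mutation between fetches. The error text in the comment is CPython's wording; other implementations may phrase it differently.

```python
# 1. Dict iteration monitors mutation: a size change raises RuntimeError.
counts = {"a": 1, "b": 2}
dict_error = None
try:
    for key in counts:
        counts["c"] = 3  # adding a key during iteration
except RuntimeError as exc:
    dict_error = str(exc)
print(dict_error)  # dictionary changed size during iteration

# 2. List iteration does not: removing the current item silently skips one.
items = [1, 2, 3]
seen = []
for x in items:
    seen.append(x)
    if x == 1:
        items.remove(x)  # shifts the remaining items left under the iterator
print(seen)  # [1, 3]: 2 was never visited, and no error was raised

# 3. Repeated one-item fetches: mutating between popitem() calls is fine.
pending = {"a": 1, "b": 2}
drained = {}
while pending:
    k, v = pending.popitem()
    drained[k] = v
    if k == "b":
        pending["z"] = 26  # adding while draining
print(drained == {"a": 1, "b": 2, "z": 26})  # True
```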

On Jan 19, 2010, at 6:06 PM, Terry Reedy wrote:
If that is all you're looking for, then just do that. Write a short generator to dump the queue. You can do that without blocking your producer thread.

The problem with proposing a change to the API for Queue.Queue and multiprocessing.Queue is that it presents non-obvious pitfalls for other common use cases. It is better to leave it out of the API and let users explicitly craft solutions that fit their problem (perhaps using two queues, Producer --> DatabaseUpdater --> Consumer, or some solution involving locking). For many uses of Queue, it doesn't make sense to block a producer for the time it takes to iterate, nor does it make sense to have a non-blocking iterator (leading to a race condition and inconsistent results). It is better to use the API as designed than to propose a new method that may be unsuitable for many use cases.

> So iterating with an iterator is not quite the same as repeated one-item fetches.

Right. The two ideas should not be conflated. A Queue object is all about buffering producers and consumers running in different threads. Any API that is at odds with that notion (i.e. iterating over a dynamically updating queue buffer) is probably not a good idea.

Raymond
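One way to read Raymond's two-queue suggestion, as a sketch: each stage runs in its own thread and talks to the others only through queues, so no stage iterates over a queue that another thread is mutating. The stage names mirror his diagram, but the functions, the `_DONE` marker, and the dict standing in for a database are all hypothetical.

```python
import queue
import threading

_DONE = object()  # hypothetical end-of-stream marker passed down the pipeline


def producer(out_q, n):
    for i in range(n):
        out_q.put(i)
    out_q.put(_DONE)


def database_updater(in_q, out_q, db):
    # Only this thread writes to `db`, so no extra lock is needed for it.
    for item in iter(in_q.get, _DONE):
        db[item] = item * item  # stand-in for a real database update
        out_q.put(item)
    out_q.put(_DONE)  # forward end-of-stream to the next stage


def consumer(in_q, results):
    for item in iter(in_q.get, _DONE):
        results.append(item)


raw_q, updated_q = queue.Queue(), queue.Queue()
db, results = {}, []
stages = [
    threading.Thread(target=producer, args=(raw_q, 5)),
    threading.Thread(target=database_updater, args=(raw_q, updated_q, db)),
    threading.Thread(target=consumer, args=(updated_q, results)),
]
for t in stages:
    t.start()
for t in stages:
    t.join()
print(results)  # [0, 1, 2, 3, 4]
```

The producer never waits for the updater to finish a pass; it only ever blocks on put() if a queue is given a maxsize.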

That would not be thread-safe, because your sentinel is seen only once, by a single lucky consumer; other listeners who called iter() will still execute self.get() and block forever. The solution I found is to put the sentinel back in the queue so that other consumers will see it (this uses StopIteration as the sentinel):

    def _iterqueue(queue):
        # Turn either a Queue.Queue or a multiprocessing.queues.SimpleQueue
        # into a thread-safe iterator which will be exhausted when
        # StopIteration is put into it.
        while 1:
            item = queue.get()
            if item is StopIteration:
                # Re-broadcast, in case there is another listener blocking on
                # queue.get(). That listener will receive StopIteration and
                # re-broadcast to the next one in line.
                queue.put(StopIteration)
                break
            else:
                yield item

I agree, however, that this should be left out of the stdlib API.

--
// aht
http://blog.onideas.ws

On Jan 20, 2010, at 3:59 AM, cool-RR wrote:
I think you need to abandon the notion of iteration and instead focus on your original use case of (destructively) pulling all of the items out of a queue. Taking a cue (no pun intended) from the DB-API's fetchone() and fetchall(), perhaps you're looking for a get_all() method?

    # Meant as a method of Queue.Queue: it relies on the class's internal
    # not_empty/not_full conditions and _qsize()/_get(), and on the Empty
    # and _time names defined in the Queue module.
    def get_all(self, block=True, timeout=None):
        """Remove and return all items currently in the queue, as a list.

        If optional arg 'block' is true and 'timeout' is None (the default),
        block if necessary until at least one item is available. If 'timeout'
        is a non-negative number, it blocks at most 'timeout' seconds and
        raises the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return the items if any are immediately
        available, else raise the Empty exception ('timeout' is ignored in
        that case).
        """
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            result = []
            while self._qsize():
                result.append(self._get())
                self.not_full.notify()  # one slot freed per item removed
            return result
        finally:
            self.not_empty.release()

Raymond

Thanks for the code, Raymond. However, what I use is actually simpler:

    import Queue  # Python 2 module name; it is `queue` in Python 3

    def iterate(queue, block=False):
        '''Iterate over the items in the queue.'''
        while True:
            try:
                yield queue.get(block=block)
            except Queue.Empty:
                # End the generator; raising StopIteration here is an
                # error from Python 3.7 on (PEP 479).
                return

Since it's a generator, the flow of the program is (get item) -> (process it) -> (get item) -> (process it), instead of getting all the items, putting them in a list, and only then beginning to process. (By the time we're done processing, there might be more work in the queue.)

Ram.

On Wed, Jan 20, 2010 at 9:36 PM, Raymond Hettinger <raymond.hettinger@gmail.com> wrote:

--
Sincerely,
Ram Rachum
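Ram's point about processing order can be seen with a Python 3 port of his generator (`queue.Empty` instead of `Queue.Empty`, and `return` instead of `raise StopIteration`). The job names below are hypothetical. Work that arrives while an item is being processed is still picked up, whereas a one-shot snapshot of the queue would have returned only the first two items.

```python
import queue


def iterate(q, block=False):
    """Yield items from `q` until queue.Empty is raised."""
    while True:
        try:
            yield q.get(block=block)
        except queue.Empty:
            return  # ends the generator


q = queue.Queue()
for job in ("a", "b"):
    q.put(job)

processed = []
for job in iterate(q):
    processed.append(job)
    if job == "a":
        q.put("c")  # work that arrives while "a" is being processed
print(processed)  # ['a', 'b', 'c']
```

Note that with `block=True` and no timeout, `q.get()` never raises `queue.Empty`, so this loop would simply wait forever once the queue drains.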

On Wed, Jan 20, 2010 at 9:36 PM, Raymond Hettinger <raymond.hettinger@gmail.com> wrote:
It's not obvious to me what "all" should mean, or even whether it makes sense, in the default case (`block=True, timeout=None`) when there are no items. IIUC, the implementation above makes it effectively equivalent to get() in this case, i.e. a single-item list is returned. Although that's reasonable, it seems arbitrary. I'd find a method with signature `get_all(self, timeout=0.0)` more intuitive: by default it would be equivalent to `block=False` above, and equivalent to `block=True, timeout=timeout` if timeout > 0.

George
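A sketch of the signature George proposes, written as a standalone function built only on the public `queue.Queue` API (`get()` and `get_nowait()`) rather than on the internal conditions Raymond's method uses. The function form is an assumption made for illustration; as a consequence it is not atomic, and another consumer may take items between the individual calls.

```python
import queue


def get_all(q, timeout=0.0):
    """Hypothetical helper following the suggested get_all(timeout=0.0) signature.

    timeout == 0 (default): like block=False, raise queue.Empty at once if
    the queue is empty. timeout > 0: like block=True, timeout=timeout, wait
    at most `timeout` seconds for the first item. Either way, everything
    else already queued is then drained without waiting.
    """
    if timeout < 0:
        raise ValueError("'timeout' must be a non-negative number")
    if timeout > 0:
        result = [q.get(block=True, timeout=timeout)]
    else:
        result = [q.get(block=False)]
    while True:
        try:
            result.append(q.get_nowait())
        except queue.Empty:
            return result


q = queue.Queue()
for i in range(3):
    q.put(i)
print(get_all(q))  # [0, 1, 2]
```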

On Jan 19, 2010, at 1:10 PM, Simon Brunning wrote:
To me, this is exactly the right line of questioning. Is there a use case for iterating through a queue that is being mutated by consumers and producers? If so, should the consumers and producers be locked out during iteration, or should the iterator fail if it detects a mutation (as we currently do for dictionaries that change during iteration)?

The semantics should be dictated by use cases. When someone writes "for t in q: action(q)", what is the most useful thing to happen when another thread does a get or put?

My personal opinion is that queues are better off without an iterator. They serve mainly to buffer consumers and producers, and iteration does not fit in well. In other words, adding __iter__ to queues in a threaded environment may do more harm than good.

Raymond

On Wed, Jan 20, 2010 at 2:08 AM, Raymond Hettinger < raymond.hettinger@gmail.com> wrote:
In my program, I have one thread putting data into a queue, and the main thread periodically iterates over the whole queue to integrate all the data into a central data structure.

The general use case is when you want a thread to take items from a queue and process them until the queue is empty, like:

    for work_item in queue:
        process(work_item)

(I think you made a typo in your code sample; you meant `action(t)`.) What I'm suggesting is that the iteration won't care about what other threads do with the queue. It'll just `get` until it's empty.

Okay. I don't have much experience in this, so unless someone else finds this suggestion useful, I'll stick with my custom function.

Ram.
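Ram's own use case (one producer thread, and a main thread that periodically folds everything queued so far into a central structure) can be handled without `__iter__`, using a non-blocking drain loop. In this sketch the producer, the key names, the summing rule, and the polling interval are all hypothetical.

```python
import queue
import threading
import time

updates = queue.Queue()
central = {}  # central data structure, touched only by the main thread


def sensor(n):
    # Hypothetical producer thread reporting (key, value) pairs.
    for i in range(n):
        updates.put((f"k{i % 3}", i))
        time.sleep(0.001)


def integrate_pending():
    """Fold whatever is queued right now into `central`; never blocks."""
    count = 0
    while True:
        try:
            key, value = updates.get_nowait()
        except queue.Empty:
            return count
        central[key] = central.get(key, 0) + value
        count += 1


worker = threading.Thread(target=sensor, args=(30,))
worker.start()
while worker.is_alive():
    integrate_pending()  # periodic pass from the main thread
    time.sleep(0.005)
integrate_pending()  # final pass for anything queued before the producer exited
print(central)  # {'k0': 135, 'k1': 145, 'k2': 155}
```

The final pass matters: once `is_alive()` reports False, every put() has completed, so that last drain is guaranteed to see all remaining items.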

cool-RR wrote:
I agree. I don't see why __iter__ wouldn't work when .get does. 'Empty' could mean either a non-blocking .get and stopping when there are no more items in the queue, or a blocking .get and stopping when there are no more items in the queue _and_ the queue has been closed by the producer(s).

MRAB wrote:
When there are multiple valid approaches to iterating over a queue, and none of them is more obviously fundamental than the others, does it make sense for us to bless one by implementing it as the __iter__ method?

Now, an iteration API (separate from __iter__) that helped developers easily avoid some of the pitfalls of rolling their own iteration techniques (such as the reinsertion of sentinel values) may provide some value, but I suspect it would be difficult to produce an effective API even with function parameters to tweak the behaviour.

I will note that the check-and-reinsert example given elsewhere in this thread contains a race condition in the multiple-producer use case, or in cases where a single producer may place additional items in the queue after the shutdown sentinel:

    def _iterqueue(queue):
        while 1:
            item = queue.get()
            # Queue is not locked here: producers may insert more items,
            # and other consumers may process items that were in the queue
            # after the sentinel
            if item is StopIteration:
                # We put StopIteration into the queue again, but it may not be
                # at the front if a producer inserted something after the
                # original insertion of the sentinel value
                queue.put(StopIteration)
                break
            else:
                yield item

This approach also doesn't work for sentinel values that are intended to mean "stop processing the queue, go do something else, then come back and start processing this queue again", since the queue can no longer be processed at all after the sentinel value has been inserted once.

Cheers,
Nick.

P.S. An easy way to iterate over a queue in the simple "single consumer with sentinel value" use case:

    for obj in iter(q.get, sentinel):
        pass

--
Nick Coghlan | ncoghlan@gmail.com | Brisbane, Australia
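Nick's P.S. idiom, run end to end as a sketch. The two-argument `iter(callable, sentinel)` keeps calling `q.get()` until the result equals the sentinel, and the sentinel itself is not yielded. The `SENTINEL` name and the sample items are hypothetical; using a fresh `object()` means ordinary items such as strings and numbers will not compare equal to it. With one consumer, a single sentinel is enough.

```python
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker


def consume(q, out):
    # Blocks in q.get() between items; stops as soon as SENTINEL arrives.
    for obj in iter(q.get, SENTINEL):
        out.append(obj)


q = queue.Queue()
received = []
consumer = threading.Thread(target=consume, args=(q, received))
consumer.start()
for word in ("spam", "eggs", "ham"):
    q.put(word)
q.put(SENTINEL)  # single consumer, so one sentinel is enough
consumer.join()
print(received)  # ['spam', 'eggs', 'ham']
```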

You are quite right. The assumption is that the StopIteration singleton correctly marks the end of the queue however that is achieved (e.g. have the producer-spawning thread join() with all producers, then put StopIteration). Cheers, -- // aht http://blog.onideas.ws
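A sketch of the arrangement aht describes: the sentinel is inserted only after every producer has been joined, so the race Nick points out (a real item queued behind the sentinel) cannot occur, and the re-broadcast lets several consumers all terminate. The consumer is the re-broadcasting `_iterqueue` from earlier in the thread, ported to Python 3; `producer` and `coordinator` are hypothetical helpers, and here the coordinator simply runs in the main thread.

```python
import queue
import threading


def _iterqueue(q):
    # Re-broadcasting consumer: StopIteration (the class) is the sentinel.
    while True:
        item = q.get()
        if item is StopIteration:
            q.put(StopIteration)  # hand the sentinel on to the next consumer
            return
        yield item


def producer(q, base):
    for i in range(base, base + 5):
        q.put(i)


def coordinator(q, producers):
    # Insert the sentinel only after every producer has finished, so no
    # real item can end up queued behind it.
    for p in producers:
        p.join()
    q.put(StopIteration)


q = queue.Queue()
results = []
lock = threading.Lock()


def consumer():
    for item in _iterqueue(q):
        with lock:
            results.append(item)


producers = [threading.Thread(target=producer, args=(q, b)) for b in (0, 100)]
consumers = [threading.Thread(target=consumer) for _ in range(3)]
for t in producers + consumers:
    t.start()
coordinator(q, producers)
for t in consumers:
    t.join()  # every consumer returns instead of blocking forever
print(sorted(results))  # [0, 1, 2, 3, 4, 100, 101, 102, 103, 104]
```

The last consumer to exit leaves one StopIteration in the queue, which is harmless here but is exactly why this scheme cannot support Nick's "stop, do something else, then resume" use case.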

Anh Hai Trinh wrote:
Some time after I posted, I realised that my suggestion wasn't threadsafe when there was more than one consumer. :-(

OK, so how about this:

Add a new method close(), which sets a 'closed' flag in the queue.

Currently, if the queue is empty then a blocking get() will wait and a non-blocking get() will raise an Empty exception.

In future, if the queue is empty and the 'closed' flag is set then a get(), whether blocking or non-blocking, will raise a Closed exception. (The Closed exception is only ever raised if close() is called, so the addition won't break existing code.)

An additional enhancement for multiple producers is to add a keyword argument that specifies the number of producers, and to count the number of times that close() has been called (each producer would call close() once). The Closed exception would be raised only if the queue is empty and the 'closed' count has reached the number of producers.

__iter__ can now be:

    def __iter__(self):
        try:
            while True:
                yield self.get()
        except Closed:
            # End iteration; raising StopIteration inside a generator is an
            # error from Python 3.7 on (PEP 479).
            return
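A sketch of MRAB's close()/Closed proposal, including the producer count. It is written as a standalone class around a `collections.deque` and a `threading.Condition` rather than as a patch to `queue.Queue`, and the names `ClosableQueue`, `Closed`, and the `producers` argument are hypothetical. The key detail is that the final close() calls `notify_all()`, so consumers already blocked in get() wake up and see the closed state instead of waiting forever; this is what makes the design safe with several consumers.

```python
import collections
import queue
import threading


class Closed(Exception):
    """Raised by get() once the queue is empty and every producer has closed it."""


class ClosableQueue:
    def __init__(self, producers=1):
        self._items = collections.deque()
        self._producers = producers
        self._closed = 0  # number of close() calls so far
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()

    def close(self):
        # Each producer calls this once when it has finished putting items.
        with self._cond:
            self._closed += 1
            if self._closed >= self._producers:
                self._cond.notify_all()  # wake every blocked get()

    def get(self, block=True):
        with self._cond:
            while not self._items:
                if self._closed >= self._producers:
                    raise Closed
                if not block:
                    raise queue.Empty
                self._cond.wait()
            return self._items.popleft()

    def __iter__(self):
        while True:
            try:
                item = self.get()
            except Closed:
                return
            yield item


q = ClosableQueue(producers=2)


def producer(base):
    for i in range(base, base + 3):
        q.put(i)
    q.close()


threads = [threading.Thread(target=producer, args=(b,)) for b in (0, 10)]
for t in threads:
    t.start()
received = sorted(q)  # iterates until both producers closed and queue drained
for t in threads:
    t.join()
print(received)  # [0, 1, 2, 10, 11, 12]
```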

participants (12)
- Anh Hai Trinh
- Cameron Simpson
- cool-RR
- Georg Brandl
- George Sakkis
- Jesse Noller
- MRAB
- Nick Coghlan
- Raymond Hettinger
- Simon Brunning
- Stefan Behnel
- Terry Reedy