
Anh Hai Trinh wrote:
I will note that the check-and-reinsert example given elsewhere in this thread contains a race condition in the multiple producer use case or in cases where a single producer may place additional items in the queue after the shutdown sentinel:
    def _iterqueue(queue):
        while 1:
            item = queue.get()
            # The queue is not locked here: producers may insert more
            # items, and other consumers may process items that were in
            # the queue after the sentinel.
            if item is StopIteration:
                # We put StopIteration into the queue again, but it may
                # not be at the front if a producer inserted something
                # after the original insertion of the sentinel value.
                queue.put(StopIteration)
                break
            else:
                yield item
You are quite right. The assumption is that the StopIteration singleton correctly marks the end of the queue, however that is achieved (e.g. have the producer-spawning thread join() each producer, then put StopIteration).
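For concreteness, here is a minimal sketch of that arrangement (the function names are illustrative, not part of any proposal): the spawning thread join()s all producers before putting the sentinel, so the sentinel is guaranteed to be the last item and the check-and-reinsert race cannot arise.

    import queue
    import threading

    def producer(q, items):
        for item in items:
            q.put(item)

    def run():
        q = queue.Queue()
        workers = [threading.Thread(target=producer, args=(q, range(i, i + 3)))
                   for i in (0, 10)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()          # all producers done: no more items can arrive
        q.put(StopIteration)  # sentinel is now guaranteed to be last

        results = []
        while True:
            item = q.get()
            if item is StopIteration:
                break
            results.append(item)
        return sorted(results)

Note this is only safe for a single consumer; with several consumers each one needs its own sentinel (one per consumer, put after the join()s).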
I realised that my suggestion wasn't threadsafe when there was more than one consumer some time after I posted. :-(

OK, so how about this: add a new method close(), which sets a 'closed' flag on the queue. Currently, if the queue is empty then a blocking get() will wait and a non-blocking get() will raise an Empty exception. In future, if the queue is empty and the 'closed' flag is set, then a get(), whether blocking or non-blocking, will raise a Closed exception. (The Closed exception is only ever raised if close() is called, so the addition won't break existing code.)

An additional enhancement for multiple producers is to add a keyword argument that specifies the number of producers and count the number of times that close() has been called (each producer would call close() once). The Closed exception would be raised only if the queue is empty and the 'closed' count has reached the number of producers.

__iter__ can now be:

    def __iter__(self):
        try:
            while True:
                yield self.get()
        except Closed:
            return  # raising StopIteration inside a generator is an error under PEP 479
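A rough sketch of what that proposal might look like as a Queue subclass. The names ClosableQueue, close() and Closed follow the description above but are otherwise hypothetical, and the short polling timeout stands in for the condition-variable wakeup a real implementation inside Queue would use (it also means a caller-supplied timeout is ignored in this sketch):

    import queue
    import threading

    class Closed(Exception):
        """Raised by get() when the queue is empty and fully closed."""

    class ClosableQueue(queue.Queue):
        def __init__(self, producers=1):
            super().__init__()
            self._producers = producers
            self._close_count = 0
            self._close_lock = threading.Lock()

        def close(self):
            # Each producer calls close() once; the queue counts as
            # closed when every producer has done so.
            with self._close_lock:
                self._close_count += 1

        def get(self, block=True, timeout=None):
            while True:
                try:
                    # Poll with a short timeout so a blocked consumer
                    # notices a close() that happens after it starts waiting.
                    return super().get(block=block,
                                       timeout=0.05 if block else None)
                except queue.Empty:
                    with self._close_lock:
                        if self._close_count >= self._producers:
                            raise Closed
                    if not block:
                        raise  # still open and empty: plain Empty

        def __iter__(self):
            try:
                while True:
                    yield self.get()
            except Closed:
                return

Usage: with ClosableQueue(producers=2), consumers can simply write `for item in q: ...`; the loop ends cleanly once both producers have called close() and the queue has drained, with no sentinel values in the data stream at all.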