[Python-checkins] bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (GH-31913)

miss-islington webhook-mailer at python.org
Tue May 3 20:16:42 EDT 2022


https://github.com/python/cpython/commit/28eea73e7c5405ec41dda0cddae2a3ebaac908f5
commit: 28eea73e7c5405ec41dda0cddae2a3ebaac908f5
branch: 3.10
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: miss-islington <31488909+miss-islington at users.noreply.github.com>
date: 2022-05-03T17:16:21-07:00
summary:

bpo-47029: Fix BrokenPipeError in multiprocessing.Queue at garbage collection and explicit close (GH-31913)

(cherry picked from commit dfb1b9da8a4becaeaed3d9cffcaac41bcaf746f4)

Co-authored-by: Géry Ogam <gery.ogam at gmail.com>

files:
A Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
M Lib/multiprocessing/queues.py

diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index a2901814876d6..f37f114a96887 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -139,13 +139,10 @@ def put_nowait(self, obj):
 
     def close(self):
         self._closed = True
-        try:
-            self._reader.close()
-        finally:
-            close = self._close
-            if close:
-                self._close = None
-                close()
+        close = self._close
+        if close:
+            self._close = None
+            close()
 
     def join_thread(self):
         debug('Queue.join_thread()')
@@ -169,8 +166,9 @@ def _start_thread(self):
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send_bytes,
-                  self._wlock, self._writer.close, self._ignore_epipe,
-                  self._on_queue_feeder_error, self._sem),
+                  self._wlock, self._reader.close, self._writer.close,
+                  self._ignore_epipe, self._on_queue_feeder_error,
+                  self._sem),
             name='QueueFeederThread'
         )
         self._thread.daemon = True
@@ -211,8 +209,8 @@ def _finalize_close(buffer, notempty):
             notempty.notify()
 
     @staticmethod
-    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
-              onerror, queue_sem):
+    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
+              writer_close, ignore_epipe, onerror, queue_sem):
         debug('starting thread to feed data to pipe')
         nacquire = notempty.acquire
         nrelease = notempty.release
@@ -238,7 +236,8 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
                         obj = bpopleft()
                         if obj is sentinel:
                             debug('feeder thread got sentinel -- exiting')
-                            close()
+                            reader_close()
+                            writer_close()
                             return
 
                         # serialize the data before acquiring the lock
diff --git a/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
new file mode 100644
index 0000000000000..cc054673338f0
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-04-26-19-01-13.bpo-47029.qkT42X.rst
@@ -0,0 +1,4 @@
+Always close the read end of the pipe used by :class:`multiprocessing.Queue`
+*after* the last write of buffered data to the write end of the pipe to avoid
+:exc:`BrokenPipeError` at garbage collection and at
+:meth:`multiprocessing.Queue.close` calls. Patch by Géry Ogam.



More information about the Python-checkins mailing list