Multiprocessing queue sharing and Python 3.8
Israel Brewster
ijbrewster at alaska.edu
Mon Apr 6 13:23:52 EDT 2020
Under Python 3.7 (and all previous versions I have used), the following code works properly and produces the expected output:
import multiprocessing as mp

mp_comm_queue = None  # Will be initialized in the main function
mp_comm_queue2 = mp.Queue()  # Test pre-initialized as well

def some_complex_function(x):
    print(id(mp_comm_queue2))
    assert(mp_comm_queue is not None)
    print(x)
    return x*2

def main():
    global mp_comm_queue

    # Initialize the Queue
    mp_comm_queue = mp.Queue()

    # Set up a pool to process a bunch of stuff in parallel
    pool = mp.Pool()
    values = range(20)
    data = pool.imap(some_complex_function, values)

    for val in data:
        print(f"**{val}**")

if __name__ == "__main__":
    main()
Under 3.7, mp_comm_queue2 has the same ID for all iterations of some_complex_function, and the assert passes (mp_comm_queue is not None). Under Python 3.8, however, it fails: mp_comm_queue2 is a *different* object in each iteration, and the assert fails.
So what am I doing wrong with the above example block? Assuming that it broke in 3.8 because I wasn’t sharing the Queue properly, what is the proper way to share a Queue object among multiple processes for the purposes of inter-process communication?
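(For what it's worth, the traceback below shows this is on macOS, and Python 3.8 changed the default multiprocessing start method on macOS from "fork" to "spawn". Under "spawn" each worker re-imports the module instead of inheriting the parent's globals, which would explain the new behavior. A minimal sketch for testing that theory by forcing the "fork" context; "ctx" is just an illustrative name:)

import multiprocessing as mp

# Explicitly request the "fork" start method, which was the macOS
# default before Python 3.8 (not available on Windows).
ctx = mp.get_context("fork")

mp_comm_queue2 = ctx.Queue()

def some_complex_function(x):
    print(id(mp_comm_queue2))  # same object in every forked worker
    return x * 2

def main():
    pool = ctx.Pool()
    for val in pool.imap(some_complex_function, range(20)):
        print(f"**{val}**")
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()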
The documentation (https://docs.python.org/3.8/library/multiprocessing.html#exchanging-objects-between-processes) appears to indicate that I should pass the queue as an argument to the function to be executed in parallel; however, that fails as well (on ALL versions of Python I have tried) with the error:
Traceback (most recent call last):
  File "test_multi.py", line 32, in <module>
    main()
  File "test_multi.py", line 28, in main
    for val in data:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
    raise value
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
after I add the following to the code to try passing the queue rather than having it global:
# Try by passing the queue
values = [(x, mp_comm_queue) for x in range(20)]
data = pool.imap(some_complex_function, values)
for val in data:
    print(f"**{val}**")
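(One workaround the error message's "inheritance" wording suggests is to avoid pickling a raw Queue entirely: a Manager-backed queue is a picklable proxy object, so it can be passed as an ordinary task argument. A minimal sketch, assuming a manager queue is acceptable for the use case:)

import multiprocessing as mp

def some_complex_function(args):
    x, q = args  # each task arrives as a (value, queue-proxy) tuple
    q.put(x)     # the proxy is safe to use inside the worker
    return x * 2

def main():
    mgr = mp.Manager()
    q = mgr.Queue()  # proxy object; picklable, unlike mp.Queue()
    pool = mp.Pool()
    values = [(x, q) for x in range(20)]
    for val in pool.imap(some_complex_function, values):
        print(f"**{val}**")
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()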
So if a plain Queue can't be passed as an argument (the Manager-based sketch above notwithstanding), and having it global is incorrect (at least starting with 3.8), what is the proper method of getting multiprocessing queues to child processes?
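(The one pattern that seems squarely within the "through inheritance" rule is Pool's initializer/initargs hook: the queue is handed to each worker once, at process start-up, rather than pickled with every task, so it works under both "fork" and "spawn". A minimal sketch; init_worker is a name invented for illustration:)

import multiprocessing as mp

mp_comm_queue = None  # set in each worker by the initializer below

def init_worker(q):
    # Runs once in every worker as it starts; passing the queue here
    # happens at process-creation time, which multiprocessing allows.
    global mp_comm_queue
    mp_comm_queue = q

def some_complex_function(x):
    assert(mp_comm_queue is not None)
    mp_comm_queue.put(x)
    return x * 2

def main():
    q = mp.Queue()
    pool = mp.Pool(initializer=init_worker, initargs=(q,))
    for val in pool.imap(some_complex_function, range(20)):
        print(f"**{val}**")
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()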
---
Israel Brewster
Software Engineer
Alaska Volcano Observatory
Geophysical Institute - UAF
2156 Koyukuk Drive
Fairbanks AK 99775-7320
Work: 907-474-5172
cell: 907-328-9145