[New-bugs-announce] [issue35608] python3 multiprocessing queue deadlock when use thread and process at same time
白稳平
report at bugs.python.org
Fri Dec 28 21:53:42 EST 2018
New submission from 白稳平 <baiwenping00 at gmail.com>:
I used multi-processes to handle cpu intensive task,I have a thread reading data from stdin and put it to a input_queue, a thread get data from output_queue and write it to stdout, multiple processes get data from input queue,then handled the data,and put it to output_queue.But It some times will block forever,I doubt that it was because inappropriate to use the multiprocessing Queue,But I don't know how to solved it,can anyone help me?
my code as follows:
import multiprocessing
import sys
import threading
import time
from multiprocessing import Queue
def write_to_stdout(result_queue: Queue):
"""write queue data to stdout"""
while True:
data = result_queue.get()
if data is StopIteration:
break
sys.stdout.write(data)
sys.stdout.flush()
def read_from_stdin(queue):
"""read data from stdin, put it in queue for process handling"""
try:
for line in sys.stdin:
queue.put(line)
finally:
queue.put(StopIteration)
def process_func(input_queue, result_queue):
"""get data from input_queue,handled,put result into result_queue"""
try:
while True:
data = input_queue.get()
if data is StopIteration:
break
# cpu intensive task,use time.sleep instead
# result = compute_something(data)
time.sleep(0.1)
result_queue.put(data)
finally:
# ensure every process end
input_queue.put(StopIteration)
if __name__ == '__main__':
# queue for reading to stdout
input_queue = Queue(1000)
# queue for writing to stdout
result_queue = Queue(1000)
# thread reading data from stdin
input_thread = threading.Thread(target=read_from_stdin, args=(input_queue,))
input_thread.start()
# thread reading data from stdin
output_thread = threading.Thread(target=write_to_stdout, args=(result_queue,))
output_thread.start()
processes = []
cpu_count = multiprocessing.cpu_count()
# start multi-process to handle some cpu intensive task
for i in range(cpu_count):
proc = multiprocessing.Process(target=process_func, args=(input_queue, result_queue))
proc.start()
processes.append(proc)
# joined input thread
input_thread.join()
# joined all task processes
for proc in processes:
proc.join()
# ensure output thread end
result_queue.put(StopIteration)
# joined output thread
output_thread.join()
test environment:
python3.6.5
ubuntu16.04
----------
components: Library (Lib)
messages: 332691
nosy: davin, pitrou, 白稳平
priority: normal
severity: normal
status: open
title: python3 multiprocessing queue deadlock when use thread and process at same time
versions: Python 3.6
_______________________________________
Python tracker <report at bugs.python.org>
<https://bugs.python.org/issue35608>
_______________________________________
More information about the New-bugs-announce
mailing list