Pyton 2.7.14 Multiprocessing Pool Join Hanging
Hi, /usr/lib/python2.7/multiprocessing/pool.py hangs intermittently for large queue sizes. My suggested fix is to modify Pool class' join() method thus: def join(self): debug('joining pool') assert self._state in (CLOSE, TERMINATE) # moved the join for each subprocess before Pool thead joins for p in self._pool: p.join() # moved Pool thread joins after the subprocesses and add timeouts self._worker_handler.join(timeout=30) self._task_handler.join(timeout=30) self._result_handler.join(time=30) The bulk of the work is done by subprocesses. When these are done, the Pool class' threads should complete very quickly. I'm fairly certain that the hang or stall happens in the threads' join, not the subprocesses'. The timeout on the threads' joins should prevent any hanging. The only way I've manually been able to reproduce the hanging is by the below script using the *CalledProcessError exception.* Kind Regards, -Nazar -------------------------- import logging from multiprocessing import Pool, Process from time import sleep import random import pdb import sys import os from subprocess import CalledProcessError logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) def f(x): pause = random.randint(1,2) #print("pid %s start, arg = %s, pause = %s" % (os.getpid(), x, pause)) if pause % 2 == 0: #pause = 9 #9999 raise CalledProcessError(1,2) sleep(0.01) #pause) #print("pid %s finish, arg = %s" % (os.getpid(), x)) def print_status(pl): print("\n") for proc in pl._pool: print("Process %s, pid %s, is_alive %s, exit code %s, unfinished tasks %s" % (proc._name, proc.pid, proc.is_alive(), proc.exitcode, pl._taskqueue.unfinished_tasks)) import subprocess def pool_join(proc_pool): """ Join all of the process pool's subprocesses to the main processes Python Pool's "join" intermittently hangs for ~1K subprocesses: STATICQA-5027 """ RUN = 0 CLOSE = 1 TERMINATE = 2 # log memory usage mem_str = subprocess.check_output(['cat', '/proc/meminfo']) logging.info('------ memory usage\n%s' % mem_str) logging.info('------ joining process pool') assert proc_pool._state in (CLOSE, TERMINATE) logging.info('------ proc_pool._worker_handler.join()') proc_pool._worker_handler.join() logging.info('------ proc_pool._task_handler.join()') proc_pool._task_handler.join() logging.info('------ proc_pool._result_handler.join()') proc_pool._result_handler.join() logging.info('------ starting to join each process in POOL') for proc in proc_pool._pool: logging.info("------ Joining process %s, pid %s, is_alive %s, exit code %s" % (proc._name, proc.pid, proc.is_alive(), proc.exitcode)) proc.join() logging.info("------- All POOL joins are done") if __name__ == '__main__': pl = Pool(6) NUM_PROC = 100 for i in range(1,NUM_PROC): res = pl.apply_async(f, (i,)) pl.close() print("Num processes = %s" % len(pl._pool)) for i in range(4): print_status(pl) sleep(2) print("before join") #pl.join() pool_join(pl) print("after join") print_status(pl)
participants (1)
-
Smirnov