Hi,

  /usr/lib/python2.7/multiprocessing/pool.py hangs intermittently for large queue sizes.
  My suggested fix is to modify the Pool class's join() method as follows:

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        # moved the join for each subprocess before Pool thead joins
        for p in self._pool:
            p.join()
        # moved Pool thread joins after the subprocesses and add timeouts
        self._worker_handler.join(timeout=30)
        self._task_handler.join(timeout=30)
        self._result_handler.join(time=30)

The bulk of the work is done by subprocesses.  When these are done, the Pool class' threads should complete very quickly.

I'm fairly certain that the hang or stall happens in the threads' join, not the subprocesses'.
The timeout on the threads' joins should prevent any hanging.

The only way I have been able to reproduce the hang manually is with the script below, in which the worker function raises CalledProcessError.

  Kind Regards,
  -Nazar


--------------------------


import logging
from multiprocessing import Pool, Process
from time import sleep
import random
import pdb
import sys
import os
from subprocess import CalledProcessError


# Root logger: emit DEBUG and above, writing to stdout.
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)


def f(x):
    """Pool task that randomly either fails or naps for 10 ms.

    A value of 1 or 2 is drawn at random.  On an even draw the task raises
    CalledProcessError(1, 2); on an odd draw it sleeps for 0.01 seconds and
    returns None.  The argument ``x`` is accepted but not used.
    """
    draw = random.randint(1, 2)
    if draw % 2:
        # Odd draw: the task succeeds after a short pause.
        sleep(0.01)
        return
    # Even draw: the task fails with the exception used for the repro.
    raise CalledProcessError(1, 2)


def print_status(pl):
    """Print one status line for every process in the pool ``pl``.

    Each line reports the process name, pid, liveness and exit code,
    followed by the ``unfinished_tasks`` counter of the pool's task queue.
    A blank separator is printed first.
    """
    line_format = ("Process %s, pid %s, is_alive %s, exit code %s, "
                   "unfinished tasks %s")
    print("\n")
    for worker in pl._pool:
        details = (
            worker._name,
            worker.pid,
            worker.is_alive(),
            worker.exitcode,
            pl._taskqueue.unfinished_tasks,
        )
        print(line_format % details)


import subprocess
def pool_join(proc_pool):
    """
    Instrumented replacement for the pool's own join() that logs every step.

    Joins the pool's worker handler, task handler and result handler, then
    each process in the pool, in that order.  A log line is written before
    every join so the log shows which join never returned.
    Python Pool's "join" intermittently hangs for ~1K subprocesses: STATICQA-5027

    proc_pool: pool object whose ``_state`` must already be CLOSE or
        TERMINATE.

    Raises AssertionError if the pool is in any other state.
    """
    # Local copies of the pool state codes.
    # NOTE(review): values assumed to match multiprocessing.pool -- confirm.
    CLOSE = 1
    TERMINATE = 2

    # log memory usage
    # Best effort: read the file directly instead of forking ``cat``, and do
    # not let a missing /proc/meminfo abort the join being diagnosed.
    try:
        with open('/proc/meminfo') as meminfo:
            mem_str = meminfo.read()
    except (IOError, OSError) as err:
        logging.warning('------ memory usage unavailable: %s', err)
    else:
        logging.info('------ memory usage\n%s', mem_str)

    logging.info('------ joining process pool')
    assert proc_pool._state in (CLOSE, TERMINATE)
    logging.info('------  proc_pool._worker_handler.join()')
    proc_pool._worker_handler.join()
    logging.info('------ proc_pool._task_handler.join()')
    proc_pool._task_handler.join()
    logging.info('------ proc_pool._result_handler.join()')
    proc_pool._result_handler.join()
    logging.info('------ starting to join each process in POOL')
    for proc in proc_pool._pool:
        logging.info(
            "------ Joining process %s, pid %s, is_alive %s, exit code %s",
            proc._name, proc.pid, proc.is_alive(), proc.exitcode)
        proc.join()
    logging.info("------- All POOL joins are done")



if __name__ == '__main__':
    # Reproduction driver: queue the tasks on a 6-process pool, close it,
    # print a few status snapshots, then join through the logging helper.
    NUM_PROC = 100
    pool = Pool(6)
    # The range starts at 1, so NUM_PROC - 1 tasks are queued.
    for task_arg in range(1, NUM_PROC):
        pool.apply_async(f, (task_arg,))
    pool.close()
    print("Num processes = %s" % len(pool._pool))
    # Four status snapshots, two seconds apart, before attempting the join.
    snapshots = 4
    for _ in range(snapshots):
        print_status(pool)
        sleep(2)
    print("before join")
    # pool_join() is used in place of pool.join() so each join step is logged.
    pool_join(pool)
    print("after join")
    print_status(pool)