[New-bugs-announce] [issue38799] race condition in multiprocessing.Pool with maxtasksperchild=1

Steve Lorimer report at bugs.python.org
Thu Nov 14 09:24:23 EST 2019


New submission from Steve Lorimer <steve.lorimer at gmail.com>:

There is a race condition when closing and joining a multiprocessing.Pool created with maxtasksperchild=1.

Illustrative example:

```
#!/usr/bin/env python3
import os
import time
import logging
import multiprocessing.pool

def run_task(i):
    print(f'[{os.getpid()}] task({i}) complete')

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)

    tasks = iter(range(10))
    processes = 4

    pool = multiprocessing.pool.Pool(processes=processes, maxtasksperchild=1)
    running = []
    while True:
        try:
            running = [ f for f in running if not f.ready() ]
            avail = processes - len(running)
            if avail:
                for _ in range(avail):
                    i = next(tasks)
                    print(f'[{os.getpid()}] add task({i})')
                    future = pool.apply_async(run_task, ( i, ))
                    running.append(future)
            else:
                time.sleep(0.1)
        except StopIteration:
            print(f'[{os.getpid()}] all tasks scheduled')
            break

    print(f'[{os.getpid()}] close and join pool')
    pool.close()
    pool.join()
    print(f'[{os.getpid()}] all done')
```


Example output:


```
[DEBUG/MainProcess] created semlock with handle 140042193375232
[DEBUG/MainProcess] created semlock with handle 140042193371136
[DEBUG/MainProcess] created semlock with handle 140042193367040
[DEBUG/MainProcess] created semlock with handle 140042193362944
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[7150] add task(0)
[7150] add task(1)
[7150] add task(2)
[7150] add task(3)
[7151] task(0) complete
[7152] task(1) complete
[7153] task(2) complete
[7154] task(3) complete
[DEBUG/ForkPoolWorker-1] worker exiting after 1 tasks
[INFO/ForkPoolWorker-1] process shutting down
[DEBUG/ForkPoolWorker-2] worker exiting after 1 tasks
[DEBUG/ForkPoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-3] worker exiting after 1 tasks
[INFO/ForkPoolWorker-2] process shutting down
<snip>
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[7150] add task(8)
[7150] add task(9)
[DEBUG/MainProcess] cleaning up worker 0
[7150] all tasks scheduled
[DEBUG/MainProcess] added worker
[7150] close and join pool
[DEBUG/MainProcess] closing pool
[DEBUG/MainProcess] joining pool
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[7164] task(9) complete
[DEBUG/ForkPoolWorker-10] worker exiting after 1 tasks
[INFO/ForkPoolWorker-10] process shutting down
[DEBUG/ForkPoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-10] running the remaining "atexit" finalizers
[INFO/ForkPoolWorker-10] process exiting with exitcode 0
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-12] child process calling self.run()
```


The process hangs forever in pool.join().

Interrupting the process then produces the following output:


```
^CTraceback (most recent call last):
[INFO/ForkPoolWorker-11] process shutting down
  File "./test.py", line 36, in <module>
[INFO/ForkPoolWorker-12] process shutting down
[DEBUG/ForkPoolWorker-11] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-12] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-11] running the remaining "atexit" finalizers
Process ForkPoolWorker-11:
[DEBUG/ForkPoolWorker-12] running the remaining "atexit" finalizers
Process ForkPoolWorker-12:
    pool.join()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 546, in join
    self._worker_handler.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
[INFO/ForkPoolWorker-12] process exiting with exitcode 1
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 335, in get
    res = self._reader.recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
[INFO/ForkPoolWorker-11] process exiting with exitcode 1
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] result handler found thread._state=TERMINATE
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=2, thread._state=2
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
```


Since this is a race condition, the example may need to be run several times before it hangs:


```
$ for i in {1..100}; do ./test.py; done
```
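To make a loop like that fail fast instead of blocking forever when the race triggers, the join can be attempted from a helper thread with a timeout. This is only an illustrative diagnostic sketch (join_with_timeout is my name for the helper, not a stdlib API):

```
import threading
import multiprocessing.pool

def join_with_timeout(pool, timeout=30.0):
    # Run pool.join() in a daemon thread so the caller can detect a hang
    # instead of blocking forever in the join itself.
    t = threading.Thread(target=pool.join, daemon=True)
    t.start()
    t.join(timeout)
    return not t.is_alive()   # True => join completed within the timeout

if __name__ == '__main__':
    # Reproduction-style pool; abs is picklable by reference, so it can be
    # used as a pool task without defining a helper function.
    pool = multiprocessing.pool.Pool(processes=2, maxtasksperchild=1)
    for i in range(8):
        pool.apply_async(abs, (i,))
    pool.close()
    if join_with_timeout(pool, timeout=30.0):
        print('pool joined cleanly')
    else:
        print('pool.join() hung: race reproduced')
```

The pool must already be closed (or terminated) before join_with_timeout is called, mirroring pool.join()'s own precondition.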


Notably, removing the maxtasksperchild argument stops the process from hanging.
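For comparison, here is a sketch of the same scheduling loop with maxtasksperchild simply omitted, so workers are reused for the life of the pool and close()/join() completes (the drive helper and its parameters are illustrative names, not part of the original script):

```
import time
import operator
import multiprocessing.pool

def drive(func, args, processes=4):
    # Same scheduling loop as the repro above, but the Pool is created
    # WITHOUT maxtasksperchild, so workers are never respawned.
    tasks = iter(args)
    pool = multiprocessing.pool.Pool(processes=processes)
    running, done = [], []
    while True:
        try:
            # Move finished futures out of the running list.
            still_running = []
            for f in running:
                (done if f.ready() else still_running).append(f)
            running = still_running
            avail = processes - len(running)
            if avail:
                for _ in range(avail):
                    running.append(pool.apply_async(func, (next(tasks),)))
            else:
                time.sleep(0.1)
        except StopIteration:
            # next(tasks) exhausted; futures already scheduled stay in running.
            break
    done.extend(running)
    pool.close()
    pool.join()
    return sorted(f.get() for f in done)

if __name__ == '__main__':
    # operator.neg is picklable by reference, so it works as a pool task.
    print(drive(operator.neg, range(10)))
```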


Additionally, replacing the time.sleep(0.1) call with a busy wait also makes the issue go away:


```
wait = time.time() + 0.1
while time.time() < wait:
    pass
```

----------
components: Library (Lib)
messages: 356609
nosy: steve.lorimer at gmail.com
priority: normal
severity: normal
status: open
title: race condition in multiprocessing.Pool with maxtasksperchild=1
type: behavior
versions: Python 3.6

_______________________________________
Python tracker <report at bugs.python.org>
<https://bugs.python.org/issue38799>
_______________________________________

