[New-bugs-announce] [issue44155] Race condition when using multiprocessing BaseManager and Pool in Python3
David Chen
report at bugs.python.org
Mon May 17 04:50:27 EDT 2021
New submission from David Chen <chenzhuowansui at 163.com>:
Could someone help me out? I have spent a lot of time debugging a race condition I encountered when using BaseManager and Pool from the multiprocessing library. Here is the simplified code:
```
import sys, time
from multiprocessing.managers import BaseManager, SyncManager, BaseProxy
from multiprocessing import Process, cpu_count, Pool, Lock, get_context
from multiprocessing.queues import Queue, JoinableQueue
import queue

class QueueManager(BaseManager):
    pass

class Singleton:
    '''
    Decorator class for singleton pattern.
    '''
    def __init__(self, cls):
        self._cls = cls
        self._lock = Lock()
        self._instance = {}

    def __call__(self, *args, **kwargs):
        if self._cls not in self._instance:
            with self._lock:
                self._instance[self._cls] = self._cls(*args, **kwargs)
        return self._instance[self._cls]

    def getInstance(self):
        return self._instance[self._cls]

class LoggingServer(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        self.msgQueue = queue.Queue()
        try:
            QueueManager.register('getQueue', callable=lambda: self.msgQueue)
            self.queueManager = QueueManager(address=self.logServerAddr, authkey=self.logServerPwd)
            self.logServer = self.queueManager.get_server()
            self.logServer.serve_forever()
        except:
            raise RuntimeError("Couldn't start the logging server!")

class LoggingProcess(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        try:
            QueueManager.register('getQueue')
            self.queueManager = QueueManager(address=self.logServerAddr, authkey=self.logServerPwd)
            self.queueManager.connect()
        except:
            raise RuntimeError("Couldn't connect logging process to the logging server!")
        self.msgQueue = self.queueManager.getQueue()
        self.process = Process(target=self.loggingProcess, name="Logging Process", args=(), daemon=True)
        self.process.start()

    def terminate(self):
        self.msgQueue.join()
        self.process.terminate()

    def loggingProcess(self):
        while True:
            logObj = self.msgQueue.get()
            print(logObj)

@Singleton
class Logger(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        self.queueManager = None
        self.msgQueue = None

    def connectToLogServer(self):
        try:
            QueueManager.register('getQueue')
            self.queueManager = QueueManager(address=self.logServerAddr, authkey=self.logServerPwd)
            self.queueManager.connect()
            self.msgQueue = self.queueManager.getQueue()
            self.ready = True
        except:
            raise RuntimeError("Couldn't connect logger to Log Server!")

    def ReadyCheck(func):
        def makeDecorator(self, *args, **kwargs):
            if not self.msgQueue:
                self.connectToLogServer()
            func(self, *args, **kwargs)
        return makeDecorator

    # Overridden method to log info
    @ReadyCheck
    def info(self, info, logfile=sys.stdout):
        self.msgQueue.put(info)

address = ('', 50000)
password = b'PASSWORD'
log = Logger(address, password)

def callback(*args):
    #print("Finished!!!")
    pass

def job(index):
    time.sleep(0.1)
    log.info(str(log.msgQueue) + ":{}".format(index))
    log.info("here {}".format(index))

if __name__ == "__main__":
    # import multiprocessing
    # logger = multiprocessing.log_to_stderr()
    # logger.setLevel(multiprocessing.SUBDEBUG)
    serverProcess = Process(target=LoggingServer, name="LoggingServerDaemon", args=(address, password), daemon=True)
    serverProcess.start()
    time.sleep(1)
    loggingProcess = LoggingProcess(address, password)
    log.info("Starting...")
    #pool = Pool(cpu_count())
    pool = Pool()  # With a small number of workers (like 10) there is no problem, but with a bigger number (say 48 in my case) this program hangs every time...
    results = [pool.apply_async(job, (i,), callback=callback) for i in range(1)]
    pool.close()
    pool.join()
    log.info("Done")
    #loggingProcess.terminate()
    #serverProcess.terminate()
```
The LoggingServer class acts as a logging server (like a proxy) that manages a shared queue. The LoggingProcess class is a log consumer that fetches log messages from the shared queue managed by LoggingServer. The Logger class is a producer that puts log messages onto the shared queue. Because I want to share the global logger across multiple modules to unify the log format, output destinations, and so on (something like the logging standard library), the Logger instance is not fully initialized up front; it finishes initializing lazily on first use (please see connectToLogServer). I strongly suspect this lazy initialization is the root cause of the hang, but I can't get any further...
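For reference, here is a minimal sketch of one possible workaround, assuming the hang comes from the pool workers inheriting the lazily-connected manager proxy across fork: let every pool worker open its own manager connection in a Pool initializer instead of reusing the inherited global Logger state. The sketch reuses the QueueManager, address, and password from the reproducer above and assumes the LoggingServer process is already listening; it is a guess, not a confirmed fix.
```
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('getQueue')

_worker_queue = None  # per-worker proxy, created after the fork

def init_worker(address, authkey):
    # Runs once in every freshly started pool worker, after the fork, so
    # each worker owns its own manager connection and proxy instead of
    # inheriting one that was created (or half-created) in the parent.
    global _worker_queue
    manager = QueueManager(address=address, authkey=authkey)
    manager.connect()
    _worker_queue = manager.getQueue()

def job(index):
    _worker_queue.put("here {}".format(index))

if __name__ == "__main__":
    address = ('', 50000)
    password = b'PASSWORD'
    # initializer/initargs are standard Pool arguments; each worker calls
    # init_worker once before it starts picking up tasks.
    with Pool(initializer=init_worker, initargs=(address, password)) as pool:
        pool.map(job, range(10))
```
Whether this actually sidesteps the _decref race is only an assumption; it merely avoids sharing a proxy object that was created in the parent process.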
The traceback of the hung sub-process (ForkPoolWorker), captured with py-spy, looks like the following:
```
Process 3958088: python3 Logger.py
Python v3.9.0 (/usr/bin/python3.9)
Thread 3958088 (idle): "MainThread"
_recv (/usr/lib/python3.9/multiprocessing/connection.py:384)
_recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:419)
recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:221)
answer_challenge (/usr/lib/python3.9/multiprocessing/connection.py:757)
Client (/usr/lib/python3.9/multiprocessing/connection.py:513)
_decref (/usr/lib/python3.9/multiprocessing/managers.py:861)
__call__ (/usr/lib/python3.9/multiprocessing/util.py:224)
_run_finalizers (/usr/lib/python3.9/multiprocessing/util.py:300)
_exit_function (/usr/lib/python3.9/multiprocessing/util.py:334)
_bootstrap (/usr/lib/python3.9/multiprocessing/process.py:318)
_launch (/usr/lib/python3.9/multiprocessing/popen_fork.py:71)
__init__ (/usr/lib/python3.9/multiprocessing/popen_fork.py:19)
_Popen (/usr/lib/python3.9/multiprocessing/context.py:277)
start (/usr/lib/python3.9/multiprocessing/process.py:121)
_repopulate_pool_static (/usr/lib/python3.9/multiprocessing/pool.py:326)
_repopulate_pool (/usr/lib/python3.9/multiprocessing/pool.py:303)
__init__ (/usr/lib/python3.9/multiprocessing/pool.py:212)
Pool (/usr/lib/python3.9/multiprocessing/context.py:119)
<module> (/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py:129)
```
It seems the refcount of the shared queue failed to be decremented... I googled a lot, but nothing I found seems to match this case, so I am bringing the issue here for help. Any comments and suggestions are highly appreciated!
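In case it helps to confirm whether the blocked DECREF request is what hangs, the commented-out lines at the top of __main__ can be made to work on Python 3 roughly like this (log_to_stderr is the documented helper; util.SUBDEBUG is an internal constant with value 5, and the exact messages printed are version-dependent):
```
import multiprocessing
from multiprocessing import util

# Route multiprocessing's internal log records to stderr and lower the
# threshold to SUBDEBUG so DECREF and finalizer activity around the pool
# start-up should become visible.
logger = multiprocessing.log_to_stderr()
logger.setLevel(util.SUBDEBUG)
```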
Traceback after CTRL+C:
```
Traceback (most recent call last):
File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 43, in __init__
self.logServer.serve_forever()
File "/usr/lib/python3.9/multiprocessing/managers.py", line 183, in serve_forever
sys.exit(0)
SystemExit: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 68, in loggingProcess
logObj = self.msgQueue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod
kind, result = conn.recv()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 255, in recv
buf = self._recv_bytes()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 45, in __init__
raise RuntimeError("Couldn't start the logging server!")
RuntimeError: Couldn't start the logging server!
```
----------
components: Library (Lib)
messages: 393796
nosy: chenzhuowansui
priority: normal
severity: normal
status: open
title: Race condition when using multiprocessing BaseManager and Pool in Python3
type: behavior
versions: Python 3.9
_______________________________________
Python tracker <report at bugs.python.org>
<https://bugs.python.org/issue44155>
_______________________________________