Problem in Multiprocessing module
Terry Reedy
tjreedy at udel.edu
Fri Oct 11 18:58:44 EDT 2013
On 10/11/2013 10:53 AM, William Ray Wing wrote:
> I'm running into a problem in the multiprocessing module.
>
> My code runs four parallel processes which do network access completely independently of each other (gathering data from different remote sources). In rare circumstances, the code blows up when one of my processes has to start doing some error recovery. I strongly suspect it is because there is a time-out that isn't being caught in the multiprocessing lib, and that in turn is exposing the TypeError. Note that the error is "cannot concatenate 'str' and 'NoneType' objects" and it is occurring way down in the multiprocessing library.
>
> I'd really appreciate it if someone more knowledgeable about multiprocessing could confirm (or refute) my suspicion and then tell me how to fix things up.
>
> I'm running Python 2.7.5 on Mac OS X 10.8.5.
The version is important, see below.
> The traceback I get is:
(After moving the last line back to the top, where tracebacks put it. It is better to cut and paste a traceback as is.)
> TypeError: cannot concatenate 'str' and 'NoneType' objects
To understand an exception, you must know what sort of expression could
cause it. In 2.7, this arises from something like
>>> 'a'+None
Traceback (most recent call last):
File "<pyshell#0>", line 1, in <module>
'a'+None
TypeError: cannot concatenate 'str' and 'NoneType' objects
In 3.x, the same expression generates
TypeError: Can't convert 'NoneType' object to str implicitly
The equivalent join expression gives a different message in 2.7 (and
nearly the same in 3.3):
>>> ''.join(('a', None))
Traceback (most recent call last):
File "<pyshell#1>", line 1, in <module>
''.join(('a', None))
TypeError: sequence item 1: expected string, NoneType found
> File "/Users/wrw/Dev/Python/Connection_Monitor/Version3.0/CM_Harness.py", line 20, in <module>
> my_pool = pool.map(monitor, targets) # and hands off to four targets
> File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 250, in map
> return self.map_async(func, iterable, chunksize).get()
> File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 554, in get
> raise self._value
>
> To save you-all some time:
>
> The "get" function at line 554 in pool.py (which is in the multiprocessing lib) is:
class ApplyResult(object):
> def get(self, timeout=None):
> self.wait(timeout)
This must set self._ready, self._success, and self._value
> if not self._ready:
> raise TimeoutError
This did not happen, so self._ready must be True
> if self._success:
> return self._value
This did not happen, so self._success must be False
> else:
> raise self._value
This did, and self._value is the TypeError reported.
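The branch logic above can be mimicked with a minimal stand-in class (FakeResult is hypothetical, not the real ApplyResult; it skips the wait step entirely):

```python
from multiprocessing import TimeoutError

class FakeResult(object):
    """Hypothetical stand-in mimicking the branch structure of
    ApplyResult.get after wait() has returned."""
    def __init__(self, ready, success, value):
        self._ready = ready
        self._success = success
        self._value = value

    def get(self):
        # In the real class, self.wait(timeout) runs first.
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

# The reported traceback corresponds to this state:
r = FakeResult(ready=True, success=False,
               value=TypeError("cannot concatenate 'str' and 'NoneType' objects"))
try:
    r.get()
except TypeError as e:
    print(e)  # the stored TypeError, re-raised in the caller
```

Running this re-raises the stored exception at the `raise self._value` line, which is exactly the frame the original traceback ends in.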
Let us look into self.wait and see if we can find where there is a
string1 + string2 expression and then figure out how string2 might be None.
class ApplyResult(object):
def __init__(self, cache, callback):
self._cond = threading.Condition(threading.Lock())
...
threading.Condition is a factory function (an arguably useless layer of
indirection) that returns a _Condition instance.
def wait(self, timeout=None):
self._cond.acquire()
try:
if not self._ready:
self._cond.wait(timeout)
finally:
self._cond.release()
So it seems that we need to look at the _Condition methods acquire,
release, and wait. (The first two are simply the underlying lock's
methods:
    self.acquire = lock.acquire
    self.release = lock.release
) However, this seems wrong: self._cond has no reference back to the
result object and hence cannot set self attributes. They must be set by
some callback that runs while get is waiting. Async code is terrible to
debug because the call stack in the traceback ends with wait and does
not tell us what function was called during the wait.
Right after the .get method in pool.py comes ._set, which starts
def _set(self, i, obj):
self._success, self._value = obj
# and goes on to set
This is the only place where self._value is set, so it must have been
called during the wait.
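A stripped-down sketch of that hypothesis — another thread delivering a (success, value) pair via _set while the caller blocks in wait — might look like this (MiniResult is a simplified stand-in, not the real class):

```python
import threading

class MiniResult(object):
    """Simplified stand-in: wait() blocks on a Condition until
    another thread calls _set() with a (success, value) pair."""
    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._ready = False

    def wait(self, timeout=None):
        with self._cond:
            if not self._ready:
                self._cond.wait(timeout)

    def _set(self, obj):
        # Called from the result-handler thread, not from wait itself
        self._success, self._value = obj
        with self._cond:
            self._ready = True
            self._cond.notify()

    def get(self, timeout=None):
        self.wait(timeout)
        if self._success:
            return self._value
        raise self._value

res = MiniResult()
# A stand-in for the result-handler thread delivering a failure,
# as the pool does when a worker raises.
handler = threading.Thread(
    target=res._set, args=((False, TypeError("str + NoneType")),))
handler.start()
try:
    res.get()
except TypeError as e:
    print("re-raised:", e)
handler.join()
```

Note that the traceback from the re-raise points at `raise self._value` in get, telling us nothing about where the handler thread got the exception from — the debugging problem described above.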
It is only used in Pool._handle_results where the relevant lines are
@staticmethod
def _handle_results(outqueue, get, cache):
...
task = get()
...
job, i, obj = task
...
cache[job]._set(i, obj)
We need to find out what get is. _handle_results is only used in
Pool.__init__:
self._result_handler = threading.Thread(
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
)
so when _handle_results is called, get = self._quick_get, and what is
that? Pool.__init__ starts with self._setup_queues(), which contains
from .queues import SimpleQueue # relative import
self._outqueue = SimpleQueue()
self._quick_get = self._outqueue._reader.recv
SimpleQueue._reader is the first member of the pair returned by
multiprocessing.Pipe(duplex=False), which calls
multiprocessing.connection.Pipe(duplex). For duplex=False, that in turn
returns two _multiprocessing.Connections (not Windows) or
PipeConnections (Windows). So it seems that get = _quick_get =
_multiprocessing(Pipe)Connection.recv.
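Part of that chain can be checked from Python without reading the C: the read end of a duplex=False Pipe has a recv that unpickles whole objects, so the (job, i, obj) triple that _handle_results unpacks arrives in one piece rather than being assembled by string concatenation at the Python level. A quick sketch (the message shape is taken from the quoted _handle_results and _set code):

```python
import multiprocessing

# duplex=False: first connection is read-only, second is write-only
reader, writer = multiprocessing.Pipe(duplex=False)
quick_get = reader.recv   # the bound method Pool stores as _quick_get

# Simulate a worker-failure message: (job, i, (success, value))
writer.send((0, 0, (False,
    TypeError("cannot concatenate 'str' and 'NoneType' objects"))))

job, i, obj = quick_get()   # recv unpickles the whole tuple
success, value = obj        # the pair that _set unpacks
print(job, i, success, type(value).__name__)
```

Exceptions pickle along with their message, which is how the worker's TypeError text survives the trip back to the parent process intact.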
I tried looking at the C code for conn_recv_string() in
Modules/_multiprocessing/pipe_connection and conn_recv_bytes in
.../socket_connection, but did not get very far. I do not see how a
triple could be returned, nor any string concatenation. If I am on the
right track, the concatenation would have to be deeper in one of the
called functions.
But maybe I am on the wrong track and the problem is in your monitor
program, or whatever part gets called as part of the callback.
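One way to test that last possibility is a minimal reproduction: a worker that performs the suspect concatenation. Under Pool.map, the worker's TypeError is pickled back and re-raised in the parent, with a traceback ending in pool.py's get — matching the report. (A sketch; monitor and fetch are hypothetical stand-ins for the real code, and the exact TypeError wording shown is the 2.7 one.)

```python
import multiprocessing

def fetch(target):
    # Hypothetical stand-in for a network call that returns None on error
    return None

def monitor(target):
    # If error recovery forgets to check for None, this line raises
    # "cannot concatenate 'str' and 'NoneType' objects" under 2.7
    return "data: " + fetch(target)

if __name__ == "__main__":
    pool = multiprocessing.Pool(4)
    try:
        pool.map(monitor, ["a", "b", "c", "d"])
    except TypeError as e:
        # Re-raised in the parent; the traceback ends inside pool.py
        print("worker error surfaced in parent:", e)
    finally:
        pool.close()
        pool.join()
```

If the real monitor can ever hand a None to a concatenation during error recovery, this would produce exactly the reported traceback without any bug in multiprocessing itself.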
--
Terry Jan Reedy