[Python-checkins] cpython: Stop making fork server have copy of semaphore_tracker_fd.

richard.oudkerk python-checkins at python.org
Thu Aug 22 12:47:33 CEST 2013


http://hg.python.org/cpython/rev/b0b224e0d2b5
changeset:   85313:b0b224e0d2b5
user:        Richard Oudkerk <shibturn at gmail.com>
date:        Thu Aug 22 11:38:57 2013 +0100
summary:
  Stop making fork server have copy of semaphore_tracker_fd.

files:
  Lib/multiprocessing/forkserver.py        |  13 +++++----
  Lib/multiprocessing/popen_spawn_posix.py |   6 +++-
  Lib/multiprocessing/popen_spawn_win32.py |   5 ++-
  Lib/multiprocessing/semaphore_tracker.py |  14 +++++-----
  Lib/multiprocessing/spawn.py             |  15 ++++++-----
  5 files changed, 29 insertions(+), 24 deletions(-)


diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -10,6 +10,7 @@
 from . import connection
 from . import process
 from . import reduction
+from . import semaphore_tracker
 from . import spawn
 from . import util
 
@@ -55,13 +56,14 @@
     The calling process should write to data_w the pickled preparation and
     process data.
     '''
-    if len(fds) + 3 >= MAXFDS_TO_SEND:
+    if len(fds) + 4 >= MAXFDS_TO_SEND:
         raise ValueError('too many fds')
     with socket.socket(socket.AF_UNIX) as client:
         client.connect(_forkserver_address)
         parent_r, child_w = util.pipe()
         child_r, parent_w = util.pipe()
-        allfds = [child_r, child_w, _forkserver_alive_fd]
+        allfds = [child_r, child_w, _forkserver_alive_fd,
+                  semaphore_tracker._semaphore_tracker_fd]
         allfds += fds
         try:
             reduction.sendfds(client, allfds)
@@ -88,8 +90,6 @@
             return
 
         assert all(type(mod) is str for mod in _preload_modules)
-        config = process.current_process()._config
-        semaphore_tracker_fd = config['semaphore_tracker_fd']
         cmd = ('from multiprocessing.forkserver import main; ' +
                'main(%d, %d, %r, **%r)')
 
@@ -110,7 +110,7 @@
             # when they all terminate the read end becomes ready.
             alive_r, alive_w = util.pipe()
             try:
-                fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+                fds_to_pass = [listener.fileno(), alive_r]
                 cmd %= (listener.fileno(), alive_r, _preload_modules, data)
                 exe = spawn.get_executable()
                 args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
@@ -197,7 +197,8 @@
     fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
     s.close()
     assert len(fds) <= MAXFDS_TO_SEND
-    child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds
+    child_r, child_w, _forkserver_alive_fd, stfd, *_inherited_fds = fds
+    semaphore_tracker._semaphore_tracker_fd = stfd
 
     # send pid to client processes
     write_unsigned(child_w, os.getpid())
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -40,7 +40,8 @@
         return fd
 
     def _launch(self, process_obj):
-        tracker_fd = current_process()._config['semaphore_tracker_fd']
+        from . import semaphore_tracker
+        tracker_fd = semaphore_tracker._semaphore_tracker_fd
         self._fds.append(tracker_fd)
         prep_data = spawn.get_preparation_data(process_obj._name)
         fp = io.BytesIO()
@@ -55,7 +56,8 @@
         try:
             parent_r, child_w = util.pipe()
             child_r, parent_w = util.pipe()
-            cmd = spawn.get_command_line() + [str(child_r)]
+            cmd = spawn.get_command_line(tracker_fd=tracker_fd,
+                                         pipe_handle=child_r)
             self._fds.extend([child_r, child_w])
             self.pid = util.spawnv_passfds(spawn.get_executable(),
                                            cmd, self._fds)
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -32,13 +32,14 @@
 
     def __init__(self, process_obj):
         prep_data = spawn.get_preparation_data(process_obj._name)
-        cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
 
         # read end of pipe will be "stolen" by the child process
         # -- see spawn_main() in spawn.py.
         rhandle, whandle = _winapi.CreatePipe(None, 0)
         wfd = msvcrt.open_osfhandle(whandle, 0)
-        cmd += ' {} {}'.format(os.getpid(), rhandle)
+        cmd = spawn.get_command_line(parent_pid=os.getpid(),
+                                     pipe_handle=rhandle)
+        cmd = ' '.join('"%s"' % x for x in cmd)
 
         with open(wfd, 'wb', closefd=True) as to_child:
             # start process
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -26,6 +26,7 @@
 __all__ = ['ensure_running', 'register', 'unregister']
 
 
+_semaphore_tracker_fd = None
 _lock = threading.Lock()
 
 
@@ -34,9 +35,9 @@
 
     This can be run from any process.  Usually a child process will use
     the semaphore created by its parent.'''
+    global _semaphore_tracker_fd
     with _lock:
-        config = current_process()._config
-        if config.get('semaphore_tracker_fd') is not None:
+        if _semaphore_tracker_fd is not None:
             return
         fds_to_pass = []
         try:
@@ -44,7 +45,7 @@
         except Exception:
             pass
         cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
-        r, semaphore_tracker_fd = util.pipe()
+        r, w = util.pipe()
         try:
             fds_to_pass.append(r)
             # process will out live us, so no need to wait on pid
@@ -53,10 +54,10 @@
             args += ['-c', cmd % r]
             util.spawnv_passfds(exe, args, fds_to_pass)
         except:
-            os.close(semaphore_tracker_fd)
+            os.close(w)
             raise
         else:
-            config['semaphore_tracker_fd'] = semaphore_tracker_fd
+            _semaphore_tracker_fd = w
         finally:
             os.close(r)
 
@@ -77,8 +78,7 @@
         # posix guarantees that writes to a pipe of less than PIPE_BUF
         # bytes are atomic, and that PIPE_BUF >= 512
         raise ValueError('name too long')
-    fd = current_process()._config['semaphore_tracker_fd']
-    nbytes = os.write(fd, msg)
+    nbytes = os.write(_semaphore_tracker_fd, msg)
     assert nbytes == len(msg)
 
 
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -66,32 +66,33 @@
         sys.exit()
 
 
-def get_command_line():
+def get_command_line(**kwds):
     '''
     Returns prefix of command line used for spawning a child process
     '''
     if getattr(sys, 'frozen', False):
         return [sys.executable, '--multiprocessing-fork']
     else:
-        prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
+        prog = 'from multiprocessing.spawn import spawn_main; spawn_main(%s)'
+        prog %= ', '.join('%s=%r' % item for item in kwds.items())
         opts = util._args_from_interpreter_flags()
         return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
 
 
-def spawn_main():
+def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
     '''
     Run code specifed by data received over pipe
     '''
     assert is_forking(sys.argv)
-    handle = int(sys.argv[-1])
     if sys.platform == 'win32':
         import msvcrt
         from .reduction import steal_handle
-        pid = int(sys.argv[-2])
-        new_handle = steal_handle(pid, handle)
+        new_handle = steal_handle(parent_pid, pipe_handle)
         fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
     else:
-        fd = handle
+        from . import semaphore_tracker
+        semaphore_tracker._semaphore_tracker_fd = tracker_fd
+        fd = pipe_handle
     exitcode = _main(fd)
     sys.exit(exitcode)
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list