[Python-checkins] cpython: Issue #12040: Expose a new attribute `sentinel` on instances of

antoine.pitrou python-checkins at python.org
Mon Jun 6 19:36:06 CEST 2011


http://hg.python.org/cpython/rev/568a3ba088e4
changeset:   70690:568a3ba088e4
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Mon Jun 06 19:35:31 2011 +0200
summary:
  Issue #12040: Expose a new attribute `sentinel` on instances of
:class:`multiprocessing.Process`.  Also, fix Process.join() to not use
polling anymore, when given a timeout.

files:
  Doc/library/multiprocessing.rst  |  14 +++++++
  Lib/multiprocessing/forking.py   |  34 ++++++++++-------
  Lib/multiprocessing/process.py   |  12 ++++++
  Lib/multiprocessing/util.py      |  21 +++++++++++
  Lib/test/test_multiprocessing.py |  37 ++++++++++++++++++++
  Misc/NEWS                        |   4 ++
  6 files changed, 108 insertions(+), 14 deletions(-)


diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -411,6 +411,20 @@
 
       See :ref:`multiprocessing-auth-keys`.
 
+   .. attribute:: sentinel
+
+      A numeric handle of a system object which will become "ready" when
+      the process ends.
+
+      On Windows, this is an OS handle usable with the ``WaitForSingleObject``
+      and ``WaitForMultipleObjects`` family of API calls.  On Unix, this is
+      a file descriptor usable with primitives from the :mod:`select` module.
+
+      You can use this value if you want to wait on several events at once.
+      Otherwise calling :meth:`join()` is simpler.
+
+      .. versionadded:: 3.3
+
    .. method:: terminate()
 
       Terminate the process.  On Unix this is done using the ``SIGTERM`` signal;
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -101,10 +101,12 @@
 
 if sys.platform != 'win32':
     import time
+    import select
 
     exit = os._exit
     duplicate = os.dup
     close = os.close
+    _select = util._eintr_retry(select.select)
 
     #
     # We define a Popen class similar to the one from subprocess, but
@@ -118,8 +120,12 @@
             sys.stderr.flush()
             self.returncode = None
 
+            r, w = os.pipe()
+            self.sentinel = r
+
             self.pid = os.fork()
             if self.pid == 0:
+                os.close(r)
                 if 'random' in sys.modules:
                     import random
                     random.seed()
@@ -128,6 +134,11 @@
                 sys.stderr.flush()
                 os._exit(code)
 
+            # `w` will be closed when the child exits, at which point `r`
+            # will become ready for reading (using e.g. select()).
+            os.close(w)
+            util.Finalize(self, os.close, (r,))
+
         def poll(self, flag=os.WNOHANG):
             if self.returncode is None:
                 try:
@@ -145,20 +156,14 @@
             return self.returncode
 
         def wait(self, timeout=None):
-            if timeout is None:
-                return self.poll(0)
-            deadline = time.time() + timeout
-            delay = 0.0005
-            while 1:
-                res = self.poll()
-                if res is not None:
-                    break
-                remaining = deadline - time.time()
-                if remaining <= 0:
-                    break
-                delay = min(delay * 2, remaining, 0.05)
-                time.sleep(delay)
-            return res
+            if self.returncode is None:
+                if timeout is not None:
+                    r = _select([self.sentinel], [], [], timeout)[0]
+                    if not r:
+                        return None
+                # This shouldn't block if select() returned successfully.
+                return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+            return self.returncode
 
         def terminate(self):
             if self.returncode is None:
@@ -258,6 +263,7 @@
             self.pid = pid
             self.returncode = None
             self._handle = hp
+            self.sentinel = int(hp)
 
             # send information to child
             prep_data = get_preparation_data(process_obj._name)
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -132,6 +132,7 @@
         else:
             from .forking import Popen
         self._popen = Popen(self)
+        self._sentinel = self._popen.sentinel
         _current_process._children.add(self)
 
     def terminate(self):
@@ -218,6 +219,17 @@
 
     pid = ident
 
+    @property
+    def sentinel(self):
+        '''
+        Return a file descriptor (Unix) or handle (Windows) suitable for
+        waiting for process termination.
+        '''
+        try:
+            return self._sentinel
+        except AttributeError:
+            raise ValueError("process not started")
+
     def __repr__(self):
         if self is _current_process:
             status = 'started'
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -32,9 +32,11 @@
 # SUCH DAMAGE.
 #
 
+import functools
 import itertools
 import weakref
 import atexit
+import select
 import threading        # we want threading to install it's
                         # cleanup function before multiprocessing does
 
@@ -315,3 +317,22 @@
         register_after_fork(self, lambda obj : obj.__dict__.clear())
     def __reduce__(self):
         return type(self), ()
+
+
+#
+# Automatic retry after EINTR
+#
+
+def _eintr_retry(func, _errors=(EnvironmentError, select.error)):
+    @functools.wraps(func)
+    def wrapped(*args, **kwargs):
+        while True:
+            try:
+                return func(*args, **kwargs)
+            except _errors as e:
+                # select.error has no `errno` attribute
+                if e.args[0] == errno.EINTR:
+                    continue
+                raise
+    return wrapped
+
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -71,6 +71,23 @@
                             'HAVE_BROKEN_SEM_GETVALUE', False)
 
 WIN32 = (sys.platform == "win32")
+if WIN32:
+    from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
+
+    def wait_for_handle(handle, timeout):
+        if timeout is None or timeout < 0.0:
+            timeout = INFINITE
+        else:
+            timeout = int(1000 * timeout)
+        return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
+else:
+    from select import select
+    _select = util._eintr_retry(select)
+
+    def wait_for_handle(handle, timeout):
+        if timeout is not None and timeout < 0.0:
+            timeout = None
+        return handle in _select([handle], [], [], timeout)[0]
 
 #
 # Some tests require ctypes
@@ -307,6 +324,26 @@
             ]
         self.assertEqual(result, expected)
 
+    @classmethod
+    def _test_sentinel(cls, event):
+        event.wait(10.0)
+
+    def test_sentinel(self):
+        if self.TYPE == "threads":
+            return
+        event = self.Event()
+        p = self.Process(target=self._test_sentinel, args=(event,))
+        with self.assertRaises(ValueError):
+            p.sentinel
+        p.start()
+        self.addCleanup(p.join)
+        sentinel = p.sentinel
+        self.assertIsInstance(sentinel, int)
+        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
+        event.set()
+        p.join()
+        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+
 #
 #
 #
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -187,6 +187,10 @@
 Library
 -------
 
+- Issue #12040: Expose a new attribute ``sentinel`` on instances of
+  :class:`multiprocessing.Process`.  Also, fix Process.join() to not use
+  polling anymore, when given a timeout.
+
 - Issue #11893: Remove obsolete internal wrapper class ``SSLFakeFile`` in the
   smtplib module.  Patch by Catalin Iacob.
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list