[issue14154] reimplement the bigmem test memory watchdog as a subprocess

Charles-François Natali report at bugs.python.org
Tue Mar 20 19:26:42 CET 2012


Charles-François Natali <neologix at free.fr> added the comment:

Here's a patch that flushes stdout explicitly (this should not be necessary
unless the watchdog crashes, but...).
It also redirects the watchdog subprocess's stderr to /dev/null.

----------
Added file: http://bugs.python.org/file24974/mem_watchdog_2.diff

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue14154>
_______________________________________
-------------- next part --------------
diff --git a/Lib/test/memory_watchdog.py b/Lib/test/memory_watchdog.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/memory_watchdog.py
@@ -0,0 +1,28 @@
+"""Memory watchdog: periodically read the memory usage of the main test process
+and print it out, until terminated."""
+# stdin should refer to the process' /proc/<PID>/statm: we don't pass the
+# process' PID to avoid a race condition in case of - unlikely - PID recycling.
+# If the process crashes, reading from the /proc entry will fail with ESRCH.
+
+
+import os
+import sys
+import time
+
+
+try:
+    page_size = os.sysconf('SC_PAGESIZE')
+except (ValueError, AttributeError):
+    try:
+        page_size = os.sysconf('SC_PAGE_SIZE')
+    except (ValueError, AttributeError):
+        page_size = 4096
+
+while True:
+    sys.stdin.seek(0)
+    statm = sys.stdin.read()
+    data = int(statm.split()[5])
+    sys.stdout.write(" ... process data size: {data:.1f}G\n"
+                     .format(data=data * page_size / (1024 ** 3)))
+    sys.stdout.flush()
+    time.sleep(1)
diff --git a/Lib/test/support.py b/Lib/test/support.py
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -36,20 +36,10 @@
     multiprocessing = None
 
 try:
-    import faulthandler
-except ImportError:
-    faulthandler = None
-
-try:
     import zlib
 except ImportError:
     zlib = None
 
-try:
-    import fcntl
-except ImportError:
-    fcntl = None
-
 __all__ = [
     "Error", "TestFailed", "ResourceDenied", "import_module",
     "verbose", "use_resources", "max_memuse", "record_original_stdout",
@@ -1151,62 +1141,26 @@
     def __init__(self):
         self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
         self.started = False
-        self.thread = None
-        try:
-            self.page_size = os.sysconf('SC_PAGESIZE')
-        except (ValueError, AttributeError):
-            try:
-                self.page_size = os.sysconf('SC_PAGE_SIZE')
-            except (ValueError, AttributeError):
-                self.page_size = 4096
-
-    def consumer(self, fd):
-        HEADER = "l"
-        header_size = struct.calcsize(HEADER)
-        try:
-            while True:
-                header = os.read(fd, header_size)
-                if len(header) < header_size:
-                    # Pipe closed on other end
-                    break
-                data_len, = struct.unpack(HEADER, header)
-                data = os.read(fd, data_len)
-                statm = data.decode('ascii')
-                data = int(statm.split()[5])
-                print(" ... process data size: {data:.1f}G"
-                       .format(data=data * self.page_size / (1024 ** 3)))
-        finally:
-            os.close(fd)
 
     def start(self):
-        if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
-            return
         try:
-            rfd = os.open(self.procfile, os.O_RDONLY)
+            f = open(self.procfile, 'r')
         except OSError as e:
             warnings.warn('/proc not available for stats: {}'.format(e),
                           RuntimeWarning)
             sys.stderr.flush()
             return
-        pipe_fd, wfd = os.pipe()
-        # set the write end of the pipe non-blocking to avoid blocking the
-        # watchdog thread when the consumer doesn't drain the pipe fast enough
-        if fcntl:
-            flags = fcntl.fcntl(wfd, fcntl.F_GETFL)
-            fcntl.fcntl(wfd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
-        # _file_watchdog() doesn't take the GIL in its child thread, and
-        # therefore collects statistics timely
-        faulthandler._file_watchdog(rfd, wfd, 1.0)
+        # Popen gives the child its own copy of f; close ours even if it fails
+        with f:
+            self.mem_watchdog = subprocess.Popen(
+                [sys.executable, findfile("memory_watchdog.py")],
+                stdin=f, stderr=subprocess.DEVNULL)
         self.started = True
-        self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
-        self.thread.daemon = True
-        self.thread.start()
 
     def stop(self):
-        if not self.started:
-            return
-        faulthandler._cancel_file_watchdog()
-        self.thread.join()
+        if self.started:
+            self.mem_watchdog.terminate()
+            self.mem_watchdog.wait()
 
 
 def bigmemtest(size, memuse, dry_run=True):
@@ -1234,7 +1188,7 @@
                     "not enough memory: %.1fG minimum needed"
                     % (size * memuse / (1024 ** 3)))
 
-            if real_max_memuse and verbose and faulthandler and threading:
+            if real_max_memuse and verbose:
                 print()
                 print(" ... expected peak memory use: {peak:.1f}G"
                       .format(peak=size * memuse / (1024 ** 3)))
diff --git a/Modules/faulthandler.c b/Modules/faulthandler.c
--- a/Modules/faulthandler.c
+++ b/Modules/faulthandler.c
@@ -13,7 +13,6 @@
 
 #ifdef WITH_THREAD
 #  define FAULTHANDLER_LATER
-#  define FAULTHANDLER_WATCHDOG
 #endif
 
 #ifndef MS_WINDOWS
@@ -66,20 +65,6 @@
 } thread;
 #endif
 
-#ifdef FAULTHANDLER_WATCHDOG
-static struct {
-    int rfd;
-    int wfd;
-    PY_TIMEOUT_T period_us;   /* period in microseconds */
-    /* The main thread always holds this lock. It is only released when
-       faulthandler_watchdog() is interrupted before this thread exits, or at
-       Python exit. */
-    PyThread_type_lock cancel_event;
-    /* released by child thread when joined */
-    PyThread_type_lock running;
-} watchdog;
-#endif
-
 #ifdef FAULTHANDLER_USER
 typedef struct {
     int enabled;
@@ -604,139 +589,6 @@
 }
 #endif  /* FAULTHANDLER_LATER */
 
-#ifdef FAULTHANDLER_WATCHDOG
-
-static void
-file_watchdog(void *unused)
-{
-    PyLockStatus st;
-    PY_TIMEOUT_T timeout;
-
-#define MAXDATA 1024
-    char buf1[MAXDATA], buf2[MAXDATA];
-    char *data = buf1, *old_data = buf2;
-    Py_ssize_t data_len, old_data_len = -1;
-
-#if defined(HAVE_PTHREAD_SIGMASK) && !defined(HAVE_BROKEN_PTHREAD_SIGMASK)
-    sigset_t set;
-
-    /* we don't want to receive any signal */
-    sigfillset(&set);
-    pthread_sigmask(SIG_SETMASK, &set, NULL);
-#endif
-
-    /* On first pass, feed file contents immediately */
-    timeout = 0;
-    do {
-        st = PyThread_acquire_lock_timed(watchdog.cancel_event,
-                                         timeout, 0);
-        timeout = watchdog.period_us;
-        if (st == PY_LOCK_ACQUIRED) {
-            PyThread_release_lock(watchdog.cancel_event);
-            break;
-        }
-        /* Timeout => read and write data */
-        assert(st == PY_LOCK_FAILURE);
-
-        if (lseek(watchdog.rfd, 0, SEEK_SET) < 0) {
-            break;
-        }
-        data_len = read(watchdog.rfd, data, MAXDATA);
-        if (data_len < 0) {
-            break;
-        }
-        if (data_len != old_data_len || memcmp(data, old_data, data_len)) {
-            char *tdata;
-            Py_ssize_t tlen;
-            /* Contents changed, feed them to wfd */
-            long x = (long) data_len;
-            /* We can't do anything if the consumer is too slow, just bail out */
-            if (write(watchdog.wfd, (void *) &x, sizeof(x)) < sizeof(x))
-                break;
-            if (write(watchdog.wfd, data, data_len) < data_len)
-                break;
-            tdata = data;
-            data = old_data;
-            old_data = tdata;
-            tlen = data_len;
-            data_len = old_data_len;
-            old_data_len = tlen;
-        }
-    } while (1);
-
-    close(watchdog.rfd);
-    close(watchdog.wfd);
-
-    /* The only way out */
-    PyThread_release_lock(watchdog.running);
-#undef MAXDATA
-}
-
-static void
-cancel_file_watchdog(void)
-{
-    /* Notify cancellation */
-    PyThread_release_lock(watchdog.cancel_event);
-
-    /* Wait for thread to join */
-    PyThread_acquire_lock(watchdog.running, 1);
-    PyThread_release_lock(watchdog.running);
-
-    /* The main thread should always hold the cancel_event lock */
-    PyThread_acquire_lock(watchdog.cancel_event, 1);
-}
-
-static PyObject*
-faulthandler_file_watchdog(PyObject *self,
-                           PyObject *args, PyObject *kwargs)
-{
-    static char *kwlist[] = {"rfd", "wfd", "period", NULL};
-    double period;
-    PY_TIMEOUT_T period_us;
-    int rfd, wfd;
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwargs,
-        "iid:_file_watchdog", kwlist,
-        &rfd, &wfd, &period))
-        return NULL;
-    if ((period * 1e6) >= (double) PY_TIMEOUT_MAX) {
-        PyErr_SetString(PyExc_OverflowError,  "period value is too large");
-        return NULL;
-    }
-    period_us = (PY_TIMEOUT_T)(period * 1e6);
-    if (period_us <= 0) {
-        PyErr_SetString(PyExc_ValueError, "period must be greater than 0");
-        return NULL;
-    }
-
-    /* Cancel previous thread, if running */
-    cancel_file_watchdog();
-
-    watchdog.rfd = rfd;
-    watchdog.wfd = wfd;
-    watchdog.period_us = period_us;
-
-    /* Arm these locks to serve as events when released */
-    PyThread_acquire_lock(watchdog.running, 1);
-
-    if (PyThread_start_new_thread(file_watchdog, NULL) == -1) {
-        PyThread_release_lock(watchdog.running);
-        PyErr_SetString(PyExc_RuntimeError,
-                        "unable to start file watchdog thread");
-        return NULL;
-    }
-
-    Py_RETURN_NONE;
-}
-
-static PyObject*
-faulthandler_cancel_file_watchdog(PyObject *self)
-{
-    cancel_file_watchdog();
-    Py_RETURN_NONE;
-}
-#endif  /* FAULTHANDLER_WATCHDOG */
-
 #ifdef FAULTHANDLER_USER
 static int
 faulthandler_register(int signum, int chain, _Py_sighandler_t *p_previous)
@@ -1126,18 +978,6 @@
                "to dump_tracebacks_later().")},
 #endif
 
-#ifdef FAULTHANDLER_WATCHDOG
-    {"_file_watchdog",
-     (PyCFunction)faulthandler_file_watchdog, METH_VARARGS|METH_KEYWORDS,
-     PyDoc_STR("_file_watchdog(rfd, wfd, period):\n"
-               "feed the contents of 'rfd' to 'wfd', if changed,\n"
-               "every 'period seconds'.")},
-    {"_cancel_file_watchdog",
-     (PyCFunction)faulthandler_cancel_file_watchdog, METH_NOARGS,
-     PyDoc_STR("_cancel_file_watchdog():\ncancel the previous call "
-               "to _file_watchdog().")},
-#endif
-
 #ifdef FAULTHANDLER_USER
     {"register",
      (PyCFunction)faulthandler_register_py, METH_VARARGS|METH_KEYWORDS,
@@ -1263,16 +1103,6 @@
     }
     PyThread_acquire_lock(thread.cancel_event, 1);
 #endif
-#ifdef FAULTHANDLER_WATCHDOG
-    watchdog.cancel_event = PyThread_allocate_lock();
-    watchdog.running = PyThread_allocate_lock();
-    if (!watchdog.cancel_event || !watchdog.running) {
-        PyErr_SetString(PyExc_RuntimeError,
-                        "could not allocate locks for faulthandler");
-        return -1;
-    }
-    PyThread_acquire_lock(watchdog.cancel_event, 1);
-#endif
 
     return faulthandler_env_options();
 }
@@ -1297,20 +1127,6 @@
     }
 #endif
 
-#ifdef FAULTHANDLER_WATCHDOG
-    /* file watchdog */
-    if (watchdog.cancel_event) {
-        cancel_file_watchdog();
-        PyThread_release_lock(watchdog.cancel_event);
-        PyThread_free_lock(watchdog.cancel_event);
-        watchdog.cancel_event = NULL;
-    }
-    if (watchdog.running) {
-        PyThread_free_lock(watchdog.running);
-        watchdog.running = NULL;
-    }
-#endif
-
 #ifdef FAULTHANDLER_USER
     /* user */
     if (user_signals != NULL) {


More information about the Python-bugs-list mailing list