[Python-checkins] cpython: Rewrite eintr_tester.py to avoid os.fork()

victor.stinner python-checkins at python.org
Thu Sep 3 01:50:03 CEST 2015


https://hg.python.org/cpython/rev/ed0e6a9c11af
changeset:   97600:ed0e6a9c11af
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Thu Sep 03 01:38:44 2015 +0200
summary:
  Rewrite eintr_tester.py to avoid os.fork()

eintr_tester.py calls signal.setitimer() to send signals to the current process
every 100 ms. The test sometimes hangs on FreeBSD. Maybe there is a race
condition in the child process after fork(). It's unsafe to run arbitrary code
after fork().

This change replaces os.fork() with a regular call to subprocess.Popen(). This
change avoids the risk of having a child process which continues to execute
eintr_tester.py instead of exiting. It also ensures that the child process
doesn't inherit unexpected file descriptors by mistake.

Since I'm unable to reproduce the issue on FreeBSD, I will have to watch
FreeBSD buildbots to check if the issue is fixed or not.

Remove previous attempt to debug: remove call to
faulthandler.dump_traceback_later().

files:
  Lib/test/eintrdata/eintr_tester.py |  257 ++++++++++------
  1 files changed, 157 insertions(+), 100 deletions(-)


diff --git a/Lib/test/eintrdata/eintr_tester.py b/Lib/test/eintrdata/eintr_tester.py
--- a/Lib/test/eintrdata/eintr_tester.py
+++ b/Lib/test/eintrdata/eintr_tester.py
@@ -8,12 +8,13 @@
 sub-second periodicity (contrarily to signal()).
 """
 
-import faulthandler
 import io
 import os
 import select
 import signal
 import socket
+import subprocess
+import sys
 import time
 import unittest
 
@@ -29,7 +30,7 @@
     # signal delivery periodicity
     signal_period = 0.1
     # default sleep time for tests - should obviously have:
-    # sleep_time > signal_period
+    # sleep_time > signal_period
     sleep_time = 0.2
 
     @classmethod
@@ -37,17 +38,10 @@
         cls.orig_handler = signal.signal(signal.SIGALRM, lambda *args: None)
         signal.setitimer(signal.ITIMER_REAL, cls.signal_delay,
                          cls.signal_period)
-        if hasattr(faulthandler, 'dump_traceback_later'):
-            # Most tests take less than 30 seconds, so 15 minutes should be
-            # enough. dump_traceback_later() is implemented with a thread, but
-            # pthread_sigmask() is used to mask all signaled on this thread.
-            faulthandler.dump_traceback_later(15 * 60, exit=True)
 
     @classmethod
     def stop_alarm(cls):
         signal.setitimer(signal.ITIMER_REAL, 0, 0)
-        if hasattr(faulthandler, 'cancel_dump_traceback_later'):
-            faulthandler.cancel_dump_traceback_later()
 
     @classmethod
     def tearDownClass(cls):
@@ -59,18 +53,22 @@
         # default sleep time
         time.sleep(cls.sleep_time)
 
+    def subprocess(self, *args, **kw):
+        cmd_args = (sys.executable, '-c') + args
+        return subprocess.Popen(cmd_args, **kw)
+
 
 @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
 class OSEINTRTest(EINTRBaseTest):
     """ EINTR tests for the os module. """
 
+    def new_sleep_process(self):
+        code = 'import time; time.sleep(%r)' % self.sleep_time
+        return self.subprocess(code)
+
     def _test_wait_multiple(self, wait_func):
         num = 3
-        for _ in range(num):
-            pid = os.fork()
-            if pid == 0:
-                self._sleep()
-                os._exit(0)
+        processes = [self.new_sleep_process() for _ in range(num)]
         for _ in range(num):
             wait_func()
 
@@ -82,12 +80,8 @@
         self._test_wait_multiple(lambda: os.wait3(0))
 
     def _test_wait_single(self, wait_func):
-        pid = os.fork()
-        if pid == 0:
-            self._sleep()
-            os._exit(0)
-        else:
-            wait_func(pid)
+        proc = self.new_sleep_process()
+        wait_func(proc.pid)
 
     def test_waitpid(self):
         self._test_wait_single(lambda pid: os.waitpid(pid, 0))
@@ -105,19 +99,24 @@
         # atomic
         datas = [b"hello", b"world", b"spam"]
 
-        pid = os.fork()
-        if pid == 0:
-            os.close(rd)
-            for data in datas:
-                # let the parent block on read()
-                self._sleep()
-                os.write(wr, data)
-            os._exit(0)
-        else:
-            self.addCleanup(os.waitpid, pid, 0)
+        code = '\n'.join((
+            'import os, sys, time',
+            '',
+            'wr = int(sys.argv[1])',
+            'datas = %r' % datas,
+            'sleep_time = %r' % self.sleep_time,
+            '',
+            'for data in datas:',
+            '    # let the parent block on read()',
+            '    time.sleep(sleep_time)',
+            '    os.write(wr, data)',
+        ))
+
+        with self.subprocess(code, str(wr), pass_fds=[wr]) as proc:
             os.close(wr)
             for data in datas:
                 self.assertEqual(data, os.read(rd, len(data)))
+            self.assertEqual(proc.wait(), 0)
 
     def test_write(self):
         rd, wr = os.pipe()
@@ -127,23 +126,34 @@
         # we must write enough data for the write() to block
         data = b"xyz" * support.PIPE_MAX_SIZE
 
-        pid = os.fork()
-        if pid == 0:
-            os.close(wr)
-            read_data = io.BytesIO()
-            # let the parent block on write()
-            self._sleep()
-            while len(read_data.getvalue()) < len(data):
-                chunk = os.read(rd, 2 * len(data))
-                read_data.write(chunk)
-            self.assertEqual(read_data.getvalue(), data)
-            os._exit(0)
-        else:
+        code = '\n'.join((
+            'import io, os, sys, time',
+            '',
+            'rd = int(sys.argv[1])',
+            'sleep_time = %r' % self.sleep_time,
+            'data = b"xyz" * %s' % support.PIPE_MAX_SIZE,
+            'data_len = len(data)',
+            '',
+            '# let the parent block on write()',
+            'time.sleep(sleep_time)',
+            '',
+            'read_data = io.BytesIO()',
+            'while len(read_data.getvalue()) < data_len:',
+            '    chunk = os.read(rd, 2 * data_len)',
+            '    read_data.write(chunk)',
+            '',
+            'value = read_data.getvalue()',
+            'if value != data:',
+            '    raise Exception("read error: %s vs %s bytes"',
+            '                    % (len(value), data_len))',
+        ))
+
+        with self.subprocess(code, str(rd), pass_fds=[rd]) as proc:
             os.close(rd)
             written = 0
             while written < len(data):
                 written += os.write(wr, memoryview(data)[written:])
-            self.assertEqual(0, os.waitpid(pid, 0)[1])
+            self.assertEqual(proc.wait(), 0)
 
 
 @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
@@ -159,19 +169,32 @@
         # single-byte payload guard us against partial recv
         datas = [b"x", b"y", b"z"]
 
-        pid = os.fork()
-        if pid == 0:
-            rd.close()
-            for data in datas:
-                # let the parent block on recv()
-                self._sleep()
-                wr.sendall(data)
-            os._exit(0)
-        else:
-            self.addCleanup(os.waitpid, pid, 0)
+        code = '\n'.join((
+            'import os, socket, sys, time',
+            '',
+            'fd = int(sys.argv[1])',
+            'family = %s' % int(wr.family),
+            'sock_type = %s' % int(wr.type),
+            'datas = %r' % datas,
+            'sleep_time = %r' % self.sleep_time,
+            '',
+            'wr = socket.fromfd(fd, family, sock_type)',
+            'os.close(fd)',
+            '',
+            'with wr:',
+            '    for data in datas:',
+            '        # let the parent block on recv()',
+            '        time.sleep(sleep_time)',
+            '        wr.sendall(data)',
+        ))
+
+        fd = wr.fileno()
+        proc = self.subprocess(code, str(fd), pass_fds=[fd])
+        with proc:
             wr.close()
             for data in datas:
                 self.assertEqual(data, recv_func(rd, len(data)))
+            self.assertEqual(proc.wait(), 0)
 
     def test_recv(self):
         self._test_recv(socket.socket.recv)
@@ -188,25 +211,43 @@
         # we must send enough data for the send() to block
         data = b"xyz" * (support.SOCK_MAX_SIZE // 3)
 
-        pid = os.fork()
-        if pid == 0:
-            wr.close()
-            # let the parent block on send()
-            self._sleep()
-            received_data = bytearray(len(data))
-            n = 0
-            while n < len(data):
-                n += rd.recv_into(memoryview(received_data)[n:])
-            self.assertEqual(received_data, data)
-            os._exit(0)
-        else:
+        code = '\n'.join((
+            'import os, socket, sys, time',
+            '',
+            'fd = int(sys.argv[1])',
+            'family = %s' % int(rd.family),
+            'sock_type = %s' % int(rd.type),
+            'sleep_time = %r' % self.sleep_time,
+            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
+            'data_len = len(data)',
+            '',
+            'rd = socket.fromfd(fd, family, sock_type)',
+            'os.close(fd)',
+            '',
+            'with rd:',
+            '    # let the parent block on send()',
+            '    time.sleep(sleep_time)',
+            '',
+            '    received_data = bytearray(data_len)',
+            '    n = 0',
+            '    while n < data_len:',
+            '        n += rd.recv_into(memoryview(received_data)[n:])',
+            '',
+            'if received_data != data:',
+            '    raise Exception("recv error: %s vs %s bytes"',
+            '                    % (len(received_data), data_len))',
+        ))
+
+        fd = rd.fileno()
+        proc = self.subprocess(code, str(fd), pass_fds=[fd])
+        with proc:
             rd.close()
             written = 0
             while written < len(data):
                 sent = send_func(wr, memoryview(data)[written:])
                 # sendall() returns None
                 written += len(data) if sent is None else sent
-            self.assertEqual(0, os.waitpid(pid, 0)[1])
+            self.assertEqual(proc.wait(), 0)
 
     def test_send(self):
         self._test_send(socket.socket.send)
@@ -223,45 +264,60 @@
         self.addCleanup(sock.close)
 
         sock.bind((support.HOST, 0))
-        _, port = sock.getsockname()
+        port = sock.getsockname()[1]
         sock.listen()
 
-        pid = os.fork()
-        if pid == 0:
-            # let parent block on accept()
-            self._sleep()
-            with socket.create_connection((support.HOST, port)):
-                self._sleep()
-            os._exit(0)
-        else:
-            self.addCleanup(os.waitpid, pid, 0)
+        code = '\n'.join((
+            'import socket, time',
+            '',
+            'host = %r' % support.HOST,
+            'port = %s' % port,
+            'sleep_time = %r' % self.sleep_time,
+            '',
+            '# let parent block on accept()',
+            'time.sleep(sleep_time)',
+            'with socket.create_connection((host, port)):',
+            '    time.sleep(sleep_time)',
+        ))
+
+        with self.subprocess(code) as proc:
             client_sock, _ = sock.accept()
             client_sock.close()
+            self.assertEqual(proc.wait(), 0)
 
     @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
     def _test_open(self, do_open_close_reader, do_open_close_writer):
+        filename = support.TESTFN
+
         # Use a fifo: until the child opens it for reading, the parent will
         # block when trying to open it for writing.
-        support.unlink(support.TESTFN)
-        os.mkfifo(support.TESTFN)
-        self.addCleanup(support.unlink, support.TESTFN)
+        support.unlink(filename)
+        os.mkfifo(filename)
+        self.addCleanup(support.unlink, filename)
 
-        pid = os.fork()
-        if pid == 0:
-            # let the parent block
-            self._sleep()
-            do_open_close_reader(support.TESTFN)
-            os._exit(0)
-        else:
-            self.addCleanup(os.waitpid, pid, 0)
-            do_open_close_writer(support.TESTFN)
+        code = '\n'.join((
+            'import os, time',
+            '',
+            'path = %a' % filename,
+            'sleep_time = %r' % self.sleep_time,
+            '',
+            '# let the parent block',
+            'time.sleep(sleep_time)',
+            '',
+            do_open_close_reader,
+        ))
+
+        with self.subprocess(code) as proc:
+            do_open_close_writer(filename)
+
+            self.assertEqual(proc.wait(), 0)
 
     def test_open(self):
-        self._test_open(lambda path: open(path, 'r').close(),
+        self._test_open("open(path, 'r').close()",
                         lambda path: open(path, 'w').close())
 
     def test_os_open(self):
-        self._test_open(lambda path: os.close(os.open(path, os.O_RDONLY)),
+        self._test_open("os.close(os.open(path, os.O_RDONLY))",
                         lambda path: os.close(os.open(path, os.O_WRONLY)))
 
 
@@ -298,20 +354,21 @@
         old_handler = signal.signal(signum, lambda *args: None)
         self.addCleanup(signal.signal, signum, old_handler)
 
+        code = '\n'.join((
+            'import os, time',
+            'pid = %s' % os.getpid(),
+            'signum = %s' % int(signum),
+            'sleep_time = %r' % self.sleep_time,
+            'time.sleep(sleep_time)',
+            'os.kill(pid, signum)',
+        ))
+
         t0 = time.monotonic()
-        child_pid = os.fork()
-        if child_pid == 0:
-            # child
-            try:
-                self._sleep()
-                os.kill(pid, signum)
-            finally:
-                os._exit(0)
-        else:
+        with self.subprocess(code) as proc:
             # parent
             signal.sigwaitinfo([signum])
             dt = time.monotonic() - t0
-            os.waitpid(child_pid, 0)
+            self.assertEqual(proc.wait(), 0)
 
         self.assertGreaterEqual(dt, self.sleep_time)
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list