[Python-checkins] cpython: asyncio, Tulip issue 157: Improve test_events.py, avoid run_briefly() which is not reliable

victor.stinner python-checkins at python.org
Thu Mar 6 01:02:35 CET 2014


http://hg.python.org/cpython/rev/56c346e9ae4d
changeset:   89480:56c346e9ae4d
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Thu Mar 06 01:00:36 2014 +0100
summary:
  asyncio, Tulip issue 157: Improve test_events.py, avoid run_briefly() which is
not reliable

files:
  Lib/asyncio/test_utils.py            |   15 +-
  Lib/test/test_asyncio/test_events.py |  129 +++++++-------
  2 files changed, 71 insertions(+), 73 deletions(-)


diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -21,10 +21,11 @@
 except ImportError:  # pragma: no cover
     ssl = None
 
-from . import tasks
 from . import base_events
 from . import events
+from . import futures
 from . import selectors
+from . import tasks
 
 
 if sys.platform == 'win32':  # pragma: no cover
@@ -52,18 +53,14 @@
         gen.close()
 
 
-def run_until(loop, pred, timeout=None):
-    if timeout is not None:
-        deadline = time.time() + timeout
+def run_until(loop, pred, timeout=30):
+    deadline = time.time() + timeout
     while not pred():
         if timeout is not None:
             timeout = deadline - time.time()
             if timeout <= 0:
-                return False
-            loop.run_until_complete(tasks.sleep(timeout, loop=loop))
-        else:
-            run_briefly(loop)
-    return True
+                raise futures.TimeoutError()
+        loop.run_until_complete(tasks.sleep(0.001, loop=loop))
 
 
 def run_once(loop):
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -56,6 +56,7 @@
 
 
 class MyBaseProto(asyncio.Protocol):
+    connected = None
     done = None
 
     def __init__(self, loop=None):
@@ -63,12 +64,15 @@
         self.state = 'INITIAL'
         self.nbytes = 0
         if loop is not None:
+            self.connected = asyncio.Future(loop=loop)
             self.done = asyncio.Future(loop=loop)
 
     def connection_made(self, transport):
         self.transport = transport
         assert self.state == 'INITIAL', self.state
         self.state = 'CONNECTED'
+        if self.connected:
+            self.connected.set_result(None)
 
     def data_received(self, data):
         assert self.state == 'CONNECTED', self.state
@@ -330,7 +334,8 @@
 
     def test_reader_callback(self):
         r, w = test_utils.socketpair()
-        bytes_read = []
+        r.setblocking(False)
+        bytes_read = bytearray()
 
         def reader():
             try:
@@ -340,37 +345,40 @@
                 # at least on Linux -- see man select.
                 return
             if data:
-                bytes_read.append(data)
+                bytes_read.extend(data)
             else:
                 self.assertTrue(self.loop.remove_reader(r.fileno()))
                 r.close()
 
         self.loop.add_reader(r.fileno(), reader)
         self.loop.call_soon(w.send, b'abc')
-        test_utils.run_briefly(self.loop)
+        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
         self.loop.call_soon(w.send, b'def')
-        test_utils.run_briefly(self.loop)
+        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
         self.loop.call_soon(w.close)
         self.loop.call_soon(self.loop.stop)
         self.loop.run_forever()
-        self.assertEqual(b''.join(bytes_read), b'abcdef')
+        self.assertEqual(bytes_read, b'abcdef')
 
     def test_writer_callback(self):
         r, w = test_utils.socketpair()
         w.setblocking(False)
-        self.loop.add_writer(w.fileno(), w.send, b'x'*(256*1024))
-        test_utils.run_briefly(self.loop)
 
-        def remove_writer():
-            self.assertTrue(self.loop.remove_writer(w.fileno()))
+        def writer(data):
+            w.send(data)
+            self.loop.stop()
 
-        self.loop.call_soon(remove_writer)
-        self.loop.call_soon(self.loop.stop)
+        data = b'x' * 1024
+        self.loop.add_writer(w.fileno(), writer, data)
         self.loop.run_forever()
+
+        self.assertTrue(self.loop.remove_writer(w.fileno()))
+        self.assertFalse(self.loop.remove_writer(w.fileno()))
+
         w.close()
-        data = r.recv(256*1024)
+        read = r.recv(len(data) * 2)
         r.close()
-        self.assertGreaterEqual(len(data), 200)
+        self.assertEqual(read, data)
 
     def _basetest_sock_client_ops(self, httpd, sock):
         sock.setblocking(False)
@@ -464,10 +472,10 @@
         self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
         # Now set a handler and handle it.
         self.loop.add_signal_handler(signal.SIGINT, my_handler)
-        test_utils.run_briefly(self.loop)
+
         os.kill(os.getpid(), signal.SIGINT)
-        test_utils.run_briefly(self.loop)
-        self.assertEqual(caught, 1)
+        test_utils.run_until(self.loop, lambda: caught)
+
         # Removing it should restore the default handler.
         self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
         self.assertEqual(signal.getsignal(signal.SIGINT),
@@ -623,7 +631,7 @@
             self.assertIn(str(httpd.address), cm.exception.strerror)
 
     def test_create_server(self):
-        proto = MyProto()
+        proto = MyProto(self.loop)
         f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
         server = self.loop.run_until_complete(f)
         self.assertEqual(len(server.sockets), 1)
@@ -633,14 +641,11 @@
         client = socket.socket()
         client.connect(('127.0.0.1', port))
         client.sendall(b'xxx')
-        test_utils.run_briefly(self.loop)
-        test_utils.run_until(self.loop, lambda: proto is not None, 10)
-        self.assertIsInstance(proto, MyProto)
-        self.assertEqual('INITIAL', proto.state)
-        test_utils.run_briefly(self.loop)
+
+        self.loop.run_until_complete(proto.connected)
         self.assertEqual('CONNECTED', proto.state)
-        test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
-                             timeout=10)
+
+        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
         self.assertEqual(3, proto.nbytes)
 
         # extra info is available
@@ -650,7 +655,7 @@
 
         # close connection
         proto.transport.close()
-        test_utils.run_briefly(self.loop)  # windows iocp
+        self.loop.run_until_complete(proto.done)
 
         self.assertEqual('CLOSED', proto.state)
 
@@ -672,27 +677,22 @@
 
     @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
     def test_create_unix_server(self):
-        proto = MyProto()
+        proto = MyProto(loop=self.loop)
         server, path = self._make_unix_server(lambda: proto)
         self.assertEqual(len(server.sockets), 1)
 
         client = socket.socket(socket.AF_UNIX)
         client.connect(path)
         client.sendall(b'xxx')
-        test_utils.run_briefly(self.loop)
-        test_utils.run_until(self.loop, lambda: proto is not None, 10)
 
-        self.assertIsInstance(proto, MyProto)
-        self.assertEqual('INITIAL', proto.state)
-        test_utils.run_briefly(self.loop)
+        self.loop.run_until_complete(proto.connected)
         self.assertEqual('CONNECTED', proto.state)
-        test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
-                             timeout=10)
+        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
         self.assertEqual(3, proto.nbytes)
 
         # close connection
         proto.transport.close()
-        test_utils.run_briefly(self.loop)  # windows iocp
+        self.loop.run_until_complete(proto.done)
 
         self.assertEqual('CLOSED', proto.state)
 
@@ -735,12 +735,10 @@
         client, pr = self.loop.run_until_complete(f_c)
 
         client.write(b'xxx')
-        test_utils.run_briefly(self.loop)
-        self.assertIsInstance(proto, MyProto)
-        test_utils.run_briefly(self.loop)
+        self.loop.run_until_complete(proto.connected)
         self.assertEqual('CONNECTED', proto.state)
-        test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
-                             timeout=10)
+
+        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
         self.assertEqual(3, proto.nbytes)
 
         # extra info is available
@@ -774,12 +772,9 @@
         client, pr = self.loop.run_until_complete(f_c)
 
         client.write(b'xxx')
-        test_utils.run_briefly(self.loop)
-        self.assertIsInstance(proto, MyProto)
-        test_utils.run_briefly(self.loop)
+        self.loop.run_until_complete(proto.connected)
         self.assertEqual('CONNECTED', proto.state)
-        test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
-                             timeout=10)
+        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
         self.assertEqual(3, proto.nbytes)
 
         # close connection
@@ -1044,15 +1039,9 @@
 
         self.assertEqual('INITIALIZED', client.state)
         transport.sendto(b'xxx')
-        for _ in range(1000):
-            if server.nbytes:
-                break
-            test_utils.run_briefly(self.loop)
+        test_utils.run_until(self.loop, lambda: server.nbytes)
         self.assertEqual(3, server.nbytes)
-        for _ in range(1000):
-            if client.nbytes:
-                break
-            test_utils.run_briefly(self.loop)
+        test_utils.run_until(self.loop, lambda: client.nbytes)
 
         # received
         self.assertEqual(8, client.nbytes)
@@ -1097,11 +1086,11 @@
         self.loop.run_until_complete(connect())
 
         os.write(wpipe, b'1')
-        test_utils.run_briefly(self.loop)
+        test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
         self.assertEqual(1, proto.nbytes)
 
         os.write(wpipe, b'2345')
-        test_utils.run_briefly(self.loop)
+        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
         self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
         self.assertEqual(5, proto.nbytes)
 
@@ -1166,14 +1155,19 @@
         self.assertEqual('CONNECTED', proto.state)
 
         transport.write(b'1')
-        test_utils.run_briefly(self.loop)
-        data = os.read(rpipe, 1024)
+
+        data = bytearray()
+        def reader(data):
+            chunk = os.read(rpipe, 1024)
+            data += chunk
+            return len(data)
+
+        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
         self.assertEqual(b'1', data)
 
         transport.write(b'2345')
-        test_utils.run_briefly(self.loop)
-        data = os.read(rpipe, 1024)
-        self.assertEqual(b'2345', data)
+        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
+        self.assertEqual(b'12345', data)
         self.assertEqual('CONNECTED', proto.state)
 
         os.close(rpipe)
@@ -1225,14 +1219,21 @@
         self.assertEqual('CONNECTED', proto.state)
 
         transport.write(b'1')
-        test_utils.run_briefly(self.loop)
-        data = os.read(master, 1024)
+
+        data = bytearray()
+        def reader(data):
+            chunk = os.read(master, 1024)
+            data += chunk
+            return len(data)
+
+        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
+                             timeout=10)
         self.assertEqual(b'1', data)
 
         transport.write(b'2345')
-        test_utils.run_briefly(self.loop)
-        data = os.read(master, 1024)
-        self.assertEqual(b'2345', data)
+        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
+                             timeout=10)
+        self.assertEqual(b'12345', data)
         self.assertEqual('CONNECTED', proto.state)
 
         os.close(master)

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list