[Python-checkins] Extract tests for sock_*() functions into a separate file (GH-9761)

Andrew Svetlov webhook-mailer at python.org
Mon Oct 8 16:06:21 EDT 2018


https://github.com/python/cpython/commit/60d230c78f1e46832fded8b3a8ee604aafa5cc11
commit: 60d230c78f1e46832fded8b3a8ee604aafa5cc11
branch: master
author: Andrew Svetlov <andrew.svetlov at gmail.com>
committer: GitHub <noreply at github.com>
date: 2018-10-08T23:06:18+03:00
summary:

Extract tests for sock_*() functions into a separate file (GH-9761)

files:
A Lib/test/test_asyncio/test_sock_lowlevel.py
M Lib/test/test_asyncio/test_events.py

diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 2a90cda92c9d..607c1955ac58 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -419,108 +419,6 @@ def writer(data):
         r.close()
         self.assertEqual(read, data)
 
-    def _basetest_sock_client_ops(self, httpd, sock):
-        if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
-            # in debug mode, socket operations must fail
-            # if the socket is not in blocking mode
-            self.loop.set_debug(True)
-            sock.setblocking(True)
-            with self.assertRaises(ValueError):
-                self.loop.run_until_complete(
-                    self.loop.sock_connect(sock, httpd.address))
-            with self.assertRaises(ValueError):
-                self.loop.run_until_complete(
-                    self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
-            with self.assertRaises(ValueError):
-                self.loop.run_until_complete(
-                    self.loop.sock_recv(sock, 1024))
-            with self.assertRaises(ValueError):
-                self.loop.run_until_complete(
-                    self.loop.sock_recv_into(sock, bytearray()))
-            with self.assertRaises(ValueError):
-                self.loop.run_until_complete(
-                    self.loop.sock_accept(sock))
-
-        # test in non-blocking mode
-        sock.setblocking(False)
-        self.loop.run_until_complete(
-            self.loop.sock_connect(sock, httpd.address))
-        self.loop.run_until_complete(
-            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
-        data = self.loop.run_until_complete(
-            self.loop.sock_recv(sock, 1024))
-        # consume data
-        self.loop.run_until_complete(
-            self.loop.sock_recv(sock, 1024))
-        sock.close()
-        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
-
-    def _basetest_sock_recv_into(self, httpd, sock):
-        # same as _basetest_sock_client_ops, but using sock_recv_into
-        sock.setblocking(False)
-        self.loop.run_until_complete(
-            self.loop.sock_connect(sock, httpd.address))
-        self.loop.run_until_complete(
-            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
-        data = bytearray(1024)
-        with memoryview(data) as buf:
-            nbytes = self.loop.run_until_complete(
-                self.loop.sock_recv_into(sock, buf[:1024]))
-            # consume data
-            self.loop.run_until_complete(
-                self.loop.sock_recv_into(sock, buf[nbytes:]))
-        sock.close()
-        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
-
-    def test_sock_client_ops(self):
-        with test_utils.run_test_server() as httpd:
-            sock = socket.socket()
-            self._basetest_sock_client_ops(httpd, sock)
-            sock = socket.socket()
-            self._basetest_sock_recv_into(httpd, sock)
-
-    @support.skip_unless_bind_unix_socket
-    def test_unix_sock_client_ops(self):
-        with test_utils.run_test_unix_server() as httpd:
-            sock = socket.socket(socket.AF_UNIX)
-            self._basetest_sock_client_ops(httpd, sock)
-            sock = socket.socket(socket.AF_UNIX)
-            self._basetest_sock_recv_into(httpd, sock)
-
-    def test_sock_client_fail(self):
-        # Make sure that we will get an unused port
-        address = None
-        try:
-            s = socket.socket()
-            s.bind(('127.0.0.1', 0))
-            address = s.getsockname()
-        finally:
-            s.close()
-
-        sock = socket.socket()
-        sock.setblocking(False)
-        with self.assertRaises(ConnectionRefusedError):
-            self.loop.run_until_complete(
-                self.loop.sock_connect(sock, address))
-        sock.close()
-
-    def test_sock_accept(self):
-        listener = socket.socket()
-        listener.setblocking(False)
-        listener.bind(('127.0.0.1', 0))
-        listener.listen(1)
-        client = socket.socket()
-        client.connect(listener.getsockname())
-
-        f = self.loop.sock_accept(listener)
-        conn, addr = self.loop.run_until_complete(f)
-        self.assertEqual(conn.gettimeout(), 0)
-        self.assertEqual(addr, client.getsockname())
-        self.assertEqual(client.getpeername(), listener.getsockname())
-        client.close()
-        conn.close()
-        listener.close()
-
     @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
     def test_add_signal_handler(self):
         caught = 0
@@ -626,34 +524,6 @@ def test_create_unix_connection(self):
                 lambda: MyProto(loop=self.loop), httpd.address)
             self._basetest_create_connection(conn_fut, check_sockname)
 
-    def test_create_connection_sock(self):
-        with test_utils.run_test_server() as httpd:
-            sock = None
-            infos = self.loop.run_until_complete(
-                self.loop.getaddrinfo(
-                    *httpd.address, type=socket.SOCK_STREAM))
-            for family, type, proto, cname, address in infos:
-                try:
-                    sock = socket.socket(family=family, type=type, proto=proto)
-                    sock.setblocking(False)
-                    self.loop.run_until_complete(
-                        self.loop.sock_connect(sock, address))
-                except:
-                    pass
-                else:
-                    break
-            else:
-                assert False, 'Can not create socket.'
-
-            f = self.loop.create_connection(
-                lambda: MyProto(loop=self.loop), sock=sock)
-            tr, pr = self.loop.run_until_complete(f)
-            self.assertIsInstance(tr, asyncio.Transport)
-            self.assertIsInstance(pr, asyncio.Protocol)
-            self.loop.run_until_complete(pr.done)
-            self.assertGreater(pr.nbytes, 0)
-            tr.close()
-
     def check_ssl_extra_info(self, client, check_sockname=True,
                              peername=None, peercert={}):
         if check_sockname:
diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py
new file mode 100644
index 000000000000..8d1661524c27
--- /dev/null
+++ b/Lib/test/test_asyncio/test_sock_lowlevel.py
@@ -0,0 +1,238 @@
+import socket
+import asyncio
+import sys
+from asyncio import proactor_events
+from test.test_asyncio import utils as test_utils
+from test import support
+
+
+class MyProto(asyncio.Protocol):
+    connected = None
+    done = None
+
+    def __init__(self, loop=None):
+        self.transport = None
+        self.state = 'INITIAL'
+        self.nbytes = 0
+        if loop is not None:
+            self.connected = asyncio.Future(loop=loop)
+            self.done = asyncio.Future(loop=loop)
+
+    def connection_made(self, transport):
+        self.transport = transport
+        assert self.state == 'INITIAL', self.state
+        self.state = 'CONNECTED'
+        if self.connected:
+            self.connected.set_result(None)
+        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
+
+    def data_received(self, data):
+        assert self.state == 'CONNECTED', self.state
+        self.nbytes += len(data)
+
+    def eof_received(self):
+        assert self.state == 'CONNECTED', self.state
+        self.state = 'EOF'
+
+    def connection_lost(self, exc):
+        assert self.state in ('CONNECTED', 'EOF'), self.state
+        self.state = 'CLOSED'
+        if self.done:
+            self.done.set_result(None)
+
+
+class BaseSockTestsMixin:
+
+    def create_event_loop(self):
+        raise NotImplementedError
+
+    def setUp(self):
+        self.loop = self.create_event_loop()
+        self.set_event_loop(self.loop)
+        super().setUp()
+
+    def tearDown(self):
+        # just in case we have transport close callbacks
+        if not self.loop.is_closed():
+            test_utils.run_briefly(self.loop)
+
+        self.doCleanups()
+        support.gc_collect()
+        super().tearDown()
+
+    def _basetest_sock_client_ops(self, httpd, sock):
+        if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
+            # in debug mode, socket operations must fail
+            # if the socket is in blocking mode
+            self.loop.set_debug(True)
+            sock.setblocking(True)
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_connect(sock, httpd.address))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_recv(sock, 1024))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_recv_into(sock, bytearray()))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_accept(sock))
+
+        # test in non-blocking mode
+        sock.setblocking(False)
+        self.loop.run_until_complete(
+            self.loop.sock_connect(sock, httpd.address))
+        self.loop.run_until_complete(
+            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+        data = self.loop.run_until_complete(
+            self.loop.sock_recv(sock, 1024))
+        # consume data
+        self.loop.run_until_complete(
+            self.loop.sock_recv(sock, 1024))
+        sock.close()
+        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+
+    def _basetest_sock_recv_into(self, httpd, sock):
+        # same as _basetest_sock_client_ops, but using sock_recv_into
+        sock.setblocking(False)
+        self.loop.run_until_complete(
+            self.loop.sock_connect(sock, httpd.address))
+        self.loop.run_until_complete(
+            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+        data = bytearray(1024)
+        with memoryview(data) as buf:
+            nbytes = self.loop.run_until_complete(
+                self.loop.sock_recv_into(sock, buf[:1024]))
+            # consume data
+            self.loop.run_until_complete(
+                self.loop.sock_recv_into(sock, buf[nbytes:]))
+        sock.close()
+        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+
+    def test_sock_client_ops(self):
+        with test_utils.run_test_server() as httpd:
+            sock = socket.socket()
+            self._basetest_sock_client_ops(httpd, sock)
+            sock = socket.socket()
+            self._basetest_sock_recv_into(httpd, sock)
+
+    @support.skip_unless_bind_unix_socket
+    def test_unix_sock_client_ops(self):
+        with test_utils.run_test_unix_server() as httpd:
+            sock = socket.socket(socket.AF_UNIX)
+            self._basetest_sock_client_ops(httpd, sock)
+            sock = socket.socket(socket.AF_UNIX)
+            self._basetest_sock_recv_into(httpd, sock)
+
+    def test_sock_client_fail(self):
+        # Make sure that we will get an unused port
+        address = None
+        try:
+            s = socket.socket()
+            s.bind(('127.0.0.1', 0))
+            address = s.getsockname()
+        finally:
+            s.close()
+
+        sock = socket.socket()
+        sock.setblocking(False)
+        with self.assertRaises(ConnectionRefusedError):
+            self.loop.run_until_complete(
+                self.loop.sock_connect(sock, address))
+        sock.close()
+
+    def test_sock_accept(self):
+        listener = socket.socket()
+        listener.setblocking(False)
+        listener.bind(('127.0.0.1', 0))
+        listener.listen(1)
+        client = socket.socket()
+        client.connect(listener.getsockname())
+
+        f = self.loop.sock_accept(listener)
+        conn, addr = self.loop.run_until_complete(f)
+        self.assertEqual(conn.gettimeout(), 0)
+        self.assertEqual(addr, client.getsockname())
+        self.assertEqual(client.getpeername(), listener.getsockname())
+        client.close()
+        conn.close()
+        listener.close()
+
+    def test_create_connection_sock(self):
+        with test_utils.run_test_server() as httpd:
+            sock = None
+            infos = self.loop.run_until_complete(
+                self.loop.getaddrinfo(
+                    *httpd.address, type=socket.SOCK_STREAM))
+            for family, type, proto, cname, address in infos:
+                try:
+                    sock = socket.socket(family=family, type=type, proto=proto)
+                    sock.setblocking(False)
+                    self.loop.run_until_complete(
+                        self.loop.sock_connect(sock, address))
+                except BaseException:
+                    pass
+                else:
+                    break
+            else:
+                assert False, 'Can not create socket.'
+
+            f = self.loop.create_connection(
+                lambda: MyProto(loop=self.loop), sock=sock)
+            tr, pr = self.loop.run_until_complete(f)
+            self.assertIsInstance(tr, asyncio.Transport)
+            self.assertIsInstance(pr, asyncio.Protocol)
+            self.loop.run_until_complete(pr.done)
+            self.assertGreater(pr.nbytes, 0)
+            tr.close()
+
+
+if sys.platform == 'win32':
+
+    class SelectEventLoopTests(BaseSockTestsMixin,
+                               test_utils.TestCase):
+
+        def create_event_loop(self):
+            return asyncio.SelectorEventLoop()
+
+    class ProactorEventLoopTests(BaseSockTestsMixin,
+                                 test_utils.TestCase):
+
+        def create_event_loop(self):
+            return asyncio.ProactorEventLoop()
+
+else:
+    import selectors
+
+    if hasattr(selectors, 'KqueueSelector'):
+        class KqueueEventLoopTests(BaseSockTestsMixin,
+                                   test_utils.TestCase):
+
+            def create_event_loop(self):
+                return asyncio.SelectorEventLoop(
+                    selectors.KqueueSelector())
+
+    if hasattr(selectors, 'EpollSelector'):
+        class EPollEventLoopTests(BaseSockTestsMixin,
+                                  test_utils.TestCase):
+
+            def create_event_loop(self):
+                return asyncio.SelectorEventLoop(selectors.EpollSelector())
+
+    if hasattr(selectors, 'PollSelector'):
+        class PollEventLoopTests(BaseSockTestsMixin,
+                                 test_utils.TestCase):
+
+            def create_event_loop(self):
+                return asyncio.SelectorEventLoop(selectors.PollSelector())
+
+    # Should always exist.
+    class SelectEventLoopTests(BaseSockTestsMixin,
+                               test_utils.TestCase):
+
+        def create_event_loop(self):
+            return asyncio.SelectorEventLoop(selectors.SelectSelector())



More information about the Python-checkins mailing list