[Python-checkins] cpython (3.4): asyncio/tests: Fix ResourceWarnings related to unclosed transports

yury.selivanov python-checkins at python.org
Wed Dec 16 20:23:53 EST 2015


https://hg.python.org/cpython/rev/56095b0e7890
changeset:   99595:56095b0e7890
branch:      3.4
parent:      99592:5fdcadf94c0d
user:        Yury Selivanov <yselivanov at sprymix.com>
date:        Wed Dec 16 20:23:26 2015 -0500
summary:
  asyncio/tests: Fix ResourceWarnings related to unclosed transports

files:
  Lib/test/test_asyncio/test_base_events.py |  47 ++++++----
  1 file changed, 29 insertions(+), 18 deletions(-)


diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -1157,21 +1157,28 @@
         self.loop.add_writer = mock.Mock()
         self.loop.add_writer._is_coroutine = False
 
-        coro = self.loop.create_connection(MyProto, '1.2.3.4', 80)
-        self.loop.run_until_complete(coro)
-        sock.connect.assert_called_with(('1.2.3.4', 80))
-        m_socket.socket.assert_called_with(family=m_socket.AF_INET,
-                                           proto=m_socket.IPPROTO_TCP,
-                                           type=m_socket.SOCK_STREAM)
+        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
+        t, p = self.loop.run_until_complete(coro)
+        try:
+            sock.connect.assert_called_with(('1.2.3.4', 80))
+            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
+                                               proto=m_socket.IPPROTO_TCP,
+                                               type=m_socket.SOCK_STREAM)
+        finally:
+            t.close()
+            test_utils.run_briefly(self.loop)  # allow transport to close
 
         sock.family = socket.AF_INET6
-        coro = self.loop.create_connection(MyProto, '::2', 80)
-
-        self.loop.run_until_complete(coro)
-        sock.connect.assert_called_with(('::2', 80))
-        m_socket.socket.assert_called_with(family=m_socket.AF_INET6,
-                                           proto=m_socket.IPPROTO_TCP,
-                                           type=m_socket.SOCK_STREAM)
+        coro = self.loop.create_connection(asyncio.Protocol, '::2', 80)
+        t, p = self.loop.run_until_complete(coro)
+        try:
+            sock.connect.assert_called_with(('::2', 80))
+            m_socket.socket.assert_called_with(family=m_socket.AF_INET6,
+                                               proto=m_socket.IPPROTO_TCP,
+                                               type=m_socket.SOCK_STREAM)
+        finally:
+            t.close()
+            test_utils.run_briefly(self.loop)  # allow transport to close
 
     @patch_socket
     def test_create_connection_ip_addr(self, m_socket):
@@ -1559,11 +1566,15 @@
             reuse_address=False,
             reuse_port=reuseport_supported)
 
-        self.loop.run_until_complete(coro)
-        bind.assert_called_with(('1.2.3.4', 0))
-        m_socket.socket.assert_called_with(family=m_socket.AF_INET,
-                                           proto=m_socket.IPPROTO_UDP,
-                                           type=m_socket.SOCK_DGRAM)
+        t, p = self.loop.run_until_complete(coro)
+        try:
+            bind.assert_called_with(('1.2.3.4', 0))
+            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
+                                               proto=m_socket.IPPROTO_UDP,
+                                               type=m_socket.SOCK_DGRAM)
+        finally:
+            t.close()
+            test_utils.run_briefly(self.loop)  # allow transport to close
 
     def test_accept_connection_retry(self):
         sock = mock.Mock()

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list