[Python-checkins] cpython (3.4): asyncio/tests: Fix ResourceWarnings related to unclosed transports
yury.selivanov
python-checkins at python.org
Wed Dec 16 20:23:53 EST 2015
https://hg.python.org/cpython/rev/56095b0e7890
changeset: 99595:56095b0e7890
branch: 3.4
parent: 99592:5fdcadf94c0d
user: Yury Selivanov <yselivanov at sprymix.com>
date: Wed Dec 16 20:23:26 2015 -0500
summary:
asyncio/tests: Fix ResourceWarnings related to unclosed transports
files:
Lib/test/test_asyncio/test_base_events.py | 47 ++++++----
1 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -1157,21 +1157,28 @@
self.loop.add_writer = mock.Mock()
self.loop.add_writer._is_coroutine = False
- coro = self.loop.create_connection(MyProto, '1.2.3.4', 80)
- self.loop.run_until_complete(coro)
- sock.connect.assert_called_with(('1.2.3.4', 80))
- m_socket.socket.assert_called_with(family=m_socket.AF_INET,
- proto=m_socket.IPPROTO_TCP,
- type=m_socket.SOCK_STREAM)
+ coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
+ t, p = self.loop.run_until_complete(coro)
+ try:
+ sock.connect.assert_called_with(('1.2.3.4', 80))
+ m_socket.socket.assert_called_with(family=m_socket.AF_INET,
+ proto=m_socket.IPPROTO_TCP,
+ type=m_socket.SOCK_STREAM)
+ finally:
+ t.close()
+ test_utils.run_briefly(self.loop) # allow transport to close
sock.family = socket.AF_INET6
- coro = self.loop.create_connection(MyProto, '::2', 80)
-
- self.loop.run_until_complete(coro)
- sock.connect.assert_called_with(('::2', 80))
- m_socket.socket.assert_called_with(family=m_socket.AF_INET6,
- proto=m_socket.IPPROTO_TCP,
- type=m_socket.SOCK_STREAM)
+ coro = self.loop.create_connection(asyncio.Protocol, '::2', 80)
+ t, p = self.loop.run_until_complete(coro)
+ try:
+ sock.connect.assert_called_with(('::2', 80))
+ m_socket.socket.assert_called_with(family=m_socket.AF_INET6,
+ proto=m_socket.IPPROTO_TCP,
+ type=m_socket.SOCK_STREAM)
+ finally:
+ t.close()
+ test_utils.run_briefly(self.loop) # allow transport to close
@patch_socket
def test_create_connection_ip_addr(self, m_socket):
@@ -1559,11 +1566,15 @@
reuse_address=False,
reuse_port=reuseport_supported)
- self.loop.run_until_complete(coro)
- bind.assert_called_with(('1.2.3.4', 0))
- m_socket.socket.assert_called_with(family=m_socket.AF_INET,
- proto=m_socket.IPPROTO_UDP,
- type=m_socket.SOCK_DGRAM)
+ t, p = self.loop.run_until_complete(coro)
+ try:
+ bind.assert_called_with(('1.2.3.4', 0))
+ m_socket.socket.assert_called_with(family=m_socket.AF_INET,
+ proto=m_socket.IPPROTO_UDP,
+ type=m_socket.SOCK_DGRAM)
+ finally:
+ t.close()
+ test_utils.run_briefly(self.loop) # allow transport to close
def test_accept_connection_retry(self):
sock = mock.Mock()
--
Repository URL: https://hg.python.org/cpython
More information about the Python-checkins mailing list