[Python-checkins] cpython (merge 3.5 -> default): Merge 3.5 -> default
andrew.svetlov
python-checkins at python.org
Tue Sep 29 18:24:44 CEST 2015
https://hg.python.org/cpython/rev/d325c372161a
changeset: 98398:d325c372161a
parent: 98395:aef6365294c8
parent: 98397:03b9579be90f
user: Andrew Svetlov <andrew.svetlov at gmail.com>
date: Tue Sep 29 18:38:22 2015 +0300
summary:
Merge 3.5 -> default
files:
Lib/asyncio/streams.py | 2 +-
Lib/test/test_asyncio/test_streams.py | 42 +++++++++++++++
2 files changed, 43 insertions(+), 1 deletions(-)
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -324,7 +324,7 @@
def __repr__(self):
info = ['StreamReader']
if self._buffer:
- info.append('%d bytes' % len(info))
+ info.append('%d bytes' % len(self._buffer))
if self._eof:
info.append('eof')
if self._limit != _DEFAULT_LIMIT:
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -632,6 +632,48 @@
protocol = asyncio.StreamReaderProtocol(reader)
self.assertIs(protocol._loop, self.loop)
+ def test___repr__(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ self.assertEqual("<StreamReader>", repr(stream))
+
+ def test___repr__nondefault_limit(self):
+ stream = asyncio.StreamReader(loop=self.loop, limit=123)
+ self.assertEqual("<StreamReader l=123>", repr(stream))
+
+ def test___repr__eof(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_eof()
+ self.assertEqual("<StreamReader eof>", repr(stream))
+
+ def test___repr__data(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'data')
+ self.assertEqual("<StreamReader 4 bytes>", repr(stream))
+
+ def test___repr__exception(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ exc = RuntimeError()
+ stream.set_exception(exc)
+ self.assertEqual("<StreamReader e=RuntimeError()>", repr(stream))
+
+ def test___repr__waiter(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream._waiter = asyncio.Future(loop=self.loop)
+ self.assertRegex(
+ repr(stream),
+ "<StreamReader w=<Future pending[\S ]*>>")
+ stream._waiter.set_result(None)
+ self.loop.run_until_complete(stream._waiter)
+ stream._waiter = None
+ self.assertEqual("<StreamReader>", repr(stream))
+
+ def test___repr__transport(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream._transport = mock.Mock()
+ stream._transport.__repr__ = mock.Mock()
+ stream._transport.__repr__.return_value = "<Transport>"
+ self.assertEqual("<StreamReader t=<Transport>>", repr(stream))
+
if __name__ == '__main__':
unittest.main()
--
Repository URL: https://hg.python.org/cpython
More information about the Python-checkins
mailing list