Python-checkins
Threads by month
- ----- 2026 -----
- June
- May
- April
- March
- February
- January
- ----- 2025 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2024 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2023 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2022 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2021 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2020 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2019 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2018 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2017 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2016 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2015 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2014 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2013 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2012 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2011 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2010 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2009 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2008 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2007 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2006 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2005 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2004 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2003 -----
- December
- November
- October
- September
- August
September 2025
- 1 participants
- 644 discussions
Sept. 30, 2025
https://github.com/python/cpython/commit/98a41af5b09343fa030e05655d8ea823e6…
commit: 98a41af5b09343fa030e05655d8ea823e6336380
branch: main
author: Adam Turner <9087854+AA-Turner(a)users.noreply.github.com>
committer: AA-Turner <9087854+AA-Turner(a)users.noreply.github.com>
date: 2025-09-30T18:18:13Z
summary:
GH-123299: Copyedit 3.14 What's New: Deprecated (#139433)
files:
M Doc/whatsnew/3.14.rst
diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst
index 40a2afe3b8e1ea..cef201e55827b3 100644
--- a/Doc/whatsnew/3.14.rst
+++ b/Doc/whatsnew/3.14.rst
@@ -2660,25 +2660,34 @@ urllib
Deprecated
==========
+New Deprecations
+----------------
+
+* Passing a complex number as the *real* or *imag* argument in the
+ :func:`complex` constructor is now deprecated;
+ complex numbers should only be passed as a single positional argument.
+ (Contributed by Serhiy Storchaka in :gh:`109218`.)
+
* :mod:`argparse`:
- * Passing the undocumented keyword argument *prefix_chars* to
- :meth:`~argparse.ArgumentParser.add_argument_group` is now
- deprecated.
+ * Passing the undocumented keyword argument *prefix_chars* to the
+ :meth:`~argparse.ArgumentParser.add_argument_group` method is now deprecated.
(Contributed by Savannah Ostrowski in :gh:`125563`.)
+
* Deprecated the :class:`argparse.FileType` type converter.
- Anything with resource management should be done downstream after the
- arguments are parsed.
+ Anything relating to resource management should be handled
+ downstream, after the arguments have been parsed.
(Contributed by Serhiy Storchaka in :gh:`58032`.)
* :mod:`asyncio`:
- * :func:`!asyncio.iscoroutinefunction` is deprecated
+ * The :func:`!asyncio.iscoroutinefunction` is now deprecated
and will be removed in Python 3.16;
use :func:`inspect.iscoroutinefunction` instead.
(Contributed by Jiahao Li and Kumar Aditya in :gh:`122875`.)
- * :mod:`asyncio` policy system is deprecated and will be removed in Python 3.16.
+ * The :mod:`asyncio` policy system is deprecated
+ and will be removed in Python 3.16.
In particular, the following classes and functions are deprecated:
* :class:`asyncio.AbstractEventLoopPolicy`
@@ -2689,99 +2698,102 @@ Deprecated
* :func:`asyncio.set_event_loop_policy`
Users should use :func:`asyncio.run` or :class:`asyncio.Runner` with
- *loop_factory* to use the desired event loop implementation.
+ the *loop_factory* argument to use the desired event loop implementation.
- For example, to use :class:`asyncio.SelectorEventLoop` on Windows::
+ For example, to use :class:`asyncio.SelectorEventLoop` on Windows:
- import asyncio
+ .. code-block:: python
- async def main():
- ...
+ import asyncio
- asyncio.run(main(), loop_factory=asyncio.SelectorEventLoop)
+ async def main():
+ ...
- (Contributed by Kumar Aditya in :gh:`127949`.)
+ asyncio.run(main(), loop_factory=asyncio.SelectorEventLoop)
-* :mod:`builtins`:
- Passing a complex number as the *real* or *imag* argument in the
- :func:`complex` constructor is now deprecated; it should only be passed
- as a single positional argument.
- (Contributed by Serhiy Storchaka in :gh:`109218`.)
+ (Contributed by Kumar Aditya in :gh:`127949`.)
* :mod:`codecs`:
- :func:`codecs.open` is now deprecated. Use :func:`open` instead.
+ The :func:`codecs.open` function is now deprecated,
+ and will be removed in a future version of Python.
+ Use :func:`open` instead.
(Contributed by Inada Naoki in :gh:`133036`.)
* :mod:`ctypes`:
* On non-Windows platforms, setting :attr:`.Structure._pack_` to use a
- MSVC-compatible default memory layout is deprecated in favor of setting
- :attr:`.Structure._layout_` to ``'ms'``.
+ MSVC-compatible default memory layout is now deprecated in favor of setting
+ :attr:`.Structure._layout_` to ``'ms'``, and will be removed in Python 3.19.
(Contributed by Petr Viktorin in :gh:`131747`.)
- * Calling :func:`ctypes.POINTER` on a string is deprecated.
- Use :ref:`ctypes-incomplete-types` for self-referential structures.
+ * Calling :func:`ctypes.POINTER` on a string is now deprecated.
+ Use :ref:`incomplete types <ctypes-incomplete-types>`
+ for self-referential structures.
Also, the internal ``ctypes._pointer_type_cache`` is deprecated.
See :func:`ctypes.POINTER` for updated implementation details.
(Contributed by Sergey Myrianov in :gh:`100926`.)
* :mod:`functools`:
Calling the Python implementation of :func:`functools.reduce` with *function*
- or *sequence* as keyword arguments is now deprecated.
+ or *sequence* as keyword arguments is now deprecated;
+ the parameters will be made positional-only in Python 3.16.
(Contributed by Kirill Podoprigora in :gh:`121676`.)
* :mod:`logging`:
- Support for custom logging handlers with the *strm* argument is deprecated
- and scheduled for removal in Python 3.16. Define handlers with the *stream*
- argument instead. (Contributed by Mariusz Felisiak in :gh:`115032`.)
+ Support for custom logging handlers with the *strm* argument
+ is now deprecated and scheduled for removal in Python 3.16.
+ Define handlers with the *stream* argument instead.
+ (Contributed by Mariusz Felisiak in :gh:`115032`.)
* :mod:`mimetypes`:
- Valid extensions start with a '.' or are empty for
+ Valid extensions are either empty or must start with '.' for
:meth:`mimetypes.MimeTypes.add_type`.
Undotted extensions are deprecated and will
raise a :exc:`ValueError` in Python 3.16.
(Contributed by Hugo van Kemenade in :gh:`75223`.)
-* :mod:`!nturl2path`: This module is now deprecated. Call
- :func:`urllib.request.url2pathname` and :func:`~urllib.request.pathname2url`
- instead.
+* :mod:`!nturl2path`:
+ This module is now deprecated. Call :func:`urllib.request.url2pathname`
+ and :func:`~urllib.request.pathname2url` instead.
(Contributed by Barney Gale in :gh:`125866`.)
* :mod:`os`:
- :term:`Soft deprecate <soft deprecated>` :func:`os.popen` and
- :func:`os.spawn* <os.spawnl>` functions. They should no longer be used to
- write new code. The :mod:`subprocess` module is recommended instead.
+ The :func:`os.popen` and :func:`os.spawn* <os.spawnl>` functions
+ are now :term:`soft deprecated`.
+ They should no longer be used to write new code.
+ The :mod:`subprocess` module is recommended instead.
(Contributed by Victor Stinner in :gh:`120743`.)
* :mod:`pathlib`:
- :meth:`!pathlib.PurePath.as_uri` is deprecated and will be removed in Python
- 3.19. Use :meth:`pathlib.Path.as_uri` instead.
+ :meth:`!pathlib.PurePath.as_uri` is now deprecated
+ and scheduled for removal in Python 3.19.
+ Use :meth:`pathlib.Path.as_uri` instead.
(Contributed by Barney Gale in :gh:`123599`.)
* :mod:`pdb`:
The undocumented ``pdb.Pdb.curframe_locals`` attribute is now a deprecated
- read-only property. The low overhead dynamic frame locals access added in
- Python 3.13 by PEP 667 means the frame locals cache reference previously
- stored in this attribute is no longer needed. Derived debuggers should access
+ read-only property, which will be removed in a future version of Python.
+ The low overhead dynamic frame locals access added in Python 3.13 by :pep:`667`
+ means the frame locals cache reference previously stored in this attribute
+ is no longer needed. Derived debuggers should access
``pdb.Pdb.curframe.f_locals`` directly in Python 3.13 and later versions.
(Contributed by Tian Gao in :gh:`124369` and :gh:`125951`.)
* :mod:`symtable`:
- Deprecate :meth:`symtable.Class.get_methods` due to the lack of interest.
+ Deprecate :meth:`symtable.Class.get_methods` due to the lack of interest,
+ scheduled for removal in Python 3.16.
(Contributed by Bénédikt Tran in :gh:`119698`.)
* :mod:`tkinter`:
The :class:`!tkinter.Variable` methods :meth:`!trace_variable`,
:meth:`!trace_vdelete` and :meth:`!trace_vinfo` are now deprecated.
- Use :meth:`!trace_add`, :meth:`!trace_remove` and :meth:`!trace_info`
- instead.
+ Use :meth:`!trace_add`, :meth:`!trace_remove` and :meth:`!trace_info` instead.
(Contributed by Serhiy Storchaka in :gh:`120220`.)
* :mod:`urllib.parse`:
Accepting objects with false values (like ``0`` and ``[]``) except empty
- strings, byte-like objects and ``None`` in :mod:`urllib.parse` functions
- :func:`~urllib.parse.parse_qsl` and :func:`~urllib.parse.parse_qs` is now
- deprecated.
+ strings, bytes-like objects and ``None`` in :func:`~urllib.parse.parse_qsl`
+ and :func:`~urllib.parse.parse_qs` is now deprecated.
(Contributed by Serhiy Storchaka in :gh:`116897`.)
.. Add deprecations above alphabetically, not here at the end.
1
0
[3.13] gh-139210: Fix use-after-free in xml.etree.ElementTree.iterparse() (GH-139211) (GH-139456)
by serhiy-storchaka Sept. 30, 2025
by serhiy-storchaka Sept. 30, 2025
Sept. 30, 2025
https://github.com/python/cpython/commit/f48128b6b3722ee2b2cef026e9679e37bd…
commit: f48128b6b3722ee2b2cef026e9679e37bd5b2517
branch: 3.13
author: Miss Islington (bot) <31488909+miss-islington(a)users.noreply.github.com>
committer: serhiy-storchaka <storchaka(a)gmail.com>
date: 2025-09-30T18:14:44Z
summary:
[3.13] gh-139210: Fix use-after-free in xml.etree.ElementTree.iterparse() (GH-139211) (GH-139456)
(cherry picked from commit c86eb4d3ac5984efc1ea920ba643e3c4f02fdee8)
Co-authored-by: Ken Jin <kenjin(a)python.org>
files:
A Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst
M Lib/test/test_xml_etree.py
M Modules/_elementtree.c
diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py
index 12efa006cd509d..78598b35dae050 100644
--- a/Lib/test/test_xml_etree.py
+++ b/Lib/test/test_xml_etree.py
@@ -1750,6 +1750,8 @@ def __next__(self):
def test_unknown_event(self):
with self.assertRaises(ValueError):
ET.XMLPullParser(events=('start', 'end', 'bogus'))
+ with self.assertRaisesRegex(ValueError, "unknown event 'bogus'"):
+ ET.XMLPullParser(events=(x.decode() for x in (b'start', b'end', b'bogus')))
@unittest.skipIf(pyexpat.version_info < (2, 6, 0),
f'Expat {pyexpat.version_info} does not '
diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst
new file mode 100644
index 00000000000000..1227b29a68a9d7
--- /dev/null
+++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst
@@ -0,0 +1 @@
+Fix use-after-free when reporting unknown event in :func:`xml.etree.ElementTree.iterparse`. Patch by Ken Jin.
diff --git a/Modules/_elementtree.c b/Modules/_elementtree.c
index 3926ef3ef835e1..020b7454add224 100644
--- a/Modules/_elementtree.c
+++ b/Modules/_elementtree.c
@@ -4180,8 +4180,8 @@ _elementtree_XMLParser__setevents_impl(XMLParserObject *self,
(XML_ProcessingInstructionHandler) expat_pi_handler
);
} else {
- Py_DECREF(events_seq);
PyErr_Format(PyExc_ValueError, "unknown event '%s'", event_name);
+ Py_DECREF(events_seq);
return NULL;
}
}
1
0
gh-139210: Fix use-after-free in xml.etree.ElementTree.iterparse() (GH-139211)
by serhiy-storchaka Sept. 30, 2025
by serhiy-storchaka Sept. 30, 2025
Sept. 30, 2025
https://github.com/python/cpython/commit/c86eb4d3ac5984efc1ea920ba643e3c4f0…
commit: c86eb4d3ac5984efc1ea920ba643e3c4f02fdee8
branch: main
author: Ken Jin <kenjin(a)python.org>
committer: serhiy-storchaka <storchaka(a)gmail.com>
date: 2025-09-30T20:50:50+03:00
summary:
gh-139210: Fix use-after-free in xml.etree.ElementTree.iterparse() (GH-139211)
files:
A Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst
M Lib/test/test_xml_etree.py
M Modules/_elementtree.c
diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py
index bf6d5074fdebd8..f65baa0cfae2ad 100644
--- a/Lib/test/test_xml_etree.py
+++ b/Lib/test/test_xml_etree.py
@@ -1749,6 +1749,8 @@ def __next__(self):
def test_unknown_event(self):
with self.assertRaises(ValueError):
ET.XMLPullParser(events=('start', 'end', 'bogus'))
+ with self.assertRaisesRegex(ValueError, "unknown event 'bogus'"):
+ ET.XMLPullParser(events=(x.decode() for x in (b'start', b'end', b'bogus')))
@unittest.skipIf(pyexpat.version_info < (2, 6, 0),
f'Expat {pyexpat.version_info} does not '
diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst
new file mode 100644
index 00000000000000..1227b29a68a9d7
--- /dev/null
+++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-09-21-15-58-57.gh-issue-139210.HGbMvz.rst
@@ -0,0 +1 @@
+Fix use-after-free when reporting unknown event in :func:`xml.etree.ElementTree.iterparse`. Patch by Ken Jin.
diff --git a/Modules/_elementtree.c b/Modules/_elementtree.c
index b9e12ab2026f65..9263f14b57f972 100644
--- a/Modules/_elementtree.c
+++ b/Modules/_elementtree.c
@@ -4214,8 +4214,8 @@ _elementtree_XMLParser__setevents_impl(XMLParserObject *self,
(XML_ProcessingInstructionHandler) expat_pi_handler
);
} else {
- Py_DECREF(events_seq);
PyErr_Format(PyExc_ValueError, "unknown event '%s'", event_name);
+ Py_DECREF(events_seq);
return NULL;
}
}
1
0
[3.13] gh-130567: Enable previously skipped locale tests on FreeBSD and macOS (GH-138652) (GH-139422)
by encukou Sept. 30, 2025
by encukou Sept. 30, 2025
Sept. 30, 2025
https://github.com/python/cpython/commit/0ef302e019caac0939a03bd041f0d6f239…
commit: 0ef302e019caac0939a03bd041f0d6f239ebef8a
branch: 3.13
author: Miss Islington (bot) <31488909+miss-islington(a)users.noreply.github.com>
committer: encukou <encukou(a)gmail.com>
date: 2025-09-30T17:53:45+02:00
summary:
[3.13] gh-130567: Enable previously skipped locale tests on FreeBSD and macOS (GH-138652) (GH-139422)
(cherry picked from commit 1b8dcdacc75caa8175f89c7e739e45d856ebf511)
Co-authored-by: Serhiy Storchaka <storchaka(a)gmail.com>
files:
M Lib/test/test_locale.py
diff --git a/Lib/test/test_locale.py b/Lib/test/test_locale.py
index 878a557e180fe4..71d03f3a3f904b 100644
--- a/Lib/test/test_locale.py
+++ b/Lib/test/test_locale.py
@@ -345,8 +345,7 @@ def setUp(self):
enc = codecs.lookup(locale.getencoding() or 'ascii').name
if enc not in ('utf-8', 'iso8859-1', 'cp1252'):
raise unittest.SkipTest('encoding not suitable')
- if enc != 'iso8859-1' and (sys.platform == 'darwin' or is_android or
- sys.platform.startswith('freebsd')):
+ if enc != 'iso8859-1' and is_android:
raise unittest.SkipTest('wcscoll/wcsxfrm have known bugs')
BaseLocalizedTest.setUp(self)
1
0
Sept. 30, 2025
https://github.com/python/cpython/commit/f752fde452d3bf6ff7dd45a474fbb3de9f…
commit: f752fde452d3bf6ff7dd45a474fbb3de9f977bf6
branch: main
author: Cody Maloney <cmaloney(a)users.noreply.github.com>
committer: vstinner <vstinner(a)python.org>
date: 2025-09-30T17:37:15+02:00
summary:
gh-138013: Split TextIO tests from test_io.test_general (#139173)
gh-138013: Split TextIO tests from test_general
These tests take 1.3 seconds on my dev machine, match fairly closely
with testing `textio.c` implementation only.
files:
A Lib/test/test_io/test_textio.py
M Lib/test/test_io/test_general.py
diff --git a/Lib/test/test_io/test_general.py b/Lib/test/test_io/test_general.py
index 5f645e3abbe230..c4a26719374450 100644
--- a/Lib/test/test_io/test_general.py
+++ b/Lib/test/test_io/test_general.py
@@ -6,7 +6,6 @@
import abc
import array
import errno
-import locale
import os
import pickle
import random
@@ -28,23 +27,10 @@
from test.support.os_helper import FakePath
from .utils import byteslike, CTestCase, PyTestCase
-import codecs
import io # C implementation of io
import _pyio as pyio # Python implementation of io
-def _default_chunk_size():
- """Get the default TextIOWrapper chunk size"""
- with open(__file__, "r", encoding="latin-1") as f:
- return f._CHUNK_SIZE
-
-
-
-class BadIndex:
- def __index__(self):
- 1/0
-
-
class IOTest:
def setUp(self):
@@ -2393,1640 +2379,6 @@ class PyBufferedRandomTest(BufferedRandomTest, PyTestCase):
tp = pyio.BufferedRandom
-# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
-# properties:
-# - A single output character can correspond to many bytes of input.
-# - The number of input bytes to complete the character can be
-# undetermined until the last input byte is received.
-# - The number of input bytes can vary depending on previous input.
-# - A single input byte can correspond to many characters of output.
-# - The number of output characters can be undetermined until the
-# last input byte is received.
-# - The number of output characters can vary depending on previous input.
-
-class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
- """
- For testing seek/tell behavior with a stateful, buffering decoder.
-
- Input is a sequence of words. Words may be fixed-length (length set
- by input) or variable-length (period-terminated). In variable-length
- mode, extra periods are ignored. Possible words are:
- - 'i' followed by a number sets the input length, I (maximum 99).
- When I is set to 0, words are space-terminated.
- - 'o' followed by a number sets the output length, O (maximum 99).
- - Any other word is converted into a word followed by a period on
- the output. The output word consists of the input word truncated
- or padded out with hyphens to make its length equal to O. If O
- is 0, the word is output verbatim without truncating or padding.
- I and O are initially set to 1. When I changes, any buffered input is
- re-scanned according to the new I. EOF also terminates the last word.
- """
-
- def __init__(self, errors='strict'):
- codecs.IncrementalDecoder.__init__(self, errors)
- self.reset()
-
- def __repr__(self):
- return '<SID %x>' % id(self)
-
- def reset(self):
- self.i = 1
- self.o = 1
- self.buffer = bytearray()
-
- def getstate(self):
- i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
- return bytes(self.buffer), i*100 + o
-
- def setstate(self, state):
- buffer, io = state
- self.buffer = bytearray(buffer)
- i, o = divmod(io, 100)
- self.i, self.o = i ^ 1, o ^ 1
-
- def decode(self, input, final=False):
- output = ''
- for b in input:
- if self.i == 0: # variable-length, terminated with period
- if b == ord('.'):
- if self.buffer:
- output += self.process_word()
- else:
- self.buffer.append(b)
- else: # fixed-length, terminate after self.i bytes
- self.buffer.append(b)
- if len(self.buffer) == self.i:
- output += self.process_word()
- if final and self.buffer: # EOF terminates the last word
- output += self.process_word()
- return output
-
- def process_word(self):
- output = ''
- if self.buffer[0] == ord('i'):
- self.i = min(99, int(self.buffer[1:] or 0)) # set input length
- elif self.buffer[0] == ord('o'):
- self.o = min(99, int(self.buffer[1:] or 0)) # set output length
- else:
- output = self.buffer.decode('ascii')
- if len(output) < self.o:
- output += '-'*self.o # pad out with hyphens
- if self.o:
- output = output[:self.o] # truncate to output length
- output += '.'
- self.buffer = bytearray()
- return output
-
- codecEnabled = False
-
-
-# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
-# when registering codecs and cleanup functions.
-def lookupTestDecoder(name):
- if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
- latin1 = codecs.lookup('latin-1')
- return codecs.CodecInfo(
- name='test_decoder', encode=latin1.encode, decode=None,
- incrementalencoder=None,
- streamreader=None, streamwriter=None,
- incrementaldecoder=StatefulIncrementalDecoder)
-
-
-class StatefulIncrementalDecoderTest(unittest.TestCase):
- """
- Make sure the StatefulIncrementalDecoder actually works.
- """
-
- test_cases = [
- # I=1, O=1 (fixed-length input == fixed-length output)
- (b'abcd', False, 'a.b.c.d.'),
- # I=0, O=0 (variable-length input, variable-length output)
- (b'oiabcd', True, 'abcd.'),
- # I=0, O=0 (should ignore extra periods)
- (b'oi...abcd...', True, 'abcd.'),
- # I=0, O=6 (variable-length input, fixed-length output)
- (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
- # I=2, O=6 (fixed-length input < fixed-length output)
- (b'i.i2.o6xyz', True, 'xy----.z-----.'),
- # I=6, O=3 (fixed-length input > fixed-length output)
- (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
- # I=0, then 3; O=29, then 15 (with longer output)
- (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
- 'a----------------------------.' +
- 'b----------------------------.' +
- 'cde--------------------------.' +
- 'abcdefghijabcde.' +
- 'a.b------------.' +
- '.c.------------.' +
- 'd.e------------.' +
- 'k--------------.' +
- 'l--------------.' +
- 'm--------------.')
- ]
-
- def test_decoder(self):
- # Try a few one-shot test cases.
- for input, eof, output in self.test_cases:
- d = StatefulIncrementalDecoder()
- self.assertEqual(d.decode(input, eof), output)
-
- # Also test an unfinished decode, followed by forcing EOF.
- d = StatefulIncrementalDecoder()
- self.assertEqual(d.decode(b'oiabcd'), '')
- self.assertEqual(d.decode(b'', 1), 'abcd.')
-
-class TextIOWrapperTest:
-
- def setUp(self):
- self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
- self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
- os_helper.unlink(os_helper.TESTFN)
- codecs.register(lookupTestDecoder)
- self.addCleanup(codecs.unregister, lookupTestDecoder)
-
- def tearDown(self):
- os_helper.unlink(os_helper.TESTFN)
-
- def test_constructor(self):
- r = self.BytesIO(b"\xc3\xa9\n\n")
- b = self.BufferedReader(r, 1000)
- t = self.TextIOWrapper(b, encoding="utf-8")
- t.__init__(b, encoding="latin-1", newline="\r\n")
- self.assertEqual(t.encoding, "latin-1")
- self.assertEqual(t.line_buffering, False)
- t.__init__(b, encoding="utf-8", line_buffering=True)
- self.assertEqual(t.encoding, "utf-8")
- self.assertEqual(t.line_buffering, True)
- self.assertEqual("\xe9\n", t.readline())
- invalid_type = TypeError if self.is_C else ValueError
- with self.assertRaises(invalid_type):
- t.__init__(b, encoding=42)
- with self.assertRaises(UnicodeEncodeError):
- t.__init__(b, encoding='\udcfe')
- with self.assertRaises(ValueError):
- t.__init__(b, encoding='utf-8\0')
- with self.assertRaises(invalid_type):
- t.__init__(b, encoding="utf-8", errors=42)
- if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
- with self.assertRaises(UnicodeEncodeError):
- t.__init__(b, encoding="utf-8", errors='\udcfe')
- if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
- with self.assertRaises(ValueError):
- t.__init__(b, encoding="utf-8", errors='replace\0')
- with self.assertRaises(TypeError):
- t.__init__(b, encoding="utf-8", newline=42)
- with self.assertRaises(ValueError):
- t.__init__(b, encoding="utf-8", newline='\udcfe')
- with self.assertRaises(ValueError):
- t.__init__(b, encoding="utf-8", newline='\n\0')
- with self.assertRaises(ValueError):
- t.__init__(b, encoding="utf-8", newline='xyzzy')
-
- def test_uninitialized(self):
- t = self.TextIOWrapper.__new__(self.TextIOWrapper)
- del t
- t = self.TextIOWrapper.__new__(self.TextIOWrapper)
- self.assertRaises(Exception, repr, t)
- self.assertRaisesRegex((ValueError, AttributeError),
- 'uninitialized|has no attribute',
- t.read, 0)
- t.__init__(self.MockRawIO(), encoding="utf-8")
- self.assertEqual(t.read(0), '')
-
- def test_non_text_encoding_codecs_are_rejected(self):
- # Ensure the constructor complains if passed a codec that isn't
- # marked as a text encoding
- # http://bugs.python.org/issue20404
- r = self.BytesIO()
- b = self.BufferedWriter(r)
- with self.assertRaisesRegex(LookupError, "is not a text encoding"):
- self.TextIOWrapper(b, encoding="hex")
-
- def test_detach(self):
- r = self.BytesIO()
- b = self.BufferedWriter(r)
- t = self.TextIOWrapper(b, encoding="ascii")
- self.assertIs(t.detach(), b)
-
- t = self.TextIOWrapper(b, encoding="ascii")
- t.write("howdy")
- self.assertFalse(r.getvalue())
- t.detach()
- self.assertEqual(r.getvalue(), b"howdy")
- self.assertRaises(ValueError, t.detach)
-
- # Operations independent of the detached stream should still work
- repr(t)
- self.assertEqual(t.encoding, "ascii")
- self.assertEqual(t.errors, "strict")
- self.assertFalse(t.line_buffering)
- self.assertFalse(t.write_through)
-
- def test_repr(self):
- raw = self.BytesIO("hello".encode("utf-8"))
- b = self.BufferedReader(raw)
- t = self.TextIOWrapper(b, encoding="utf-8")
- modname = self.TextIOWrapper.__module__
- self.assertRegex(repr(t),
- r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
- raw.name = "dummy"
- self.assertRegex(repr(t),
- r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
- t.mode = "r"
- self.assertRegex(repr(t),
- r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
- raw.name = b"dummy"
- self.assertRegex(repr(t),
- r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
-
- t.buffer.detach()
- repr(t) # Should not raise an exception
-
- def test_recursive_repr(self):
- # Issue #25455
- raw = self.BytesIO()
- t = self.TextIOWrapper(raw, encoding="utf-8")
- with support.swap_attr(raw, 'name', t), support.infinite_recursion(25):
- with self.assertRaises(RuntimeError):
- repr(t) # Should not crash
-
- def test_subclass_repr(self):
- class TestSubclass(self.TextIOWrapper):
- pass
-
- f = TestSubclass(self.StringIO())
- self.assertIn(TestSubclass.__name__, repr(f))
-
- def test_line_buffering(self):
- r = self.BytesIO()
- b = self.BufferedWriter(r, 1000)
- t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
- t.write("X")
- self.assertEqual(r.getvalue(), b"") # No flush happened
- t.write("Y\nZ")
- self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
- t.write("A\rB")
- self.assertEqual(r.getvalue(), b"XY\nZA\rB")
-
- def test_reconfigure_line_buffering(self):
- r = self.BytesIO()
- b = self.BufferedWriter(r, 1000)
- t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
- t.write("AB\nC")
- self.assertEqual(r.getvalue(), b"")
-
- t.reconfigure(line_buffering=True) # implicit flush
- self.assertEqual(r.getvalue(), b"AB\nC")
- t.write("DEF\nG")
- self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
- t.write("H")
- self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
- t.reconfigure(line_buffering=False) # implicit flush
- self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
- t.write("IJ")
- self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
-
- # Keeping default value
- t.reconfigure()
- t.reconfigure(line_buffering=None)
- self.assertEqual(t.line_buffering, False)
- t.reconfigure(line_buffering=True)
- t.reconfigure()
- t.reconfigure(line_buffering=None)
- self.assertEqual(t.line_buffering, True)
-
- @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
- def test_default_encoding(self):
- with os_helper.EnvironmentVarGuard() as env:
- # try to get a user preferred encoding different than the current
- # locale encoding to check that TextIOWrapper() uses the current
- # locale encoding and not the user preferred encoding
- env.unset('LC_ALL', 'LANG', 'LC_CTYPE')
-
- current_locale_encoding = locale.getencoding()
- b = self.BytesIO()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", EncodingWarning)
- t = self.TextIOWrapper(b)
- self.assertEqual(t.encoding, current_locale_encoding)
-
- def test_encoding(self):
- # Check the encoding attribute is always set, and valid
- b = self.BytesIO()
- t = self.TextIOWrapper(b, encoding="utf-8")
- self.assertEqual(t.encoding, "utf-8")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", EncodingWarning)
- t = self.TextIOWrapper(b)
- self.assertIsNotNone(t.encoding)
- codecs.lookup(t.encoding)
-
- def test_encoding_errors_reading(self):
- # (1) default
- b = self.BytesIO(b"abc\n\xff\n")
- t = self.TextIOWrapper(b, encoding="ascii")
- self.assertRaises(UnicodeError, t.read)
- # (2) explicit strict
- b = self.BytesIO(b"abc\n\xff\n")
- t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
- self.assertRaises(UnicodeError, t.read)
- # (3) ignore
- b = self.BytesIO(b"abc\n\xff\n")
- t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
- self.assertEqual(t.read(), "abc\n\n")
- # (4) replace
- b = self.BytesIO(b"abc\n\xff\n")
- t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
- self.assertEqual(t.read(), "abc\n\ufffd\n")
-
- def test_encoding_errors_writing(self):
- # (1) default
- b = self.BytesIO()
- t = self.TextIOWrapper(b, encoding="ascii")
- self.assertRaises(UnicodeError, t.write, "\xff")
- # (2) explicit strict
- b = self.BytesIO()
- t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
- self.assertRaises(UnicodeError, t.write, "\xff")
- # (3) ignore
- b = self.BytesIO()
- t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
- newline="\n")
- t.write("abc\xffdef\n")
- t.flush()
- self.assertEqual(b.getvalue(), b"abcdef\n")
- # (4) replace
- b = self.BytesIO()
- t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
- newline="\n")
- t.write("abc\xffdef\n")
- t.flush()
- self.assertEqual(b.getvalue(), b"abc?def\n")
-
- def test_newlines(self):
- input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
-
- tests = [
- [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
- [ '', input_lines ],
- [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
- [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
- [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
- ]
- encodings = (
- 'utf-8', 'latin-1',
- 'utf-16', 'utf-16-le', 'utf-16-be',
- 'utf-32', 'utf-32-le', 'utf-32-be',
- )
-
- # Try a range of buffer sizes to test the case where \r is the last
- # character in TextIOWrapper._pending_line.
- for encoding in encodings:
- # XXX: str.encode() should return bytes
- data = bytes(''.join(input_lines).encode(encoding))
- for do_reads in (False, True):
- for bufsize in range(1, 10):
- for newline, exp_lines in tests:
- bufio = self.BufferedReader(self.BytesIO(data), bufsize)
- textio = self.TextIOWrapper(bufio, newline=newline,
- encoding=encoding)
- if do_reads:
- got_lines = []
- while True:
- c2 = textio.read(2)
- if c2 == '':
- break
- self.assertEqual(len(c2), 2)
- got_lines.append(c2 + textio.readline())
- else:
- got_lines = list(textio)
-
- for got_line, exp_line in zip(got_lines, exp_lines):
- self.assertEqual(got_line, exp_line)
- self.assertEqual(len(got_lines), len(exp_lines))
-
- def test_newlines_input(self):
- testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
- normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
- for newline, expected in [
- (None, normalized.decode("ascii").splitlines(keepends=True)),
- ("", testdata.decode("ascii").splitlines(keepends=True)),
- ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
- ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
- ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
- ]:
- buf = self.BytesIO(testdata)
- txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
- self.assertEqual(txt.readlines(), expected)
- txt.seek(0)
- self.assertEqual(txt.read(), "".join(expected))
-
- def test_newlines_output(self):
- testdict = {
- "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
- "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
- "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
- "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
- }
- tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
- for newline, expected in tests:
- buf = self.BytesIO()
- txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
- txt.write("AAA\nB")
- txt.write("BB\nCCC\n")
- txt.write("X\rY\r\nZ")
- txt.flush()
- self.assertEqual(buf.closed, False)
- self.assertEqual(buf.getvalue(), expected)
-
- def test_destructor(self):
- l = []
- base = self.BytesIO
- class MyBytesIO(base):
- def close(self):
- l.append(self.getvalue())
- base.close(self)
- b = MyBytesIO()
- t = self.TextIOWrapper(b, encoding="ascii")
- t.write("abc")
- del t
- support.gc_collect()
- self.assertEqual([b"abc"], l)
-
- def test_override_destructor(self):
- record = []
- class MyTextIO(self.TextIOWrapper):
- def __del__(self):
- record.append(1)
- try:
- f = super().__del__
- except AttributeError:
- pass
- else:
- f()
- def close(self):
- record.append(2)
- super().close()
- def flush(self):
- record.append(3)
- super().flush()
- b = self.BytesIO()
- t = MyTextIO(b, encoding="ascii")
- del t
- support.gc_collect()
- self.assertEqual(record, [1, 2, 3])
-
- def test_error_through_destructor(self):
- # Test that the exception state is not modified by a destructor,
- # even if close() fails.
- rawio = self.CloseFailureIO()
- with support.catch_unraisable_exception() as cm:
- with self.assertRaises(AttributeError):
- self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
-
- self.assertEqual(cm.unraisable.exc_type, OSError)
-
- # Systematic tests of the text I/O API
-
- def test_basic_io(self):
- for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
- for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
- f = self.open(os_helper.TESTFN, "w+", encoding=enc)
- f._CHUNK_SIZE = chunksize
- self.assertEqual(f.write("abc"), 3)
- f.close()
- f = self.open(os_helper.TESTFN, "r+", encoding=enc)
- f._CHUNK_SIZE = chunksize
- self.assertEqual(f.tell(), 0)
- self.assertEqual(f.read(), "abc")
- cookie = f.tell()
- self.assertEqual(f.seek(0), 0)
- self.assertEqual(f.read(None), "abc")
- f.seek(0)
- self.assertEqual(f.read(2), "ab")
- self.assertEqual(f.read(1), "c")
- self.assertEqual(f.read(1), "")
- self.assertEqual(f.read(), "")
- self.assertEqual(f.tell(), cookie)
- self.assertEqual(f.seek(0), 0)
- self.assertEqual(f.seek(0, 2), cookie)
- self.assertEqual(f.write("def"), 3)
- self.assertEqual(f.seek(cookie), cookie)
- self.assertEqual(f.read(), "def")
- if enc.startswith("utf"):
- self.multi_line_test(f, enc)
- f.close()
-
- def multi_line_test(self, f, enc):
- f.seek(0)
- f.truncate()
- sample = "s\xff\u0fff\uffff"
- wlines = []
- for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
- chars = []
- for i in range(size):
- chars.append(sample[i % len(sample)])
- line = "".join(chars) + "\n"
- wlines.append((f.tell(), line))
- f.write(line)
- f.seek(0)
- rlines = []
- while True:
- pos = f.tell()
- line = f.readline()
- if not line:
- break
- rlines.append((pos, line))
- self.assertEqual(rlines, wlines)
-
- def test_telling(self):
- f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
- p0 = f.tell()
- f.write("\xff\n")
- p1 = f.tell()
- f.write("\xff\n")
- p2 = f.tell()
- f.seek(0)
- self.assertEqual(f.tell(), p0)
- self.assertEqual(f.readline(), "\xff\n")
- self.assertEqual(f.tell(), p1)
- self.assertEqual(f.readline(), "\xff\n")
- self.assertEqual(f.tell(), p2)
- f.seek(0)
- for line in f:
- self.assertEqual(line, "\xff\n")
- self.assertRaises(OSError, f.tell)
- self.assertEqual(f.tell(), p2)
- f.close()
-
- def test_seeking(self):
- chunk_size = _default_chunk_size()
- prefix_size = chunk_size - 2
- u_prefix = "a" * prefix_size
- prefix = bytes(u_prefix.encode("utf-8"))
- self.assertEqual(len(u_prefix), len(prefix))
- u_suffix = "\u8888\n"
- suffix = bytes(u_suffix.encode("utf-8"))
- line = prefix + suffix
- with self.open(os_helper.TESTFN, "wb") as f:
- f.write(line*2)
- with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
- s = f.read(prefix_size)
- self.assertEqual(s, str(prefix, "ascii"))
- self.assertEqual(f.tell(), prefix_size)
- self.assertEqual(f.readline(), u_suffix)
-
- def test_seeking_too(self):
- # Regression test for a specific bug
- data = b'\xe0\xbf\xbf\n'
- with self.open(os_helper.TESTFN, "wb") as f:
- f.write(data)
- with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
- f._CHUNK_SIZE # Just test that it exists
- f._CHUNK_SIZE = 2
- f.readline()
- f.tell()
-
- def test_seek_and_tell(self):
- #Test seek/tell using the StatefulIncrementalDecoder.
- # Make test faster by doing smaller seeks
- CHUNK_SIZE = 128
-
- def test_seek_and_tell_with_data(data, min_pos=0):
- """Tell/seek to various points within a data stream and ensure
- that the decoded data returned by read() is consistent."""
- f = self.open(os_helper.TESTFN, 'wb')
- f.write(data)
- f.close()
- f = self.open(os_helper.TESTFN, encoding='test_decoder')
- f._CHUNK_SIZE = CHUNK_SIZE
- decoded = f.read()
- f.close()
-
- for i in range(min_pos, len(decoded) + 1): # seek positions
- for j in [1, 5, len(decoded) - i]: # read lengths
- f = self.open(os_helper.TESTFN, encoding='test_decoder')
- self.assertEqual(f.read(i), decoded[:i])
- cookie = f.tell()
- self.assertEqual(f.read(j), decoded[i:i + j])
- f.seek(cookie)
- self.assertEqual(f.read(), decoded[i:])
- f.close()
-
- # Enable the test decoder.
- StatefulIncrementalDecoder.codecEnabled = 1
-
- # Run the tests.
- try:
- # Try each test case.
- for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
- test_seek_and_tell_with_data(input)
-
- # Position each test case so that it crosses a chunk boundary.
- for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
- offset = CHUNK_SIZE - len(input)//2
- prefix = b'.'*offset
- # Don't bother seeking into the prefix (takes too long).
- min_pos = offset*2
- test_seek_and_tell_with_data(prefix + input, min_pos)
-
- # Ensure our test decoder won't interfere with subsequent tests.
- finally:
- StatefulIncrementalDecoder.codecEnabled = 0
-
- def test_multibyte_seek_and_tell(self):
- f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
- f.write("AB\n\u3046\u3048\n")
- f.close()
-
- f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
- self.assertEqual(f.readline(), "AB\n")
- p0 = f.tell()
- self.assertEqual(f.readline(), "\u3046\u3048\n")
- p1 = f.tell()
- f.seek(p0)
- self.assertEqual(f.readline(), "\u3046\u3048\n")
- self.assertEqual(f.tell(), p1)
- f.close()
-
- def test_seek_with_encoder_state(self):
- f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
- f.write("\u00e6\u0300")
- p0 = f.tell()
- f.write("\u00e6")
- f.seek(p0)
- f.write("\u0300")
- f.close()
-
- f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
- self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
- f.close()
-
- def test_encoded_writes(self):
- data = "1234567890"
- tests = ("utf-16",
- "utf-16-le",
- "utf-16-be",
- "utf-32",
- "utf-32-le",
- "utf-32-be")
- for encoding in tests:
- buf = self.BytesIO()
- f = self.TextIOWrapper(buf, encoding=encoding)
- # Check if the BOM is written only once (see issue1753).
- f.write(data)
- f.write(data)
- f.seek(0)
- self.assertEqual(f.read(), data * 2)
- f.seek(0)
- self.assertEqual(f.read(), data * 2)
- self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
-
- def test_unreadable(self):
- class UnReadable(self.BytesIO):
- def readable(self):
- return False
- txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
- self.assertRaises(OSError, txt.read)
-
- def test_read_one_by_one(self):
- txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
- reads = ""
- while True:
- c = txt.read(1)
- if not c:
- break
- reads += c
- self.assertEqual(reads, "AA\nBB")
-
- def test_readlines(self):
- txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
- self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
- txt.seek(0)
- self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
- txt.seek(0)
- self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
-
- # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
- def test_read_by_chunk(self):
- # make sure "\r\n" straddles 128 char boundary.
- txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
- reads = ""
- while True:
- c = txt.read(128)
- if not c:
- break
- reads += c
- self.assertEqual(reads, "A"*127+"\nB")
-
- def test_writelines(self):
- l = ['ab', 'cd', 'ef']
- buf = self.BytesIO()
- txt = self.TextIOWrapper(buf, encoding="utf-8")
- txt.writelines(l)
- txt.flush()
- self.assertEqual(buf.getvalue(), b'abcdef')
-
- def test_writelines_userlist(self):
- l = UserList(['ab', 'cd', 'ef'])
- buf = self.BytesIO()
- txt = self.TextIOWrapper(buf, encoding="utf-8")
- txt.writelines(l)
- txt.flush()
- self.assertEqual(buf.getvalue(), b'abcdef')
-
- def test_writelines_error(self):
- txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
- self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
- self.assertRaises(TypeError, txt.writelines, None)
- self.assertRaises(TypeError, txt.writelines, b'abc')
-
- def test_issue1395_1(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-
- # read one char at a time
- reads = ""
- while True:
- c = txt.read(1)
- if not c:
- break
- reads += c
- self.assertEqual(reads, self.normalized)
-
- def test_issue1395_2(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- txt._CHUNK_SIZE = 4
-
- reads = ""
- while True:
- c = txt.read(4)
- if not c:
- break
- reads += c
- self.assertEqual(reads, self.normalized)
-
- def test_issue1395_3(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- txt._CHUNK_SIZE = 4
-
- reads = txt.read(4)
- reads += txt.read(4)
- reads += txt.readline()
- reads += txt.readline()
- reads += txt.readline()
- self.assertEqual(reads, self.normalized)
-
- def test_issue1395_4(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- txt._CHUNK_SIZE = 4
-
- reads = txt.read(4)
- reads += txt.read()
- self.assertEqual(reads, self.normalized)
-
- def test_issue1395_5(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- txt._CHUNK_SIZE = 4
-
- reads = txt.read(4)
- pos = txt.tell()
- txt.seek(0)
- txt.seek(pos)
- self.assertEqual(txt.read(4), "BBB\n")
-
- def test_issue2282(self):
- buffer = self.BytesIO(self.testdata)
- txt = self.TextIOWrapper(buffer, encoding="ascii")
-
- self.assertEqual(buffer.seekable(), txt.seekable())
-
- def test_append_bom(self):
- # The BOM is not written again when appending to a non-empty file
- filename = os_helper.TESTFN
- for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
- with self.open(filename, 'w', encoding=charset) as f:
- f.write('aaa')
- pos = f.tell()
- with self.open(filename, 'rb') as f:
- self.assertEqual(f.read(), 'aaa'.encode(charset))
-
- with self.open(filename, 'a', encoding=charset) as f:
- f.write('xxx')
- with self.open(filename, 'rb') as f:
- self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
-
- def test_seek_bom(self):
- # Same test, but when seeking manually
- filename = os_helper.TESTFN
- for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
- with self.open(filename, 'w', encoding=charset) as f:
- f.write('aaa')
- pos = f.tell()
- with self.open(filename, 'r+', encoding=charset) as f:
- f.seek(pos)
- f.write('zzz')
- f.seek(0)
- f.write('bbb')
- with self.open(filename, 'rb') as f:
- self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
-
- def test_seek_append_bom(self):
- # Same test, but first seek to the start and then to the end
- filename = os_helper.TESTFN
- for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
- with self.open(filename, 'w', encoding=charset) as f:
- f.write('aaa')
- with self.open(filename, 'a', encoding=charset) as f:
- f.seek(0)
- f.seek(0, self.SEEK_END)
- f.write('xxx')
- with self.open(filename, 'rb') as f:
- self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
-
- def test_errors_property(self):
- with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
- self.assertEqual(f.errors, "strict")
- with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
- self.assertEqual(f.errors, "replace")
-
- @support.no_tracing
- @threading_helper.requires_working_threading()
- def test_threads_write(self):
- # Issue6750: concurrent writes could duplicate data
- event = threading.Event()
- with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
- def run(n):
- text = "Thread%03d\n" % n
- event.wait()
- f.write(text)
- threads = [threading.Thread(target=run, args=(x,))
- for x in range(20)]
- with threading_helper.start_threads(threads, event.set):
- time.sleep(0.02)
- with self.open(os_helper.TESTFN, encoding="utf-8") as f:
- content = f.read()
- for n in range(20):
- self.assertEqual(content.count("Thread%03d\n" % n), 1)
-
- def test_flush_error_on_close(self):
- # Test that text file is closed despite failed flush
- # and that flush() is called before file closed.
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- closed = []
- def bad_flush():
- closed[:] = [txt.closed, txt.buffer.closed]
- raise OSError()
- txt.flush = bad_flush
- self.assertRaises(OSError, txt.close) # exception not swallowed
- self.assertTrue(txt.closed)
- self.assertTrue(txt.buffer.closed)
- self.assertTrue(closed) # flush() called
- self.assertFalse(closed[0]) # flush() called before file closed
- self.assertFalse(closed[1])
- txt.flush = lambda: None # break reference loop
-
- def test_close_error_on_close(self):
- buffer = self.BytesIO(self.testdata)
- def bad_flush():
- raise OSError('flush')
- def bad_close():
- raise OSError('close')
- buffer.close = bad_close
- txt = self.TextIOWrapper(buffer, encoding="ascii")
- txt.flush = bad_flush
- with self.assertRaises(OSError) as err: # exception not swallowed
- txt.close()
- self.assertEqual(err.exception.args, ('close',))
- self.assertIsInstance(err.exception.__context__, OSError)
- self.assertEqual(err.exception.__context__.args, ('flush',))
- self.assertFalse(txt.closed)
-
- # Silence destructor error
- buffer.close = lambda: None
- txt.flush = lambda: None
-
- def test_nonnormalized_close_error_on_close(self):
- # Issue #21677
- buffer = self.BytesIO(self.testdata)
- def bad_flush():
- raise non_existing_flush
- def bad_close():
- raise non_existing_close
- buffer.close = bad_close
- txt = self.TextIOWrapper(buffer, encoding="ascii")
- txt.flush = bad_flush
- with self.assertRaises(NameError) as err: # exception not swallowed
- txt.close()
- self.assertIn('non_existing_close', str(err.exception))
- self.assertIsInstance(err.exception.__context__, NameError)
- self.assertIn('non_existing_flush', str(err.exception.__context__))
- self.assertFalse(txt.closed)
-
- # Silence destructor error
- buffer.close = lambda: None
- txt.flush = lambda: None
-
- def test_multi_close(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- txt.close()
- txt.close()
- txt.close()
- self.assertRaises(ValueError, txt.flush)
-
- def test_unseekable(self):
- txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
- self.assertRaises(self.UnsupportedOperation, txt.tell)
- self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
-
- def test_readonly_attributes(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- buf = self.BytesIO(self.testdata)
- with self.assertRaises(AttributeError):
- txt.buffer = buf
-
- def test_rawio(self):
- # Issue #12591: TextIOWrapper must work with raw I/O objects, so
- # that subprocess.Popen() can have the required unbuffered
- # semantics with universal_newlines=True.
- raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- # Reads
- self.assertEqual(txt.read(4), 'abcd')
- self.assertEqual(txt.readline(), 'efghi\n')
- self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
-
- def test_rawio_write_through(self):
- # Issue #12591: with write_through=True, writes don't need a flush
- raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
- write_through=True)
- txt.write('1')
- txt.write('23\n4')
- txt.write('5')
- self.assertEqual(b''.join(raw._write_stack), b'123\n45')
-
- def test_bufio_write_through(self):
- # Issue #21396: write_through=True doesn't force a flush()
- # on the underlying binary buffered object.
- flush_called, write_called = [], []
- class BufferedWriter(self.BufferedWriter):
- def flush(self, *args, **kwargs):
- flush_called.append(True)
- return super().flush(*args, **kwargs)
- def write(self, *args, **kwargs):
- write_called.append(True)
- return super().write(*args, **kwargs)
-
- rawio = self.BytesIO()
- data = b"a"
- bufio = BufferedWriter(rawio, len(data)*2)
- textio = self.TextIOWrapper(bufio, encoding='ascii',
- write_through=True)
- # write to the buffered io but don't overflow the buffer
- text = data.decode('ascii')
- textio.write(text)
-
- # buffer.flush is not called with write_through=True
- self.assertFalse(flush_called)
- # buffer.write *is* called with write_through=True
- self.assertTrue(write_called)
- self.assertEqual(rawio.getvalue(), b"") # no flush
-
- write_called = [] # reset
- textio.write(text * 10) # total content is larger than bufio buffer
- self.assertTrue(write_called)
- self.assertEqual(rawio.getvalue(), data * 11) # all flushed
-
- def test_reconfigure_write_through(self):
- raw = self.MockRawIO([])
- t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- t.write('1')
- t.reconfigure(write_through=True) # implied flush
- self.assertEqual(t.write_through, True)
- self.assertEqual(b''.join(raw._write_stack), b'1')
- t.write('23')
- self.assertEqual(b''.join(raw._write_stack), b'123')
- t.reconfigure(write_through=False)
- self.assertEqual(t.write_through, False)
- t.write('45')
- t.flush()
- self.assertEqual(b''.join(raw._write_stack), b'12345')
- # Keeping default value
- t.reconfigure()
- t.reconfigure(write_through=None)
- self.assertEqual(t.write_through, False)
- t.reconfigure(write_through=True)
- t.reconfigure()
- t.reconfigure(write_through=None)
- self.assertEqual(t.write_through, True)
-
- def test_read_nonbytes(self):
- # Issue #17106
- # Crash when underlying read() returns non-bytes
- t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
- self.assertRaises(TypeError, t.read, 1)
- t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
- self.assertRaises(TypeError, t.readline)
- t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
- self.assertRaises(TypeError, t.read)
-
- def test_illegal_encoder(self):
- # Issue 31271: Calling write() while the return value of encoder's
- # encode() is invalid shouldn't cause an assertion failure.
- rot13 = codecs.lookup("rot13")
- with support.swap_attr(rot13, '_is_text_encoding', True):
- t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
- self.assertRaises(TypeError, t.write, 'bar')
-
- def test_illegal_decoder(self):
- # Issue #17106
- # Bypass the early encoding check added in issue 20404
- def _make_illegal_wrapper():
- quopri = codecs.lookup("quopri")
- quopri._is_text_encoding = True
- try:
- t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
- newline='\n', encoding="quopri")
- finally:
- quopri._is_text_encoding = False
- return t
- # Crash when decoder returns non-string
- t = _make_illegal_wrapper()
- self.assertRaises(TypeError, t.read, 1)
- t = _make_illegal_wrapper()
- self.assertRaises(TypeError, t.readline)
- t = _make_illegal_wrapper()
- self.assertRaises(TypeError, t.read)
-
- # Issue 31243: calling read() while the return value of decoder's
- # getstate() is invalid should neither crash the interpreter nor
- # raise a SystemError.
- def _make_very_illegal_wrapper(getstate_ret_val):
- class BadDecoder:
- def getstate(self):
- return getstate_ret_val
- def _get_bad_decoder(dummy):
- return BadDecoder()
- quopri = codecs.lookup("quopri")
- with support.swap_attr(quopri, 'incrementaldecoder',
- _get_bad_decoder):
- return _make_illegal_wrapper()
- t = _make_very_illegal_wrapper(42)
- self.assertRaises(TypeError, t.read, 42)
- t = _make_very_illegal_wrapper(())
- self.assertRaises(TypeError, t.read, 42)
- t = _make_very_illegal_wrapper((1, 2))
- self.assertRaises(TypeError, t.read, 42)
-
- def _check_create_at_shutdown(self, **kwargs):
- # Issue #20037: creating a TextIOWrapper at shutdown
- # shouldn't crash the interpreter.
- iomod = self.io.__name__
- code = """if 1:
- import codecs
- import {iomod} as io
-
- # Avoid looking up codecs at shutdown
- codecs.lookup('utf-8')
-
- class C:
- def __del__(self):
- io.TextIOWrapper(io.BytesIO(), **{kwargs})
- print("ok")
- c = C()
- """.format(iomod=iomod, kwargs=kwargs)
- return assert_python_ok("-c", code)
-
- def test_create_at_shutdown_without_encoding(self):
- rc, out, err = self._check_create_at_shutdown()
- if err:
- # Can error out with a RuntimeError if the module state
- # isn't found.
- self.assertIn(self.shutdown_error, err.decode())
- else:
- self.assertEqual("ok", out.decode().strip())
-
- def test_create_at_shutdown_with_encoding(self):
- rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
- errors='strict')
- self.assertFalse(err)
- self.assertEqual("ok", out.decode().strip())
-
- def test_read_byteslike(self):
- r = MemviewBytesIO(b'Just some random string\n')
- t = self.TextIOWrapper(r, 'utf-8')
-
- # TextIOwrapper will not read the full string, because
- # we truncate it to a multiple of the native int size
- # so that we can construct a more complex memoryview.
- bytes_val = _to_memoryview(r.getvalue()).tobytes()
-
- self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
-
- def test_issue22849(self):
- class F(object):
- def readable(self): return True
- def writable(self): return True
- def seekable(self): return True
-
- for i in range(10):
- try:
- self.TextIOWrapper(F(), encoding='utf-8')
- except Exception:
- pass
-
- F.tell = lambda x: 0
- t = self.TextIOWrapper(F(), encoding='utf-8')
-
- def test_reconfigure_locale(self):
- wrapper = self.TextIOWrapper(self.BytesIO(b"test"))
- wrapper.reconfigure(encoding="locale")
-
- def test_reconfigure_encoding_read(self):
- # latin1 -> utf8
- # (latin1 can decode utf-8 encoded string)
- data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
- raw = self.BytesIO(data)
- txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
- self.assertEqual(txt.readline(), 'abc\xe9\n')
- with self.assertRaises(self.UnsupportedOperation):
- txt.reconfigure(encoding='utf-8')
- with self.assertRaises(self.UnsupportedOperation):
- txt.reconfigure(newline=None)
-
- def test_reconfigure_write_fromascii(self):
- # ascii has a specific encodefunc in the C implementation,
- # but utf-8-sig has not. Make sure that we get rid of the
- # cached encodefunc when we switch encoders.
- raw = self.BytesIO()
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- txt.write('foo\n')
- txt.reconfigure(encoding='utf-8-sig')
- txt.write('\xe9\n')
- txt.flush()
- self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
-
- def test_reconfigure_write(self):
- # latin -> utf8
- raw = self.BytesIO()
- txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
- txt.write('abc\xe9\n')
- txt.reconfigure(encoding='utf-8')
- self.assertEqual(raw.getvalue(), b'abc\xe9\n')
- txt.write('d\xe9f\n')
- txt.flush()
- self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
-
- # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
- # the file
- raw = self.BytesIO()
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- txt.write('abc\n')
- txt.reconfigure(encoding='utf-8-sig')
- txt.write('d\xe9f\n')
- txt.flush()
- self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
-
- def test_reconfigure_write_non_seekable(self):
- raw = self.BytesIO()
- raw.seekable = lambda: False
- raw.seek = None
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- txt.write('abc\n')
- txt.reconfigure(encoding='utf-8-sig')
- txt.write('d\xe9f\n')
- txt.flush()
-
- # If the raw stream is not seekable, there'll be a BOM
- self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
-
- def test_reconfigure_defaults(self):
- txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
- txt.reconfigure(encoding=None)
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'replace')
- txt.write('LF\n')
-
- txt.reconfigure(newline='\r\n')
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'replace')
-
- txt.reconfigure(errors='ignore')
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'ignore')
- txt.write('CRLF\n')
-
- txt.reconfigure(encoding='utf-8', newline=None)
- self.assertEqual(txt.errors, 'strict')
- txt.seek(0)
- self.assertEqual(txt.read(), 'LF\nCRLF\n')
-
- self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
-
- def test_reconfigure_errors(self):
- txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
- with self.assertRaises(TypeError): # there was a crash
- txt.reconfigure(encoding=42)
- if self.is_C:
- with self.assertRaises(UnicodeEncodeError):
- txt.reconfigure(encoding='\udcfe')
- with self.assertRaises(LookupError):
- txt.reconfigure(encoding='locale\0')
- # TODO: txt.reconfigure(encoding='utf-8\0')
- # TODO: txt.reconfigure(encoding='nonexisting')
- with self.assertRaises(TypeError):
- txt.reconfigure(errors=42)
- if self.is_C:
- with self.assertRaises(UnicodeEncodeError):
- txt.reconfigure(errors='\udcfe')
- # TODO: txt.reconfigure(errors='ignore\0')
- # TODO: txt.reconfigure(errors='nonexisting')
- with self.assertRaises(TypeError):
- txt.reconfigure(newline=42)
- with self.assertRaises(ValueError):
- txt.reconfigure(newline='\udcfe')
- with self.assertRaises(ValueError):
- txt.reconfigure(newline='xyz')
- if not self.is_C:
- # TODO: Should fail in C too.
- with self.assertRaises(ValueError):
- txt.reconfigure(newline='\n\0')
- if self.is_C:
- # TODO: Use __bool__(), not __index__().
- with self.assertRaises(ZeroDivisionError):
- txt.reconfigure(line_buffering=BadIndex())
- with self.assertRaises(OverflowError):
- txt.reconfigure(line_buffering=2**1000)
- with self.assertRaises(ZeroDivisionError):
- txt.reconfigure(write_through=BadIndex())
- with self.assertRaises(OverflowError):
- txt.reconfigure(write_through=2**1000)
- with self.assertRaises(ZeroDivisionError): # there was a crash
- txt.reconfigure(line_buffering=BadIndex(),
- write_through=BadIndex())
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'replace')
- self.assertIs(txt.line_buffering, False)
- self.assertIs(txt.write_through, False)
-
- txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
- line_buffering=True, write_through=True)
- self.assertEqual(txt.encoding, 'latin1')
- self.assertEqual(txt.errors, 'ignore')
- self.assertIs(txt.line_buffering, True)
- self.assertIs(txt.write_through, True)
-
- def test_reconfigure_newline(self):
- raw = self.BytesIO(b'CR\rEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
- txt.reconfigure(newline=None)
- self.assertEqual(txt.readline(), 'CR\n')
- raw = self.BytesIO(b'CR\rEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
- txt.reconfigure(newline='')
- self.assertEqual(txt.readline(), 'CR\r')
- raw = self.BytesIO(b'CR\rLF\nEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
- txt.reconfigure(newline='\n')
- self.assertEqual(txt.readline(), 'CR\rLF\n')
- raw = self.BytesIO(b'LF\nCR\rEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
- txt.reconfigure(newline='\r')
- self.assertEqual(txt.readline(), 'LF\nCR\r')
- raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
- txt.reconfigure(newline='\r\n')
- self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
-
- txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
- txt.reconfigure(newline=None)
- txt.write('linesep\n')
- txt.reconfigure(newline='')
- txt.write('LF\n')
- txt.reconfigure(newline='\n')
- txt.write('LF\n')
- txt.reconfigure(newline='\r')
- txt.write('CR\n')
- txt.reconfigure(newline='\r\n')
- txt.write('CRLF\n')
- expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
- self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
-
- def test_issue25862(self):
- # Assertion failures occurred in tell() after read() and write().
- t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
- t.read(1)
- t.read()
- t.tell()
- t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
- t.read(1)
- t.write('x')
- t.tell()
-
- def test_issue35928(self):
- p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO())
- f = self.TextIOWrapper(p)
- res = f.readline()
- self.assertEqual(res, 'foo\n')
- f.write(res)
- self.assertEqual(res + f.readline(), 'foo\nbar\n')
-
- def test_pickling_subclass(self):
- global MyTextIO
- class MyTextIO(self.TextIOWrapper):
- def __init__(self, raw, tag):
- super().__init__(raw)
- self.tag = tag
- def __getstate__(self):
- return self.tag, self.buffer.getvalue()
- def __setstate__(slf, state):
- tag, value = state
- slf.__init__(self.BytesIO(value), tag)
-
- raw = self.BytesIO(b'data')
- txt = MyTextIO(raw, 'ham')
- for proto in range(pickle.HIGHEST_PROTOCOL + 1):
- with self.subTest(protocol=proto):
- pickled = pickle.dumps(txt, proto)
- newtxt = pickle.loads(pickled)
- self.assertEqual(newtxt.buffer.getvalue(), b'data')
- self.assertEqual(newtxt.tag, 'ham')
- del MyTextIO
-
- @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
- def test_read_non_blocking(self):
- import os
- r, w = os.pipe()
- try:
- os.set_blocking(r, False)
- with self.io.open(r, 'rt') as textfile:
- r = None
- # Nothing has been written so a non-blocking read raises a BlockingIOError exception.
- with self.assertRaises(BlockingIOError):
- textfile.read()
- finally:
- if r is not None:
- os.close(r)
- os.close(w)
-
-
-class MemviewBytesIO(io.BytesIO):
- '''A BytesIO object whose read method returns memoryviews
- rather than bytes'''
-
- def read1(self, len_):
- return _to_memoryview(super().read1(len_))
-
- def read(self, len_):
- return _to_memoryview(super().read(len_))
-
-def _to_memoryview(buf):
- '''Convert bytes-object *buf* to a non-trivial memoryview'''
-
- arr = array.array('i')
- idx = len(buf) - len(buf) % arr.itemsize
- arr.frombytes(buf[:idx])
- return memoryview(arr)
-
-
-class CTextIOWrapperTest(TextIOWrapperTest, CTestCase):
- shutdown_error = "LookupError: unknown encoding: ascii"
-
- def test_initialization(self):
- r = self.BytesIO(b"\xc3\xa9\n\n")
- b = self.BufferedReader(r, 1000)
- t = self.TextIOWrapper(b, encoding="utf-8")
- self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
- self.assertRaises(ValueError, t.read)
-
- t = self.TextIOWrapper.__new__(self.TextIOWrapper)
- self.assertRaises(Exception, repr, t)
-
- def test_garbage_collection(self):
- # C TextIOWrapper objects are collected, and collecting them flushes
- # all data to disk.
- # The Python version has __del__, so it ends in gc.garbage instead.
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", ResourceWarning)
- rawio = self.FileIO(os_helper.TESTFN, "wb")
- b = self.BufferedWriter(rawio)
- t = self.TextIOWrapper(b, encoding="ascii")
- t.write("456def")
- t.x = t
- wr = weakref.ref(t)
- del t
- support.gc_collect()
- self.assertIsNone(wr(), wr)
- with self.open(os_helper.TESTFN, "rb") as f:
- self.assertEqual(f.read(), b"456def")
-
- def test_rwpair_cleared_before_textio(self):
- # Issue 13070: TextIOWrapper's finalization would crash when called
- # after the reference to the underlying BufferedRWPair's writer got
- # cleared by the GC.
- for i in range(1000):
- b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
- t1 = self.TextIOWrapper(b1, encoding="ascii")
- b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
- t2 = self.TextIOWrapper(b2, encoding="ascii")
- # circular references
- t1.buddy = t2
- t2.buddy = t1
- support.gc_collect()
-
- def test_del__CHUNK_SIZE_SystemError(self):
- t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
- with self.assertRaises(AttributeError):
- del t._CHUNK_SIZE
-
- def test_internal_buffer_size(self):
- # bpo-43260: TextIOWrapper's internal buffer should not store
- # data larger than chunk size.
- chunk_size = 8192 # default chunk size, updated later
-
- class MockIO(self.MockRawIO):
- def write(self, data):
- if len(data) > chunk_size:
- raise RuntimeError
- return super().write(data)
-
- buf = MockIO()
- t = self.TextIOWrapper(buf, encoding="ascii")
- chunk_size = t._CHUNK_SIZE
- t.write("abc")
- t.write("def")
- # default chunk size is 8192 bytes so t don't write data to buf.
- self.assertEqual([], buf._write_stack)
-
- with self.assertRaises(RuntimeError):
- t.write("x"*(chunk_size+1))
-
- self.assertEqual([b"abcdef"], buf._write_stack)
- t.write("ghi")
- t.write("x"*chunk_size)
- self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
-
- def test_issue119506(self):
- chunk_size = 8192
-
- class MockIO(self.MockRawIO):
- written = False
- def write(self, data):
- if not self.written:
- self.written = True
- t.write("middle")
- return super().write(data)
-
- buf = MockIO()
- t = self.TextIOWrapper(buf)
- t.write("abc")
- t.write("def")
- # writing data which size >= chunk_size cause flushing buffer before write.
- t.write("g" * chunk_size)
- t.flush()
-
- self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size],
- buf._write_stack)
-
-
-class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase):
- shutdown_error = "LookupError: unknown encoding: ascii"
-
-
-class IncrementalNewlineDecoderTest:
-
- def check_newline_decoding_utf8(self, decoder):
- # UTF-8 specific tests for a newline decoder
- def _check_decode(b, s, **kwargs):
- # We exercise getstate() / setstate() as well as decode()
- state = decoder.getstate()
- self.assertEqual(decoder.decode(b, **kwargs), s)
- decoder.setstate(state)
- self.assertEqual(decoder.decode(b, **kwargs), s)
-
- _check_decode(b'\xe8\xa2\x88', "\u8888")
-
- _check_decode(b'\xe8', "")
- _check_decode(b'\xa2', "")
- _check_decode(b'\x88', "\u8888")
-
- _check_decode(b'\xe8', "")
- _check_decode(b'\xa2', "")
- _check_decode(b'\x88', "\u8888")
-
- _check_decode(b'\xe8', "")
- self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
-
- decoder.reset()
- _check_decode(b'\n', "\n")
- _check_decode(b'\r', "")
- _check_decode(b'', "\n", final=True)
- _check_decode(b'\r', "\n", final=True)
-
- _check_decode(b'\r', "")
- _check_decode(b'a', "\na")
-
- _check_decode(b'\r\r\n', "\n\n")
- _check_decode(b'\r', "")
- _check_decode(b'\r', "\n")
- _check_decode(b'\na', "\na")
-
- _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
- _check_decode(b'\xe8\xa2\x88', "\u8888")
- _check_decode(b'\n', "\n")
- _check_decode(b'\xe8\xa2\x88\r', "\u8888")
- _check_decode(b'\n', "\n")
-
- def check_newline_decoding(self, decoder, encoding):
- result = []
- if encoding is not None:
- encoder = codecs.getincrementalencoder(encoding)()
- def _decode_bytewise(s):
- # Decode one byte at a time
- for b in encoder.encode(s):
- result.append(decoder.decode(bytes([b])))
- else:
- encoder = None
- def _decode_bytewise(s):
- # Decode one char at a time
- for c in s:
- result.append(decoder.decode(c))
- self.assertEqual(decoder.newlines, None)
- _decode_bytewise("abc\n\r")
- self.assertEqual(decoder.newlines, '\n')
- _decode_bytewise("\nabc")
- self.assertEqual(decoder.newlines, ('\n', '\r\n'))
- _decode_bytewise("abc\r")
- self.assertEqual(decoder.newlines, ('\n', '\r\n'))
- _decode_bytewise("abc")
- self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
- _decode_bytewise("abc\r")
- self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
- decoder.reset()
- input = "abc"
- if encoder is not None:
- encoder.reset()
- input = encoder.encode(input)
- self.assertEqual(decoder.decode(input), "abc")
- self.assertEqual(decoder.newlines, None)
-
- def test_newline_decoder(self):
- encodings = (
- # None meaning the IncrementalNewlineDecoder takes unicode input
- # rather than bytes input
- None, 'utf-8', 'latin-1',
- 'utf-16', 'utf-16-le', 'utf-16-be',
- 'utf-32', 'utf-32-le', 'utf-32-be',
- )
- for enc in encodings:
- decoder = enc and codecs.getincrementaldecoder(enc)()
- decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
- self.check_newline_decoding(decoder, enc)
- decoder = codecs.getincrementaldecoder("utf-8")()
- decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
- self.check_newline_decoding_utf8(decoder)
- self.assertRaises(TypeError, decoder.setstate, 42)
-
- def test_newline_bytes(self):
- # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
- def _check(dec):
- self.assertEqual(dec.newlines, None)
- self.assertEqual(dec.decode("\u0D00"), "\u0D00")
- self.assertEqual(dec.newlines, None)
- self.assertEqual(dec.decode("\u0A00"), "\u0A00")
- self.assertEqual(dec.newlines, None)
- dec = self.IncrementalNewlineDecoder(None, translate=False)
- _check(dec)
- dec = self.IncrementalNewlineDecoder(None, translate=True)
- _check(dec)
-
- def test_translate(self):
- # issue 35062
- for translate in (-2, -1, 1, 2):
- decoder = codecs.getincrementaldecoder("utf-8")()
- decoder = self.IncrementalNewlineDecoder(decoder, translate)
- self.check_newline_decoding_utf8(decoder)
- decoder = codecs.getincrementaldecoder("utf-8")()
- decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
- self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
-
-class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
- IncrementalNewlineDecoder = io.IncrementalNewlineDecoder
-
- @support.cpython_only
- def test_uninitialized(self):
- uninitialized = self.IncrementalNewlineDecoder.__new__(
- self.IncrementalNewlineDecoder)
- self.assertRaises(ValueError, uninitialized.decode, b'bar')
- self.assertRaises(ValueError, uninitialized.getstate)
- self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
- self.assertRaises(ValueError, uninitialized.reset)
-
-
-class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
- IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder
-
-
# XXX Tests for open()
class MiscIOTest:
diff --git a/Lib/test/test_io/test_textio.py b/Lib/test/test_io/test_textio.py
new file mode 100644
index 00000000000000..d8d0928b4ba69b
--- /dev/null
+++ b/Lib/test/test_io/test_textio.py
@@ -0,0 +1,1664 @@
+import array
+import codecs
+import locale
+import os
+import pickle
+import sys
+import threading
+import time
+import unittest
+import warnings
+import weakref
+from collections import UserList
+from test import support
+from test.support import os_helper, threading_helper
+from test.support.script_helper import assert_python_ok
+from .utils import CTestCase, PyTestCase
+
+import io # C implementation of io
+import _pyio as pyio # Python implementation of io
+
+
+def _default_chunk_size():
+ """Get the default TextIOWrapper chunk size"""
+ with open(__file__, "r", encoding="latin-1") as f:
+ return f._CHUNK_SIZE
+
+
+class BadIndex:
+ def __index__(self):
+ 1/0
+
+
+# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
+# properties:
+# - A single output character can correspond to many bytes of input.
+# - The number of input bytes to complete the character can be
+# undetermined until the last input byte is received.
+# - The number of input bytes can vary depending on previous input.
+# - A single input byte can correspond to many characters of output.
+# - The number of output characters can be undetermined until the
+# last input byte is received.
+# - The number of output characters can vary depending on previous input.
+
+class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
+ """
+ For testing seek/tell behavior with a stateful, buffering decoder.
+
+ Input is a sequence of words. Words may be fixed-length (length set
+ by input) or variable-length (period-terminated). In variable-length
+ mode, extra periods are ignored. Possible words are:
+ - 'i' followed by a number sets the input length, I (maximum 99).
+ When I is set to 0, words are space-terminated.
+ - 'o' followed by a number sets the output length, O (maximum 99).
+ - Any other word is converted into a word followed by a period on
+ the output. The output word consists of the input word truncated
+ or padded out with hyphens to make its length equal to O. If O
+ is 0, the word is output verbatim without truncating or padding.
+ I and O are initially set to 1. When I changes, any buffered input is
+ re-scanned according to the new I. EOF also terminates the last word.
+ """
+
+ def __init__(self, errors='strict'):
+ codecs.IncrementalDecoder.__init__(self, errors)
+ self.reset()
+
+ def __repr__(self):
+ return '<SID %x>' % id(self)
+
+ def reset(self):
+ self.i = 1
+ self.o = 1
+ self.buffer = bytearray()
+
+ def getstate(self):
+ i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
+ return bytes(self.buffer), i*100 + o
+
+ def setstate(self, state):
+ buffer, io = state
+ self.buffer = bytearray(buffer)
+ i, o = divmod(io, 100)
+ self.i, self.o = i ^ 1, o ^ 1
+
+ def decode(self, input, final=False):
+ output = ''
+ for b in input:
+ if self.i == 0: # variable-length, terminated with period
+ if b == ord('.'):
+ if self.buffer:
+ output += self.process_word()
+ else:
+ self.buffer.append(b)
+ else: # fixed-length, terminate after self.i bytes
+ self.buffer.append(b)
+ if len(self.buffer) == self.i:
+ output += self.process_word()
+ if final and self.buffer: # EOF terminates the last word
+ output += self.process_word()
+ return output
+
+ def process_word(self):
+ output = ''
+ if self.buffer[0] == ord('i'):
+ self.i = min(99, int(self.buffer[1:] or 0)) # set input length
+ elif self.buffer[0] == ord('o'):
+ self.o = min(99, int(self.buffer[1:] or 0)) # set output length
+ else:
+ output = self.buffer.decode('ascii')
+ if len(output) < self.o:
+ output += '-'*self.o # pad out with hyphens
+ if self.o:
+ output = output[:self.o] # truncate to output length
+ output += '.'
+ self.buffer = bytearray()
+ return output
+
+ codecEnabled = False
+
+
+# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
+# when registering codecs and cleanup functions.
+def lookupTestDecoder(name):
+ if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
+ latin1 = codecs.lookup('latin-1')
+ return codecs.CodecInfo(
+ name='test_decoder', encode=latin1.encode, decode=None,
+ incrementalencoder=None,
+ streamreader=None, streamwriter=None,
+ incrementaldecoder=StatefulIncrementalDecoder)
+
+
+class StatefulIncrementalDecoderTest(unittest.TestCase):
+ """
+ Make sure the StatefulIncrementalDecoder actually works.
+ """
+
+ test_cases = [
+ # I=1, O=1 (fixed-length input == fixed-length output)
+ (b'abcd', False, 'a.b.c.d.'),
+ # I=0, O=0 (variable-length input, variable-length output)
+ (b'oiabcd', True, 'abcd.'),
+ # I=0, O=0 (should ignore extra periods)
+ (b'oi...abcd...', True, 'abcd.'),
+ # I=0, O=6 (variable-length input, fixed-length output)
+ (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
+ # I=2, O=6 (fixed-length input < fixed-length output)
+ (b'i.i2.o6xyz', True, 'xy----.z-----.'),
+ # I=6, O=3 (fixed-length input > fixed-length output)
+ (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
+ # I=0, then 3; O=29, then 15 (with longer output)
+ (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
+ 'a----------------------------.' +
+ 'b----------------------------.' +
+ 'cde--------------------------.' +
+ 'abcdefghijabcde.' +
+ 'a.b------------.' +
+ '.c.------------.' +
+ 'd.e------------.' +
+ 'k--------------.' +
+ 'l--------------.' +
+ 'm--------------.')
+ ]
+
+ def test_decoder(self):
+ # Try a few one-shot test cases.
+ for input, eof, output in self.test_cases:
+ d = StatefulIncrementalDecoder()
+ self.assertEqual(d.decode(input, eof), output)
+
+ # Also test an unfinished decode, followed by forcing EOF.
+ d = StatefulIncrementalDecoder()
+ self.assertEqual(d.decode(b'oiabcd'), '')
+ self.assertEqual(d.decode(b'', 1), 'abcd.')
+
+class TextIOWrapperTest:
+
+ def setUp(self):
+ self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
+ self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
+ os_helper.unlink(os_helper.TESTFN)
+ codecs.register(lookupTestDecoder)
+ self.addCleanup(codecs.unregister, lookupTestDecoder)
+
+ def tearDown(self):
+ os_helper.unlink(os_helper.TESTFN)
+
+ def test_constructor(self):
+ r = self.BytesIO(b"\xc3\xa9\n\n")
+ b = self.BufferedReader(r, 1000)
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ t.__init__(b, encoding="latin-1", newline="\r\n")
+ self.assertEqual(t.encoding, "latin-1")
+ self.assertEqual(t.line_buffering, False)
+ t.__init__(b, encoding="utf-8", line_buffering=True)
+ self.assertEqual(t.encoding, "utf-8")
+ self.assertEqual(t.line_buffering, True)
+ self.assertEqual("\xe9\n", t.readline())
+ invalid_type = TypeError if self.is_C else ValueError
+ with self.assertRaises(invalid_type):
+ t.__init__(b, encoding=42)
+ with self.assertRaises(UnicodeEncodeError):
+ t.__init__(b, encoding='\udcfe')
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding='utf-8\0')
+ with self.assertRaises(invalid_type):
+ t.__init__(b, encoding="utf-8", errors=42)
+ if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
+ with self.assertRaises(UnicodeEncodeError):
+ t.__init__(b, encoding="utf-8", errors='\udcfe')
+ if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", errors='replace\0')
+ with self.assertRaises(TypeError):
+ t.__init__(b, encoding="utf-8", newline=42)
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", newline='\udcfe')
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", newline='\n\0')
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", newline='xyzzy')
+
+ def test_uninitialized(self):
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ del t
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ self.assertRaises(Exception, repr, t)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ t.read, 0)
+ t.__init__(self.MockRawIO(), encoding="utf-8")
+ self.assertEqual(t.read(0), '')
+
+ def test_non_text_encoding_codecs_are_rejected(self):
+ # Ensure the constructor complains if passed a codec that isn't
+ # marked as a text encoding
+ # http://bugs.python.org/issue20404
+ r = self.BytesIO()
+ b = self.BufferedWriter(r)
+ with self.assertRaisesRegex(LookupError, "is not a text encoding"):
+ self.TextIOWrapper(b, encoding="hex")
+
+ def test_detach(self):
+ r = self.BytesIO()
+ b = self.BufferedWriter(r)
+ t = self.TextIOWrapper(b, encoding="ascii")
+ self.assertIs(t.detach(), b)
+
+ t = self.TextIOWrapper(b, encoding="ascii")
+ t.write("howdy")
+ self.assertFalse(r.getvalue())
+ t.detach()
+ self.assertEqual(r.getvalue(), b"howdy")
+ self.assertRaises(ValueError, t.detach)
+
+ # Operations independent of the detached stream should still work
+ repr(t)
+ self.assertEqual(t.encoding, "ascii")
+ self.assertEqual(t.errors, "strict")
+ self.assertFalse(t.line_buffering)
+ self.assertFalse(t.write_through)
+
+ def test_repr(self):
+ raw = self.BytesIO("hello".encode("utf-8"))
+ b = self.BufferedReader(raw)
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ modname = self.TextIOWrapper.__module__
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
+ raw.name = "dummy"
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
+ t.mode = "r"
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
+ raw.name = b"dummy"
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
+
+ t.buffer.detach()
+ repr(t) # Should not raise an exception
+
+ def test_recursive_repr(self):
+ # Issue #25455
+ raw = self.BytesIO()
+ t = self.TextIOWrapper(raw, encoding="utf-8")
+ with support.swap_attr(raw, 'name', t), support.infinite_recursion(25):
+ with self.assertRaises(RuntimeError):
+ repr(t) # Should not crash
+
+ def test_subclass_repr(self):
+ class TestSubclass(self.TextIOWrapper):
+ pass
+
+ f = TestSubclass(self.StringIO())
+ self.assertIn(TestSubclass.__name__, repr(f))
+
+ def test_line_buffering(self):
+ r = self.BytesIO()
+ b = self.BufferedWriter(r, 1000)
+ t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
+ t.write("X")
+ self.assertEqual(r.getvalue(), b"") # No flush happened
+ t.write("Y\nZ")
+ self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
+ t.write("A\rB")
+ self.assertEqual(r.getvalue(), b"XY\nZA\rB")
+
+ def test_reconfigure_line_buffering(self):
+ r = self.BytesIO()
+ b = self.BufferedWriter(r, 1000)
+ t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
+ t.write("AB\nC")
+ self.assertEqual(r.getvalue(), b"")
+
+ t.reconfigure(line_buffering=True) # implicit flush
+ self.assertEqual(r.getvalue(), b"AB\nC")
+ t.write("DEF\nG")
+ self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
+ t.write("H")
+ self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
+ t.reconfigure(line_buffering=False) # implicit flush
+ self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
+ t.write("IJ")
+ self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
+
+ # Keeping default value
+ t.reconfigure()
+ t.reconfigure(line_buffering=None)
+ self.assertEqual(t.line_buffering, False)
+ t.reconfigure(line_buffering=True)
+ t.reconfigure()
+ t.reconfigure(line_buffering=None)
+ self.assertEqual(t.line_buffering, True)
+
+ @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
+ def test_default_encoding(self):
+ with os_helper.EnvironmentVarGuard() as env:
+ # try to get a user preferred encoding different than the current
+ # locale encoding to check that TextIOWrapper() uses the current
+ # locale encoding and not the user preferred encoding
+ env.unset('LC_ALL', 'LANG', 'LC_CTYPE')
+
+ current_locale_encoding = locale.getencoding()
+ b = self.BytesIO()
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", EncodingWarning)
+ t = self.TextIOWrapper(b)
+ self.assertEqual(t.encoding, current_locale_encoding)
+
+ def test_encoding(self):
+ # Check the encoding attribute is always set, and valid
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ self.assertEqual(t.encoding, "utf-8")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", EncodingWarning)
+ t = self.TextIOWrapper(b)
+ self.assertIsNotNone(t.encoding)
+ codecs.lookup(t.encoding)
+
+ def test_encoding_errors_reading(self):
+ # (1) default
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii")
+ self.assertRaises(UnicodeError, t.read)
+ # (2) explicit strict
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
+ self.assertRaises(UnicodeError, t.read)
+ # (3) ignore
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
+ self.assertEqual(t.read(), "abc\n\n")
+ # (4) replace
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
+ self.assertEqual(t.read(), "abc\n\ufffd\n")
+
+ def test_encoding_errors_writing(self):
+ # (1) default
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b, encoding="ascii")
+ self.assertRaises(UnicodeError, t.write, "\xff")
+ # (2) explicit strict
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
+ self.assertRaises(UnicodeError, t.write, "\xff")
+ # (3) ignore
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
+ newline="\n")
+ t.write("abc\xffdef\n")
+ t.flush()
+ self.assertEqual(b.getvalue(), b"abcdef\n")
+ # (4) replace
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
+ newline="\n")
+ t.write("abc\xffdef\n")
+ t.flush()
+ self.assertEqual(b.getvalue(), b"abc?def\n")
+
+ def test_newlines(self):
+ input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
+
+ tests = [
+ [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
+ [ '', input_lines ],
+ [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
+ [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
+ [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
+ ]
+ encodings = (
+ 'utf-8', 'latin-1',
+ 'utf-16', 'utf-16-le', 'utf-16-be',
+ 'utf-32', 'utf-32-le', 'utf-32-be',
+ )
+
+ # Try a range of buffer sizes to test the case where \r is the last
+ # character in TextIOWrapper._pending_line.
+ for encoding in encodings:
+ # XXX: str.encode() should return bytes
+ data = bytes(''.join(input_lines).encode(encoding))
+ for do_reads in (False, True):
+ for bufsize in range(1, 10):
+ for newline, exp_lines in tests:
+ bufio = self.BufferedReader(self.BytesIO(data), bufsize)
+ textio = self.TextIOWrapper(bufio, newline=newline,
+ encoding=encoding)
+ if do_reads:
+ got_lines = []
+ while True:
+ c2 = textio.read(2)
+ if c2 == '':
+ break
+ self.assertEqual(len(c2), 2)
+ got_lines.append(c2 + textio.readline())
+ else:
+ got_lines = list(textio)
+
+ for got_line, exp_line in zip(got_lines, exp_lines):
+ self.assertEqual(got_line, exp_line)
+ self.assertEqual(len(got_lines), len(exp_lines))
+
+ def test_newlines_input(self):
+ testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+ normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+ for newline, expected in [
+ (None, normalized.decode("ascii").splitlines(keepends=True)),
+ ("", testdata.decode("ascii").splitlines(keepends=True)),
+ ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+ ]:
+ buf = self.BytesIO(testdata)
+ txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
+ self.assertEqual(txt.readlines(), expected)
+ txt.seek(0)
+ self.assertEqual(txt.read(), "".join(expected))
+
+ def test_newlines_output(self):
+ testdict = {
+ "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+ "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+ "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
+ "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
+ }
+ tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
+ for newline, expected in tests:
+ buf = self.BytesIO()
+ txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
+ txt.write("AAA\nB")
+ txt.write("BB\nCCC\n")
+ txt.write("X\rY\r\nZ")
+ txt.flush()
+ self.assertEqual(buf.closed, False)
+ self.assertEqual(buf.getvalue(), expected)
+
+ def test_destructor(self):
+ l = []
+ base = self.BytesIO
+ class MyBytesIO(base):
+ def close(self):
+ l.append(self.getvalue())
+ base.close(self)
+ b = MyBytesIO()
+ t = self.TextIOWrapper(b, encoding="ascii")
+ t.write("abc")
+ del t
+ support.gc_collect()
+ self.assertEqual([b"abc"], l)
+
+ def test_override_destructor(self):
+ record = []
+ class MyTextIO(self.TextIOWrapper):
+ def __del__(self):
+ record.append(1)
+ try:
+ f = super().__del__
+ except AttributeError:
+ pass
+ else:
+ f()
+ def close(self):
+ record.append(2)
+ super().close()
+ def flush(self):
+ record.append(3)
+ super().flush()
+ b = self.BytesIO()
+ t = MyTextIO(b, encoding="ascii")
+ del t
+ support.gc_collect()
+ self.assertEqual(record, [1, 2, 3])
+
+ def test_error_through_destructor(self):
+ # Test that the exception state is not modified by a destructor,
+ # even if close() fails.
+ rawio = self.CloseFailureIO()
+ with support.catch_unraisable_exception() as cm:
+ with self.assertRaises(AttributeError):
+ self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
+
+ self.assertEqual(cm.unraisable.exc_type, OSError)
+
+ # Systematic tests of the text I/O API
+
+ def test_basic_io(self):
+ for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
+ for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
+ f = self.open(os_helper.TESTFN, "w+", encoding=enc)
+ f._CHUNK_SIZE = chunksize
+ self.assertEqual(f.write("abc"), 3)
+ f.close()
+ f = self.open(os_helper.TESTFN, "r+", encoding=enc)
+ f._CHUNK_SIZE = chunksize
+ self.assertEqual(f.tell(), 0)
+ self.assertEqual(f.read(), "abc")
+ cookie = f.tell()
+ self.assertEqual(f.seek(0), 0)
+ self.assertEqual(f.read(None), "abc")
+ f.seek(0)
+ self.assertEqual(f.read(2), "ab")
+ self.assertEqual(f.read(1), "c")
+ self.assertEqual(f.read(1), "")
+ self.assertEqual(f.read(), "")
+ self.assertEqual(f.tell(), cookie)
+ self.assertEqual(f.seek(0), 0)
+ self.assertEqual(f.seek(0, 2), cookie)
+ self.assertEqual(f.write("def"), 3)
+ self.assertEqual(f.seek(cookie), cookie)
+ self.assertEqual(f.read(), "def")
+ if enc.startswith("utf"):
+ self.multi_line_test(f, enc)
+ f.close()
+
+ def multi_line_test(self, f, enc):
+ f.seek(0)
+ f.truncate()
+ sample = "s\xff\u0fff\uffff"
+ wlines = []
+ for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
+ chars = []
+ for i in range(size):
+ chars.append(sample[i % len(sample)])
+ line = "".join(chars) + "\n"
+ wlines.append((f.tell(), line))
+ f.write(line)
+ f.seek(0)
+ rlines = []
+ while True:
+ pos = f.tell()
+ line = f.readline()
+ if not line:
+ break
+ rlines.append((pos, line))
+ self.assertEqual(rlines, wlines)
+
+ def test_telling(self):
+ f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
+ p0 = f.tell()
+ f.write("\xff\n")
+ p1 = f.tell()
+ f.write("\xff\n")
+ p2 = f.tell()
+ f.seek(0)
+ self.assertEqual(f.tell(), p0)
+ self.assertEqual(f.readline(), "\xff\n")
+ self.assertEqual(f.tell(), p1)
+ self.assertEqual(f.readline(), "\xff\n")
+ self.assertEqual(f.tell(), p2)
+ f.seek(0)
+ for line in f:
+ self.assertEqual(line, "\xff\n")
+ self.assertRaises(OSError, f.tell)
+ self.assertEqual(f.tell(), p2)
+ f.close()
+
+ def test_seeking(self):
+ chunk_size = _default_chunk_size()
+ prefix_size = chunk_size - 2
+ u_prefix = "a" * prefix_size
+ prefix = bytes(u_prefix.encode("utf-8"))
+ self.assertEqual(len(u_prefix), len(prefix))
+ u_suffix = "\u8888\n"
+ suffix = bytes(u_suffix.encode("utf-8"))
+ line = prefix + suffix
+ with self.open(os_helper.TESTFN, "wb") as f:
+ f.write(line*2)
+ with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
+ s = f.read(prefix_size)
+ self.assertEqual(s, str(prefix, "ascii"))
+ self.assertEqual(f.tell(), prefix_size)
+ self.assertEqual(f.readline(), u_suffix)
+
+ def test_seeking_too(self):
+ # Regression test for a specific bug
+ data = b'\xe0\xbf\xbf\n'
+ with self.open(os_helper.TESTFN, "wb") as f:
+ f.write(data)
+ with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
+ f._CHUNK_SIZE # Just test that it exists
+ f._CHUNK_SIZE = 2
+ f.readline()
+ f.tell()
+
+ def test_seek_and_tell(self):
+ #Test seek/tell using the StatefulIncrementalDecoder.
+ # Make test faster by doing smaller seeks
+ CHUNK_SIZE = 128
+
+ def test_seek_and_tell_with_data(data, min_pos=0):
+ """Tell/seek to various points within a data stream and ensure
+ that the decoded data returned by read() is consistent."""
+ f = self.open(os_helper.TESTFN, 'wb')
+ f.write(data)
+ f.close()
+ f = self.open(os_helper.TESTFN, encoding='test_decoder')
+ f._CHUNK_SIZE = CHUNK_SIZE
+ decoded = f.read()
+ f.close()
+
+ for i in range(min_pos, len(decoded) + 1): # seek positions
+ for j in [1, 5, len(decoded) - i]: # read lengths
+ f = self.open(os_helper.TESTFN, encoding='test_decoder')
+ self.assertEqual(f.read(i), decoded[:i])
+ cookie = f.tell()
+ self.assertEqual(f.read(j), decoded[i:i + j])
+ f.seek(cookie)
+ self.assertEqual(f.read(), decoded[i:])
+ f.close()
+
+ # Enable the test decoder.
+ StatefulIncrementalDecoder.codecEnabled = 1
+
+ # Run the tests.
+ try:
+ # Try each test case.
+ for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
+ test_seek_and_tell_with_data(input)
+
+ # Position each test case so that it crosses a chunk boundary.
+ for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
+ offset = CHUNK_SIZE - len(input)//2
+ prefix = b'.'*offset
+ # Don't bother seeking into the prefix (takes too long).
+ min_pos = offset*2
+ test_seek_and_tell_with_data(prefix + input, min_pos)
+
+ # Ensure our test decoder won't interfere with subsequent tests.
+ finally:
+ StatefulIncrementalDecoder.codecEnabled = 0
+
+ def test_multibyte_seek_and_tell(self):
+ f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
+ f.write("AB\n\u3046\u3048\n")
+ f.close()
+
+ f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
+ self.assertEqual(f.readline(), "AB\n")
+ p0 = f.tell()
+ self.assertEqual(f.readline(), "\u3046\u3048\n")
+ p1 = f.tell()
+ f.seek(p0)
+ self.assertEqual(f.readline(), "\u3046\u3048\n")
+ self.assertEqual(f.tell(), p1)
+ f.close()
+
+ def test_seek_with_encoder_state(self):
+ f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
+ f.write("\u00e6\u0300")
+ p0 = f.tell()
+ f.write("\u00e6")
+ f.seek(p0)
+ f.write("\u0300")
+ f.close()
+
+ f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
+ self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
+ f.close()
+
+ def test_encoded_writes(self):
+ data = "1234567890"
+ tests = ("utf-16",
+ "utf-16-le",
+ "utf-16-be",
+ "utf-32",
+ "utf-32-le",
+ "utf-32-be")
+ for encoding in tests:
+ buf = self.BytesIO()
+ f = self.TextIOWrapper(buf, encoding=encoding)
+ # Check if the BOM is written only once (see issue1753).
+ f.write(data)
+ f.write(data)
+ f.seek(0)
+ self.assertEqual(f.read(), data * 2)
+ f.seek(0)
+ self.assertEqual(f.read(), data * 2)
+ self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
+
+ def test_unreadable(self):
+ class UnReadable(self.BytesIO):
+ def readable(self):
+ return False
+ txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
+ self.assertRaises(OSError, txt.read)
+
+ def test_read_one_by_one(self):
+ txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
+ reads = ""
+ while True:
+ c = txt.read(1)
+ if not c:
+ break
+ reads += c
+ self.assertEqual(reads, "AA\nBB")
+
+ def test_readlines(self):
+ txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
+ self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
+ txt.seek(0)
+ self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
+ txt.seek(0)
+ self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
+
+ # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
+ def test_read_by_chunk(self):
+ # make sure "\r\n" straddles 128 char boundary.
+ txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
+ reads = ""
+ while True:
+ c = txt.read(128)
+ if not c:
+ break
+ reads += c
+ self.assertEqual(reads, "A"*127+"\nB")
+
+ def test_writelines(self):
+ l = ['ab', 'cd', 'ef']
+ buf = self.BytesIO()
+ txt = self.TextIOWrapper(buf, encoding="utf-8")
+ txt.writelines(l)
+ txt.flush()
+ self.assertEqual(buf.getvalue(), b'abcdef')
+
+ def test_writelines_userlist(self):
+ l = UserList(['ab', 'cd', 'ef'])
+ buf = self.BytesIO()
+ txt = self.TextIOWrapper(buf, encoding="utf-8")
+ txt.writelines(l)
+ txt.flush()
+ self.assertEqual(buf.getvalue(), b'abcdef')
+
+ def test_writelines_error(self):
+ txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
+ self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
+ self.assertRaises(TypeError, txt.writelines, None)
+ self.assertRaises(TypeError, txt.writelines, b'abc')
+
+ def test_issue1395_1(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+
+ # read one char at a time
+ reads = ""
+ while True:
+ c = txt.read(1)
+ if not c:
+ break
+ reads += c
+ self.assertEqual(reads, self.normalized)
+
+ def test_issue1395_2(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = ""
+ while True:
+ c = txt.read(4)
+ if not c:
+ break
+ reads += c
+ self.assertEqual(reads, self.normalized)
+
+ def test_issue1395_3(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read(4)
+ reads += txt.readline()
+ reads += txt.readline()
+ reads += txt.readline()
+ self.assertEqual(reads, self.normalized)
+
+ def test_issue1395_4(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read()
+ self.assertEqual(reads, self.normalized)
+
+ def test_issue1395_5(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ pos = txt.tell()
+ txt.seek(0)
+ txt.seek(pos)
+ self.assertEqual(txt.read(4), "BBB\n")
+
+ def test_issue2282(self):
+ buffer = self.BytesIO(self.testdata)
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+
+ self.assertEqual(buffer.seekable(), txt.seekable())
+
+ def test_append_bom(self):
+ # The BOM is not written again when appending to a non-empty file
+ filename = os_helper.TESTFN
+ for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+ with self.open(filename, 'w', encoding=charset) as f:
+ f.write('aaa')
+ pos = f.tell()
+ with self.open(filename, 'rb') as f:
+ self.assertEqual(f.read(), 'aaa'.encode(charset))
+
+ with self.open(filename, 'a', encoding=charset) as f:
+ f.write('xxx')
+ with self.open(filename, 'rb') as f:
+ self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
+
+ def test_seek_bom(self):
+ # Same test, but when seeking manually
+ filename = os_helper.TESTFN
+ for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+ with self.open(filename, 'w', encoding=charset) as f:
+ f.write('aaa')
+ pos = f.tell()
+ with self.open(filename, 'r+', encoding=charset) as f:
+ f.seek(pos)
+ f.write('zzz')
+ f.seek(0)
+ f.write('bbb')
+ with self.open(filename, 'rb') as f:
+ self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
+
+ def test_seek_append_bom(self):
+ # Same test, but first seek to the start and then to the end
+ filename = os_helper.TESTFN
+ for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+ with self.open(filename, 'w', encoding=charset) as f:
+ f.write('aaa')
+ with self.open(filename, 'a', encoding=charset) as f:
+ f.seek(0)
+ f.seek(0, self.SEEK_END)
+ f.write('xxx')
+ with self.open(filename, 'rb') as f:
+ self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
+
+ def test_errors_property(self):
+ with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
+ self.assertEqual(f.errors, "strict")
+ with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
+ self.assertEqual(f.errors, "replace")
+
+ @support.no_tracing
+ @threading_helper.requires_working_threading()
+ def test_threads_write(self):
+ # Issue6750: concurrent writes could duplicate data
+ event = threading.Event()
+ with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
+ def run(n):
+ text = "Thread%03d\n" % n
+ event.wait()
+ f.write(text)
+ threads = [threading.Thread(target=run, args=(x,))
+ for x in range(20)]
+ with threading_helper.start_threads(threads, event.set):
+ time.sleep(0.02)
+ with self.open(os_helper.TESTFN, encoding="utf-8") as f:
+ content = f.read()
+ for n in range(20):
+ self.assertEqual(content.count("Thread%03d\n" % n), 1)
+
+ def test_flush_error_on_close(self):
+ # Test that text file is closed despite failed flush
+ # and that flush() is called before file closed.
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ closed = []
+ def bad_flush():
+ closed[:] = [txt.closed, txt.buffer.closed]
+ raise OSError()
+ txt.flush = bad_flush
+ self.assertRaises(OSError, txt.close) # exception not swallowed
+ self.assertTrue(txt.closed)
+ self.assertTrue(txt.buffer.closed)
+ self.assertTrue(closed) # flush() called
+ self.assertFalse(closed[0]) # flush() called before file closed
+ self.assertFalse(closed[1])
+ txt.flush = lambda: None # break reference loop
+
+ def test_close_error_on_close(self):
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise OSError('flush')
+ def bad_close():
+ raise OSError('close')
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(OSError) as err: # exception not swallowed
+ txt.close()
+ self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
+ self.assertEqual(err.exception.__context__.args, ('flush',))
+ self.assertFalse(txt.closed)
+
+ # Silence destructor error
+ buffer.close = lambda: None
+ txt.flush = lambda: None
+
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ txt.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(txt.closed)
+
+ # Silence destructor error
+ buffer.close = lambda: None
+ txt.flush = lambda: None
+
+ def test_multi_close(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt.close()
+ txt.close()
+ txt.close()
+ self.assertRaises(ValueError, txt.flush)
+
+ def test_unseekable(self):
+ txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
+ self.assertRaises(self.UnsupportedOperation, txt.tell)
+ self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
+
+ def test_readonly_attributes(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ buf = self.BytesIO(self.testdata)
+ with self.assertRaises(AttributeError):
+ txt.buffer = buf
+
+ def test_rawio(self):
+ # Issue #12591: TextIOWrapper must work with raw I/O objects, so
+ # that subprocess.Popen() can have the required unbuffered
+ # semantics with universal_newlines=True.
+ raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ # Reads
+ self.assertEqual(txt.read(4), 'abcd')
+ self.assertEqual(txt.readline(), 'efghi\n')
+ self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
+
+ def test_rawio_write_through(self):
+ # Issue #12591: with write_through=True, writes don't need a flush
+ raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
+ write_through=True)
+ txt.write('1')
+ txt.write('23\n4')
+ txt.write('5')
+ self.assertEqual(b''.join(raw._write_stack), b'123\n45')
+
+ def test_bufio_write_through(self):
+ # Issue #21396: write_through=True doesn't force a flush()
+ # on the underlying binary buffered object.
+ flush_called, write_called = [], []
+ class BufferedWriter(self.BufferedWriter):
+ def flush(self, *args, **kwargs):
+ flush_called.append(True)
+ return super().flush(*args, **kwargs)
+ def write(self, *args, **kwargs):
+ write_called.append(True)
+ return super().write(*args, **kwargs)
+
+ rawio = self.BytesIO()
+ data = b"a"
+ bufio = BufferedWriter(rawio, len(data)*2)
+ textio = self.TextIOWrapper(bufio, encoding='ascii',
+ write_through=True)
+ # write to the buffered io but don't overflow the buffer
+ text = data.decode('ascii')
+ textio.write(text)
+
+ # buffer.flush is not called with write_through=True
+ self.assertFalse(flush_called)
+ # buffer.write *is* called with write_through=True
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), b"") # no flush
+
+ write_called = [] # reset
+ textio.write(text * 10) # total content is larger than bufio buffer
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), data * 11) # all flushed
+
+ def test_reconfigure_write_through(self):
+ raw = self.MockRawIO([])
+ t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ t.write('1')
+ t.reconfigure(write_through=True) # implied flush
+ self.assertEqual(t.write_through, True)
+ self.assertEqual(b''.join(raw._write_stack), b'1')
+ t.write('23')
+ self.assertEqual(b''.join(raw._write_stack), b'123')
+ t.reconfigure(write_through=False)
+ self.assertEqual(t.write_through, False)
+ t.write('45')
+ t.flush()
+ self.assertEqual(b''.join(raw._write_stack), b'12345')
+ # Keeping default value
+ t.reconfigure()
+ t.reconfigure(write_through=None)
+ self.assertEqual(t.write_through, False)
+ t.reconfigure(write_through=True)
+ t.reconfigure()
+ t.reconfigure(write_through=None)
+ self.assertEqual(t.write_through, True)
+
+ def test_read_nonbytes(self):
+ # Issue #17106
+ # Crash when underlying read() returns non-bytes
+ t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+ self.assertRaises(TypeError, t.read, 1)
+ t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+ self.assertRaises(TypeError, t.readline)
+ t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+ self.assertRaises(TypeError, t.read)
+
+ def test_illegal_encoder(self):
+ # Issue 31271: Calling write() while the return value of encoder's
+ # encode() is invalid shouldn't cause an assertion failure.
+ rot13 = codecs.lookup("rot13")
+ with support.swap_attr(rot13, '_is_text_encoding', True):
+ t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
+ self.assertRaises(TypeError, t.write, 'bar')
+
+ def test_illegal_decoder(self):
+ # Issue #17106
+ # Bypass the early encoding check added in issue 20404
+ def _make_illegal_wrapper():
+ quopri = codecs.lookup("quopri")
+ quopri._is_text_encoding = True
+ try:
+ t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
+ newline='\n', encoding="quopri")
+ finally:
+ quopri._is_text_encoding = False
+ return t
+ # Crash when decoder returns non-string
+ t = _make_illegal_wrapper()
+ self.assertRaises(TypeError, t.read, 1)
+ t = _make_illegal_wrapper()
+ self.assertRaises(TypeError, t.readline)
+ t = _make_illegal_wrapper()
+ self.assertRaises(TypeError, t.read)
+
+ # Issue 31243: calling read() while the return value of decoder's
+ # getstate() is invalid should neither crash the interpreter nor
+ # raise a SystemError.
+ def _make_very_illegal_wrapper(getstate_ret_val):
+ class BadDecoder:
+ def getstate(self):
+ return getstate_ret_val
+ def _get_bad_decoder(dummy):
+ return BadDecoder()
+ quopri = codecs.lookup("quopri")
+ with support.swap_attr(quopri, 'incrementaldecoder',
+ _get_bad_decoder):
+ return _make_illegal_wrapper()
+ t = _make_very_illegal_wrapper(42)
+ self.assertRaises(TypeError, t.read, 42)
+ t = _make_very_illegal_wrapper(())
+ self.assertRaises(TypeError, t.read, 42)
+ t = _make_very_illegal_wrapper((1, 2))
+ self.assertRaises(TypeError, t.read, 42)
+
+ def _check_create_at_shutdown(self, **kwargs):
+ # Issue #20037: creating a TextIOWrapper at shutdown
+ # shouldn't crash the interpreter.
+ iomod = self.io.__name__
+ code = """if 1:
+ import codecs
+ import {iomod} as io
+
+ # Avoid looking up codecs at shutdown
+ codecs.lookup('utf-8')
+
+ class C:
+ def __del__(self):
+ io.TextIOWrapper(io.BytesIO(), **{kwargs})
+ print("ok")
+ c = C()
+ """.format(iomod=iomod, kwargs=kwargs)
+ return assert_python_ok("-c", code)
+
+ def test_create_at_shutdown_without_encoding(self):
+ rc, out, err = self._check_create_at_shutdown()
+ if err:
+ # Can error out with a RuntimeError if the module state
+ # isn't found.
+ self.assertIn(self.shutdown_error, err.decode())
+ else:
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_create_at_shutdown_with_encoding(self):
+ rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
+ errors='strict')
+ self.assertFalse(err)
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_read_byteslike(self):
+ r = MemviewBytesIO(b'Just some random string\n')
+ t = self.TextIOWrapper(r, 'utf-8')
+
+ # TextIOwrapper will not read the full string, because
+ # we truncate it to a multiple of the native int size
+ # so that we can construct a more complex memoryview.
+ bytes_val = _to_memoryview(r.getvalue()).tobytes()
+
+ self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
+
+ def test_issue22849(self):
+ class F(object):
+ def readable(self): return True
+ def writable(self): return True
+ def seekable(self): return True
+
+ for i in range(10):
+ try:
+ self.TextIOWrapper(F(), encoding='utf-8')
+ except Exception:
+ pass
+
+ F.tell = lambda x: 0
+ t = self.TextIOWrapper(F(), encoding='utf-8')
+
+ def test_reconfigure_locale(self):
+ wrapper = self.TextIOWrapper(self.BytesIO(b"test"))
+ wrapper.reconfigure(encoding="locale")
+
+ def test_reconfigure_encoding_read(self):
+ # latin1 -> utf8
+ # (latin1 can decode utf-8 encoded string)
+ data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
+ raw = self.BytesIO(data)
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ self.assertEqual(txt.readline(), 'abc\xe9\n')
+ with self.assertRaises(self.UnsupportedOperation):
+ txt.reconfigure(encoding='utf-8')
+ with self.assertRaises(self.UnsupportedOperation):
+ txt.reconfigure(newline=None)
+
+ def test_reconfigure_write_fromascii(self):
+ # ascii has a specific encodefunc in the C implementation,
+ # but utf-8-sig has not. Make sure that we get rid of the
+ # cached encodefunc when we switch encoders.
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('foo\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('\xe9\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
+
+ def test_reconfigure_write(self):
+ # latin -> utf8
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ txt.write('abc\xe9\n')
+ txt.reconfigure(encoding='utf-8')
+ self.assertEqual(raw.getvalue(), b'abc\xe9\n')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
+
+ # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
+ # the file
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
+
+ def test_reconfigure_write_non_seekable(self):
+ raw = self.BytesIO()
+ raw.seekable = lambda: False
+ raw.seek = None
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+
+ # If the raw stream is not seekable, there'll be a BOM
+ self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
+
+ def test_reconfigure_defaults(self):
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
+ txt.reconfigure(encoding=None)
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+ txt.write('LF\n')
+
+ txt.reconfigure(newline='\r\n')
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+
+ txt.reconfigure(errors='ignore')
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'ignore')
+ txt.write('CRLF\n')
+
+ txt.reconfigure(encoding='utf-8', newline=None)
+ self.assertEqual(txt.errors, 'strict')
+ txt.seek(0)
+ self.assertEqual(txt.read(), 'LF\nCRLF\n')
+
+ self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
+
+ def test_reconfigure_errors(self):
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
+ with self.assertRaises(TypeError): # there was a crash
+ txt.reconfigure(encoding=42)
+ if self.is_C:
+ with self.assertRaises(UnicodeEncodeError):
+ txt.reconfigure(encoding='\udcfe')
+ with self.assertRaises(LookupError):
+ txt.reconfigure(encoding='locale\0')
+ # TODO: txt.reconfigure(encoding='utf-8\0')
+ # TODO: txt.reconfigure(encoding='nonexisting')
+ with self.assertRaises(TypeError):
+ txt.reconfigure(errors=42)
+ if self.is_C:
+ with self.assertRaises(UnicodeEncodeError):
+ txt.reconfigure(errors='\udcfe')
+ # TODO: txt.reconfigure(errors='ignore\0')
+ # TODO: txt.reconfigure(errors='nonexisting')
+ with self.assertRaises(TypeError):
+ txt.reconfigure(newline=42)
+ with self.assertRaises(ValueError):
+ txt.reconfigure(newline='\udcfe')
+ with self.assertRaises(ValueError):
+ txt.reconfigure(newline='xyz')
+ if not self.is_C:
+ # TODO: Should fail in C too.
+ with self.assertRaises(ValueError):
+ txt.reconfigure(newline='\n\0')
+ if self.is_C:
+ # TODO: Use __bool__(), not __index__().
+ with self.assertRaises(ZeroDivisionError):
+ txt.reconfigure(line_buffering=BadIndex())
+ with self.assertRaises(OverflowError):
+ txt.reconfigure(line_buffering=2**1000)
+ with self.assertRaises(ZeroDivisionError):
+ txt.reconfigure(write_through=BadIndex())
+ with self.assertRaises(OverflowError):
+ txt.reconfigure(write_through=2**1000)
+ with self.assertRaises(ZeroDivisionError): # there was a crash
+ txt.reconfigure(line_buffering=BadIndex(),
+ write_through=BadIndex())
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+ self.assertIs(txt.line_buffering, False)
+ self.assertIs(txt.write_through, False)
+
+ txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
+ line_buffering=True, write_through=True)
+ self.assertEqual(txt.encoding, 'latin1')
+ self.assertEqual(txt.errors, 'ignore')
+ self.assertIs(txt.line_buffering, True)
+ self.assertIs(txt.write_through, True)
+
+ def test_reconfigure_newline(self):
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline=None)
+ self.assertEqual(txt.readline(), 'CR\n')
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='')
+ self.assertEqual(txt.readline(), 'CR\r')
+ raw = self.BytesIO(b'CR\rLF\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\n')
+ self.assertEqual(txt.readline(), 'CR\rLF\n')
+ raw = self.BytesIO(b'LF\nCR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='\r')
+ self.assertEqual(txt.readline(), 'LF\nCR\r')
+ raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\r\n')
+ self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
+
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
+ txt.reconfigure(newline=None)
+ txt.write('linesep\n')
+ txt.reconfigure(newline='')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\n')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\r')
+ txt.write('CR\n')
+ txt.reconfigure(newline='\r\n')
+ txt.write('CRLF\n')
+ expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
+ self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
+
+ def test_issue25862(self):
+ # Assertion failures occurred in tell() after read() and write().
+ t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
+ t.read(1)
+ t.read()
+ t.tell()
+ t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
+ t.read(1)
+ t.write('x')
+ t.tell()
+
+ def test_issue35928(self):
+ p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO())
+ f = self.TextIOWrapper(p)
+ res = f.readline()
+ self.assertEqual(res, 'foo\n')
+ f.write(res)
+ self.assertEqual(res + f.readline(), 'foo\nbar\n')
+
+ def test_pickling_subclass(self):
+ global MyTextIO
+ class MyTextIO(self.TextIOWrapper):
+ def __init__(self, raw, tag):
+ super().__init__(raw)
+ self.tag = tag
+ def __getstate__(self):
+ return self.tag, self.buffer.getvalue()
+ def __setstate__(slf, state):
+ tag, value = state
+ slf.__init__(self.BytesIO(value), tag)
+
+ raw = self.BytesIO(b'data')
+ txt = MyTextIO(raw, 'ham')
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=proto):
+ pickled = pickle.dumps(txt, proto)
+ newtxt = pickle.loads(pickled)
+ self.assertEqual(newtxt.buffer.getvalue(), b'data')
+ self.assertEqual(newtxt.tag, 'ham')
+ del MyTextIO
+
+ @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
+ def test_read_non_blocking(self):
+ import os
+ r, w = os.pipe()
+ try:
+ os.set_blocking(r, False)
+ with self.io.open(r, 'rt') as textfile:
+ r = None
+ # Nothing has been written so a non-blocking read raises a BlockingIOError exception.
+ with self.assertRaises(BlockingIOError):
+ textfile.read()
+ finally:
+ if r is not None:
+ os.close(r)
+ os.close(w)
+
+
+class MemviewBytesIO(io.BytesIO):
+ '''A BytesIO object whose read method returns memoryviews
+ rather than bytes'''
+
+ def read1(self, len_):
+ return _to_memoryview(super().read1(len_))
+
+ def read(self, len_):
+ return _to_memoryview(super().read(len_))
+
+def _to_memoryview(buf):
+ '''Convert bytes-object *buf* to a non-trivial memoryview'''
+
+ arr = array.array('i')
+ idx = len(buf) - len(buf) % arr.itemsize
+ arr.frombytes(buf[:idx])
+ return memoryview(arr)
+
+
+class CTextIOWrapperTest(TextIOWrapperTest, CTestCase):
+ shutdown_error = "LookupError: unknown encoding: ascii"
+
+ def test_initialization(self):
+ r = self.BytesIO(b"\xc3\xa9\n\n")
+ b = self.BufferedReader(r, 1000)
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
+ self.assertRaises(ValueError, t.read)
+
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ self.assertRaises(Exception, repr, t)
+
+ def test_garbage_collection(self):
+ # C TextIOWrapper objects are collected, and collecting them flushes
+ # all data to disk.
+ # The Python version has __del__, so it ends in gc.garbage instead.
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", ResourceWarning)
+ rawio = self.FileIO(os_helper.TESTFN, "wb")
+ b = self.BufferedWriter(rawio)
+ t = self.TextIOWrapper(b, encoding="ascii")
+ t.write("456def")
+ t.x = t
+ wr = weakref.ref(t)
+ del t
+ support.gc_collect()
+ self.assertIsNone(wr(), wr)
+ with self.open(os_helper.TESTFN, "rb") as f:
+ self.assertEqual(f.read(), b"456def")
+
+ def test_rwpair_cleared_before_textio(self):
+ # Issue 13070: TextIOWrapper's finalization would crash when called
+ # after the reference to the underlying BufferedRWPair's writer got
+ # cleared by the GC.
+ for i in range(1000):
+ b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
+ t1 = self.TextIOWrapper(b1, encoding="ascii")
+ b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
+ t2 = self.TextIOWrapper(b2, encoding="ascii")
+ # circular references
+ t1.buddy = t2
+ t2.buddy = t1
+ support.gc_collect()
+
+ def test_del__CHUNK_SIZE_SystemError(self):
+ t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
+ with self.assertRaises(AttributeError):
+ del t._CHUNK_SIZE
+
+ def test_internal_buffer_size(self):
+ # bpo-43260: TextIOWrapper's internal buffer should not store
+ # data larger than chunk size.
+ chunk_size = 8192 # default chunk size, updated later
+
+ class MockIO(self.MockRawIO):
+ def write(self, data):
+ if len(data) > chunk_size:
+ raise RuntimeError
+ return super().write(data)
+
+ buf = MockIO()
+ t = self.TextIOWrapper(buf, encoding="ascii")
+ chunk_size = t._CHUNK_SIZE
+ t.write("abc")
+ t.write("def")
+ # default chunk size is 8192 bytes so t don't write data to buf.
+ self.assertEqual([], buf._write_stack)
+
+ with self.assertRaises(RuntimeError):
+ t.write("x"*(chunk_size+1))
+
+ self.assertEqual([b"abcdef"], buf._write_stack)
+ t.write("ghi")
+ t.write("x"*chunk_size)
+ self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
+
+ def test_issue119506(self):
+ chunk_size = 8192
+
+ class MockIO(self.MockRawIO):
+ written = False
+ def write(self, data):
+ if not self.written:
+ self.written = True
+ t.write("middle")
+ return super().write(data)
+
+ buf = MockIO()
+ t = self.TextIOWrapper(buf)
+ t.write("abc")
+ t.write("def")
+ # writing data which size >= chunk_size cause flushing buffer before write.
+ t.write("g" * chunk_size)
+ t.flush()
+
+ self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size],
+ buf._write_stack)
+
+
+class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase):
+ shutdown_error = "LookupError: unknown encoding: ascii"
+
+
+class IncrementalNewlineDecoderTest:
+
+ def check_newline_decoding_utf8(self, decoder):
+ # UTF-8 specific tests for a newline decoder
+ def _check_decode(b, s, **kwargs):
+ # We exercise getstate() / setstate() as well as decode()
+ state = decoder.getstate()
+ self.assertEqual(decoder.decode(b, **kwargs), s)
+ decoder.setstate(state)
+ self.assertEqual(decoder.decode(b, **kwargs), s)
+
+ _check_decode(b'\xe8\xa2\x88', "\u8888")
+
+ _check_decode(b'\xe8', "")
+ _check_decode(b'\xa2', "")
+ _check_decode(b'\x88', "\u8888")
+
+ _check_decode(b'\xe8', "")
+ _check_decode(b'\xa2', "")
+ _check_decode(b'\x88', "\u8888")
+
+ _check_decode(b'\xe8', "")
+ self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
+
+ decoder.reset()
+ _check_decode(b'\n', "\n")
+ _check_decode(b'\r', "")
+ _check_decode(b'', "\n", final=True)
+ _check_decode(b'\r', "\n", final=True)
+
+ _check_decode(b'\r', "")
+ _check_decode(b'a', "\na")
+
+ _check_decode(b'\r\r\n', "\n\n")
+ _check_decode(b'\r', "")
+ _check_decode(b'\r', "\n")
+ _check_decode(b'\na', "\na")
+
+ _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
+ _check_decode(b'\xe8\xa2\x88', "\u8888")
+ _check_decode(b'\n', "\n")
+ _check_decode(b'\xe8\xa2\x88\r', "\u8888")
+ _check_decode(b'\n', "\n")
+
+ def check_newline_decoding(self, decoder, encoding):
+ result = []
+ if encoding is not None:
+ encoder = codecs.getincrementalencoder(encoding)()
+ def _decode_bytewise(s):
+ # Decode one byte at a time
+ for b in encoder.encode(s):
+ result.append(decoder.decode(bytes([b])))
+ else:
+ encoder = None
+ def _decode_bytewise(s):
+ # Decode one char at a time
+ for c in s:
+ result.append(decoder.decode(c))
+ self.assertEqual(decoder.newlines, None)
+ _decode_bytewise("abc\n\r")
+ self.assertEqual(decoder.newlines, '\n')
+ _decode_bytewise("\nabc")
+ self.assertEqual(decoder.newlines, ('\n', '\r\n'))
+ _decode_bytewise("abc\r")
+ self.assertEqual(decoder.newlines, ('\n', '\r\n'))
+ _decode_bytewise("abc")
+ self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
+ _decode_bytewise("abc\r")
+ self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
+ decoder.reset()
+ input = "abc"
+ if encoder is not None:
+ encoder.reset()
+ input = encoder.encode(input)
+ self.assertEqual(decoder.decode(input), "abc")
+ self.assertEqual(decoder.newlines, None)
+
+ def test_newline_decoder(self):
+ encodings = (
+ # None meaning the IncrementalNewlineDecoder takes unicode input
+ # rather than bytes input
+ None, 'utf-8', 'latin-1',
+ 'utf-16', 'utf-16-le', 'utf-16-be',
+ 'utf-32', 'utf-32-le', 'utf-32-be',
+ )
+ for enc in encodings:
+ decoder = enc and codecs.getincrementaldecoder(enc)()
+ decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
+ self.check_newline_decoding(decoder, enc)
+ decoder = codecs.getincrementaldecoder("utf-8")()
+ decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
+ self.check_newline_decoding_utf8(decoder)
+ self.assertRaises(TypeError, decoder.setstate, 42)
+
+ def test_newline_bytes(self):
+ # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
+ def _check(dec):
+ self.assertEqual(dec.newlines, None)
+ self.assertEqual(dec.decode("\u0D00"), "\u0D00")
+ self.assertEqual(dec.newlines, None)
+ self.assertEqual(dec.decode("\u0A00"), "\u0A00")
+ self.assertEqual(dec.newlines, None)
+ dec = self.IncrementalNewlineDecoder(None, translate=False)
+ _check(dec)
+ dec = self.IncrementalNewlineDecoder(None, translate=True)
+ _check(dec)
+
+ def test_translate(self):
+ # issue 35062
+ for translate in (-2, -1, 1, 2):
+ decoder = codecs.getincrementaldecoder("utf-8")()
+ decoder = self.IncrementalNewlineDecoder(decoder, translate)
+ self.check_newline_decoding_utf8(decoder)
+ decoder = codecs.getincrementaldecoder("utf-8")()
+ decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
+ self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
+
+class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
+ IncrementalNewlineDecoder = io.IncrementalNewlineDecoder
+
+ @support.cpython_only
+ def test_uninitialized(self):
+ uninitialized = self.IncrementalNewlineDecoder.__new__(
+ self.IncrementalNewlineDecoder)
+ self.assertRaises(ValueError, uninitialized.decode, b'bar')
+ self.assertRaises(ValueError, uninitialized.getstate)
+ self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
+ self.assertRaises(ValueError, uninitialized.reset)
+
+
+class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
+ IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder
1
0
gh-139116: tracemalloc: Detach thread state when acquiring tables_lock (GH-139449)
by encukou Sept. 30, 2025
by encukou Sept. 30, 2025
Sept. 30, 2025
https://github.com/python/cpython/commit/01157e0cdff1321a065b0a00c5a97460f5…
commit: 01157e0cdff1321a065b0a00c5a97460f52af07e
branch: main
author: Petr Viktorin <encukou(a)gmail.com>
committer: encukou <encukou(a)gmail.com>
date: 2025-09-30T16:43:49+02:00
summary:
gh-139116: tracemalloc: Detach thread state when acquiring tables_lock (GH-139449)
* gh-139116: tracemalloc: Detach thread state when acquiring tables_lock
This prevents a deadlock when:
- One thread is in `_PyTraceMalloc_Stop`, with `TABLES_LOCK` held, calling
`PyRefTracer_SetTracer` which wants to stop the world
- Another is thread in `PyTraceMalloc_Track`, just attached thread state, waiting
for `TABLES_LOCK`
Detaching the thread state while waiting for `TABLES_LOCK` allows
`PyRefTracer_SetTracer` to stop the world.
Co-authored-by: Peter Bierma <zintensitydev(a)gmail.com>
files:
A Misc/NEWS.d/next/Core_and_Builtins/2025-09-30-14-57-19.gh-issue-139116.nlVf40.rst
M Python/tracemalloc.c
diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-09-30-14-57-19.gh-issue-139116.nlVf40.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-09-30-14-57-19.gh-issue-139116.nlVf40.rst
new file mode 100644
index 00000000000000..00692e97c44fb5
--- /dev/null
+++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-09-30-14-57-19.gh-issue-139116.nlVf40.rst
@@ -0,0 +1,2 @@
+Prevent a deadlock when multiple threads start, stop and use :mod:`tracemalloc`
+simultaneously.
diff --git a/Python/tracemalloc.c b/Python/tracemalloc.c
index 7066a214f1065b..7a0853b6a07f55 100644
--- a/Python/tracemalloc.c
+++ b/Python/tracemalloc.c
@@ -36,7 +36,7 @@ static int _PyTraceMalloc_TraceRef(PyObject *op, PyRefTracerEvent event,
the GIL held from PyMem_RawFree(). It cannot acquire the lock because it
would introduce a deadlock in _PyThreadState_DeleteCurrent(). */
#define tables_lock _PyRuntime.tracemalloc.tables_lock
-#define TABLES_LOCK() PyMutex_LockFlags(&tables_lock, _Py_LOCK_DONT_DETACH)
+#define TABLES_LOCK() PyMutex_Lock(&tables_lock)
#define TABLES_UNLOCK() PyMutex_Unlock(&tables_lock)
1
0
Sept. 30, 2025
https://github.com/python/cpython/commit/b2f5ad0c6d7cfd249c41bfbcdd0a75a9f2…
commit: b2f5ad0c6d7cfd249c41bfbcdd0a75a9f21f1e72
branch: main
author: Serhiy Storchaka <storchaka(a)gmail.com>
committer: serhiy-storchaka <storchaka(a)gmail.com>
date: 2025-09-30T12:20:17+03:00
summary:
gh-63161: Add more tests for source encoding (#139440)
files:
M Lib/test/test_source_encoding.py
M Lib/test/test_tokenize.py
diff --git a/Lib/test/test_source_encoding.py b/Lib/test/test_source_encoding.py
index 1399f3fcd2d393..5df40782382120 100644
--- a/Lib/test/test_source_encoding.py
+++ b/Lib/test/test_source_encoding.py
@@ -172,6 +172,8 @@ def test_tokenizer_fstring_warning_in_first_line(self):
os.unlink(TESTFN)
+BUFSIZ = 2**13
+
class AbstractSourceEncodingTest:
def test_default_coding(self):
@@ -184,14 +186,20 @@ def test_first_coding_line(self):
self.check_script_output(src, br"'\xc3\u20ac'")
def test_second_coding_line(self):
- src = (b'#\n'
+ src = (b'#!/usr/bin/python\n'
+ b'#coding:iso8859-15\n'
+ b'print(ascii("\xc3\xa4"))\n')
+ self.check_script_output(src, br"'\xc3\u20ac'")
+
+ def test_second_coding_line_empty_first_line(self):
+ src = (b'\n'
b'#coding:iso8859-15\n'
b'print(ascii("\xc3\xa4"))\n')
self.check_script_output(src, br"'\xc3\u20ac'")
def test_third_coding_line(self):
# Only first two lines are tested for a magic comment.
- src = (b'#\n'
+ src = (b'#!/usr/bin/python\n'
b'#\n'
b'#coding:iso8859-15\n'
b'print(ascii("\xc3\xa4"))\n')
@@ -209,13 +217,52 @@ def test_double_coding_same_line(self):
b'print(ascii("\xc3\xa4"))\n')
self.check_script_output(src, br"'\xc3\u20ac'")
+ def test_double_coding_utf8(self):
+ src = (b'#coding:utf-8\n'
+ b'#coding:latin1\n'
+ b'print(ascii("\xc3\xa4"))\n')
+ self.check_script_output(src, br"'\xe4'")
+
+ def test_long_first_coding_line(self):
+ src = (b'#' + b' '*BUFSIZ + b'coding:iso8859-15\n'
+ b'print(ascii("\xc3\xa4"))\n')
+ self.check_script_output(src, br"'\xc3\u20ac'")
+
+ def test_long_second_coding_line(self):
+ src = (b'#!/usr/bin/python\n'
+ b'#' + b' '*BUFSIZ + b'coding:iso8859-15\n'
+ b'print(ascii("\xc3\xa4"))\n')
+ self.check_script_output(src, br"'\xc3\u20ac'")
+
+ def test_long_coding_line(self):
+ src = (b'#coding:iso-8859-15' + b' '*BUFSIZ + b'\n'
+ b'print(ascii("\xc3\xa4"))\n')
+ self.check_script_output(src, br"'\xc3\u20ac'")
+
+ def test_long_coding_name(self):
+ src = (b'#coding:iso-8859-1-' + b'x'*BUFSIZ + b'\n'
+ b'print(ascii("\xc3\xa4"))\n')
+ self.check_script_output(src, br"'\xc3\xa4'")
+
+ def test_long_first_utf8_line(self):
+ src = b'#' + b'\xc3\xa4'*(BUFSIZ//2) + b'\n'
+ self.check_script_output(src, b'')
+ src = b'# ' + b'\xc3\xa4'*(BUFSIZ//2) + b'\n'
+ self.check_script_output(src, b'')
+
+ def test_long_second_utf8_line(self):
+ src = b'\n#' + b'\xc3\xa4'*(BUFSIZ//2) + b'\n'
+ self.check_script_output(src, b'')
+ src = b'\n# ' + b'\xc3\xa4'*(BUFSIZ//2) + b'\n'
+ self.check_script_output(src, b'')
+
def test_first_non_utf8_coding_line(self):
src = (b'#coding:iso-8859-15 \xa4\n'
b'print(ascii("\xc3\xa4"))\n')
self.check_script_output(src, br"'\xc3\u20ac'")
def test_second_non_utf8_coding_line(self):
- src = (b'\n'
+ src = (b'#!/usr/bin/python\n'
b'#coding:iso-8859-15 \xa4\n'
b'print(ascii("\xc3\xa4"))\n')
self.check_script_output(src, br"'\xc3\u20ac'")
@@ -224,27 +271,56 @@ def test_utf8_bom(self):
src = (b'\xef\xbb\xbfprint(ascii("\xc3\xa4"))\n')
self.check_script_output(src, br"'\xe4'")
+ def test_utf8_bom_utf8_comments(self):
+ src = (b'\xef\xbb\xbf#\xc3\xa4\n'
+ b'#\xc3\xa4\n'
+ b'print(ascii("\xc3\xa4"))\n')
+ self.check_script_output(src, br"'\xe4'")
+
def test_utf8_bom_and_utf8_coding_line(self):
src = (b'\xef\xbb\xbf#coding:utf-8\n'
b'print(ascii("\xc3\xa4"))\n')
self.check_script_output(src, br"'\xe4'")
+ def test_utf8_non_utf8_comment_line_error(self):
+ src = (b'#coding: utf8\n'
+ b'#\n'
+ b'#\xa4\n'
+ b'raise RuntimeError\n')
+ self.check_script_error(src,
+ br"'utf-8' codec can't decode byte|"
+ br"encoding problem: utf8")
+
def test_crlf(self):
src = (b'print(ascii("""\r\n"""))\n')
- out = self.check_script_output(src, br"'\n'")
+ self.check_script_output(src, br"'\n'")
def test_crcrlf(self):
src = (b'print(ascii("""\r\r\n"""))\n')
- out = self.check_script_output(src, br"'\n\n'")
+ self.check_script_output(src, br"'\n\n'")
def test_crcrcrlf(self):
src = (b'print(ascii("""\r\r\r\n"""))\n')
- out = self.check_script_output(src, br"'\n\n\n'")
+ self.check_script_output(src, br"'\n\n\n'")
def test_crcrcrlf2(self):
src = (b'#coding:iso-8859-1\n'
b'print(ascii("""\r\r\r\n"""))\n')
- out = self.check_script_output(src, br"'\n\n\n'")
+ self.check_script_output(src, br"'\n\n\n'")
+
+ def test_nul_in_first_coding_line(self):
+ src = (b'#coding:iso8859-15\x00\n'
+ b'\n'
+ b'\n'
+ b'raise RuntimeError\n')
+ self.check_script_error(src, br"source code (string )?cannot contain null bytes")
+
+ def test_nul_in_second_coding_line(self):
+ src = (b'#!/usr/bin/python\n'
+ b'#coding:iso8859-15\x00\n'
+ b'\n'
+ b'raise RuntimeError\n')
+ self.check_script_error(src, br"source code (string )?cannot contain null bytes")
class UTF8ValidatorTest(unittest.TestCase):
@@ -324,6 +400,10 @@ def check_script_output(self, src, expected):
out = stdout.getvalue().encode('latin1')
self.assertEqual(out.rstrip(), expected)
+ def check_script_error(self, src, expected):
+ with self.assertRaisesRegex(SyntaxError, expected.decode()) as cm:
+ exec(src)
+
class FileSourceEncodingTest(AbstractSourceEncodingTest, unittest.TestCase):
@@ -335,6 +415,14 @@ def check_script_output(self, src, expected):
res = script_helper.assert_python_ok(fn)
self.assertEqual(res.out.rstrip(), expected)
+ def check_script_error(self, src, expected):
+ with tempfile.TemporaryDirectory() as tmpd:
+ fn = os.path.join(tmpd, 'test.py')
+ with open(fn, 'wb') as fp:
+ fp.write(src)
+ res = script_helper.assert_python_failure(fn)
+ self.assertRegex(res.err.rstrip().splitlines()[-1], b'SyntaxError.*?' + expected)
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py
index 865e0c5b40ddd3..d90a7659c4237c 100644
--- a/Lib/test/test_tokenize.py
+++ b/Lib/test/test_tokenize.py
@@ -1346,7 +1346,8 @@ def readline():
def test_no_bom_no_encoding_cookie(self):
lines = (
- b'# something\n',
+ b'#!/home/\xc3\xa4/bin/python\n',
+ b'# something \xe2\x82\xac\n',
b'print(something)\n',
b'do_something(else)\n'
)
@@ -1354,16 +1355,54 @@ def test_no_bom_no_encoding_cookie(self):
self.assertEqual(encoding, 'utf-8')
self.assertEqual(consumed_lines, list(lines[:2]))
+ def test_no_bom_no_encoding_cookie_first_line_error(self):
+ lines = (
+ b'#!/home/\xa4/bin/python\n\n',
+ b'print(something)\n',
+ b'do_something(else)\n'
+ )
+ with self.assertRaises(SyntaxError):
+ tokenize.detect_encoding(self.get_readline(lines))
+
+ def test_no_bom_no_encoding_cookie_second_line_error(self):
+ lines = (
+ b'#!/usr/bin/python\n',
+ b'# something \xe2\n',
+ b'print(something)\n',
+ b'do_something(else)\n'
+ )
+ with self.assertRaises(SyntaxError):
+ tokenize.detect_encoding(self.get_readline(lines))
+
def test_bom_no_cookie(self):
lines = (
- b'\xef\xbb\xbf# something\n',
+ b'\xef\xbb\xbf#!/home/\xc3\xa4/bin/python\n',
b'print(something)\n',
b'do_something(else)\n'
)
encoding, consumed_lines = tokenize.detect_encoding(self.get_readline(lines))
self.assertEqual(encoding, 'utf-8-sig')
self.assertEqual(consumed_lines,
- [b'# something\n', b'print(something)\n'])
+ [b'#!/home/\xc3\xa4/bin/python\n', b'print(something)\n'])
+
+ def test_bom_no_cookie_first_line_error(self):
+ lines = (
+ b'\xef\xbb\xbf#!/home/\xa4/bin/python\n',
+ b'print(something)\n',
+ b'do_something(else)\n'
+ )
+ with self.assertRaises(SyntaxError):
+ tokenize.detect_encoding(self.get_readline(lines))
+
+ def test_bom_no_cookie_second_line_error(self):
+ lines = (
+ b'\xef\xbb\xbf#!/usr/bin/python\n',
+ b'# something \xe2\n',
+ b'print(something)\n',
+ b'do_something(else)\n'
+ )
+ with self.assertRaises(SyntaxError):
+ tokenize.detect_encoding(self.get_readline(lines))
def test_cookie_first_line_no_bom(self):
lines = (
@@ -1439,27 +1478,58 @@ def test_cookie_second_line_noncommented_first_line(self):
expected = [b"print('\xc2\xa3')\n"]
self.assertEqual(consumed_lines, expected)
- def test_cookie_second_line_commented_first_line(self):
+ def test_cookie_second_line_empty_first_line(self):
lines = (
- b"#print('\xc2\xa3')\n",
+ b'\n',
b'# vim: set fileencoding=iso8859-15 :\n',
b"print('\xe2\x82\xac')\n"
)
encoding, consumed_lines = tokenize.detect_encoding(self.get_readline(lines))
self.assertEqual(encoding, 'iso8859-15')
- expected = [b"#print('\xc2\xa3')\n", b'# vim: set fileencoding=iso8859-15 :\n']
+ expected = [b'\n', b'# vim: set fileencoding=iso8859-15 :\n']
self.assertEqual(consumed_lines, expected)
- def test_cookie_second_line_empty_first_line(self):
+ def test_cookie_third_line(self):
lines = (
- b'\n',
- b'# vim: set fileencoding=iso8859-15 :\n',
- b"print('\xe2\x82\xac')\n"
+ b'#!/home/\xc3\xa4/bin/python\n',
+ b'# something\n',
+ b'# vim: set fileencoding=ascii :\n',
+ b'print(something)\n',
+ b'do_something(else)\n'
+ )
+ encoding, consumed_lines = tokenize.detect_encoding(self.get_readline(lines))
+ self.assertEqual(encoding, 'utf-8')
+ self.assertEqual(consumed_lines, list(lines[:2]))
+
+ def test_double_coding_line(self):
+ # If the first line matches the second line is ignored.
+ lines = (
+ b'#coding:iso8859-15\n',
+ b'#coding:latin1\n',
+ b'print(something)\n'
)
encoding, consumed_lines = tokenize.detect_encoding(self.get_readline(lines))
self.assertEqual(encoding, 'iso8859-15')
- expected = [b'\n', b'# vim: set fileencoding=iso8859-15 :\n']
- self.assertEqual(consumed_lines, expected)
+ self.assertEqual(consumed_lines, list(lines[:1]))
+
+ def test_double_coding_same_line(self):
+ lines = (
+ b'#coding:iso8859-15 coding:latin1\n',
+ b'print(something)\n'
+ )
+ encoding, consumed_lines = tokenize.detect_encoding(self.get_readline(lines))
+ self.assertEqual(encoding, 'iso8859-15')
+ self.assertEqual(consumed_lines, list(lines[:1]))
+
+ def test_double_coding_utf8(self):
+ lines = (
+ b'#coding:utf-8\n',
+ b'#coding:latin1\n',
+ b'print(something)\n'
+ )
+ encoding, consumed_lines = tokenize.detect_encoding(self.get_readline(lines))
+ self.assertEqual(encoding, 'utf-8')
+ self.assertEqual(consumed_lines, list(lines[:1]))
def test_latin1_normalization(self):
# See get_normal_name() in Parser/tokenizer/helpers.c.
@@ -1485,7 +1555,6 @@ def test_syntaxerror_latin1(self):
readline = self.get_readline(lines)
self.assertRaises(SyntaxError, tokenize.detect_encoding, readline)
-
def test_utf8_normalization(self):
# See get_normal_name() in Parser/tokenizer/helpers.c.
encodings = ("utf-8", "utf-8-mac", "utf-8-unix")
1
0
gh-139283: correctly handle `size` limit in `cursor.fetchmany()` (#139296)
by picnixz Sept. 30, 2025
by picnixz Sept. 30, 2025
Sept. 30, 2025
https://github.com/python/cpython/commit/bc172ee8307431caf4c89612e9e4540816…
commit: bc172ee8307431caf4c89612e9e454081635191f
branch: main
author: Bénédikt Tran <10796600+picnixz(a)users.noreply.github.com>
committer: picnixz <10796600+picnixz(a)users.noreply.github.com>
date: 2025-09-30T11:18:55+02:00
summary:
gh-139283: correctly handle `size` limit in `cursor.fetchmany()` (#139296)
Passing a negative or zero size to `cursor.fetchmany()` made it fetch all rows
instead of none.
While this could be considered a security vulnerability, it was decided to treat
this issue as a regular bug as passing a non-sanitized *size* value in the first
place is not recommended.
files:
A Misc/NEWS.d/next/Security/2025-09-24-13-39-56.gh-issue-139283.jODz_q.rst
M Doc/library/sqlite3.rst
M Lib/test/test_sqlite3/test_dbapi.py
M Modules/_sqlite/clinic/cursor.c.h
M Modules/_sqlite/cursor.c
M Modules/_sqlite/cursor.h
diff --git a/Doc/library/sqlite3.rst b/Doc/library/sqlite3.rst
index 6a7e15db223210..a7c9923f116f9f 100644
--- a/Doc/library/sqlite3.rst
+++ b/Doc/library/sqlite3.rst
@@ -1611,6 +1611,9 @@ Cursor objects
If the *size* parameter is used, then it is best for it to retain the same
value from one :meth:`fetchmany` call to the next.
+ .. versionchanged:: next
+ Negative *size* values are rejected by raising :exc:`ValueError`.
+
.. method:: fetchall()
Return all (remaining) rows of a query result as a :class:`list`.
@@ -1638,6 +1641,9 @@ Cursor objects
Read/write attribute that controls the number of rows returned by :meth:`fetchmany`.
The default value is 1 which means a single row would be fetched per call.
+ .. versionchanged:: next
+ Negative values are rejected by raising :exc:`ValueError`.
+
.. attribute:: connection
Read-only attribute that provides the SQLite database :class:`Connection`
diff --git a/Lib/test/test_sqlite3/test_dbapi.py b/Lib/test/test_sqlite3/test_dbapi.py
index 74a511ba7c88c2..20e39f61e4dedb 100644
--- a/Lib/test/test_sqlite3/test_dbapi.py
+++ b/Lib/test/test_sqlite3/test_dbapi.py
@@ -21,6 +21,7 @@
# 3. This notice may not be removed or altered from any source distribution.
import contextlib
+import functools
import os
import sqlite3 as sqlite
import subprocess
@@ -1060,7 +1061,7 @@ def test_array_size(self):
# now set to 2
self.cu.arraysize = 2
- # now make the query return 3 rows
+ # now make the query return 2 rows from a table of 3 rows
self.cu.execute("delete from test")
self.cu.execute("insert into test(name) values ('A')")
self.cu.execute("insert into test(name) values ('B')")
@@ -1070,13 +1071,50 @@ def test_array_size(self):
self.assertEqual(len(res), 2)
+ def test_invalid_array_size(self):
+ UINT32_MAX = (1 << 32) - 1
+ setter = functools.partial(setattr, self.cu, 'arraysize')
+
+ self.assertRaises(TypeError, setter, 1.0)
+ self.assertRaises(ValueError, setter, -3)
+ self.assertRaises(OverflowError, setter, UINT32_MAX + 1)
+
def test_fetchmany(self):
+ # no active SQL statement
+ res = self.cu.fetchmany()
+ self.assertEqual(res, [])
+ res = self.cu.fetchmany(1000)
+ self.assertEqual(res, [])
+
+ # test default parameter
+ self.cu.execute("select name from test")
+ res = self.cu.fetchmany()
+ self.assertEqual(len(res), 1)
+
+ # test when the number of requested rows exceeds the actual count
self.cu.execute("select name from test")
res = self.cu.fetchmany(100)
self.assertEqual(len(res), 1)
res = self.cu.fetchmany(100)
self.assertEqual(res, [])
+ # test when size = 0
+ self.cu.execute("select name from test")
+ res = self.cu.fetchmany(0)
+ self.assertEqual(res, [])
+ res = self.cu.fetchmany(100)
+ self.assertEqual(len(res), 1)
+ res = self.cu.fetchmany(100)
+ self.assertEqual(res, [])
+
+ def test_invalid_fetchmany(self):
+ UINT32_MAX = (1 << 32) - 1
+ fetchmany = self.cu.fetchmany
+
+ self.assertRaises(TypeError, fetchmany, 1.0)
+ self.assertRaises(ValueError, fetchmany, -3)
+ self.assertRaises(OverflowError, fetchmany, UINT32_MAX + 1)
+
def test_fetchmany_kw_arg(self):
"""Checks if fetchmany works with keyword arguments"""
self.cu.execute("select name from test")
diff --git a/Misc/NEWS.d/next/Security/2025-09-24-13-39-56.gh-issue-139283.jODz_q.rst b/Misc/NEWS.d/next/Security/2025-09-24-13-39-56.gh-issue-139283.jODz_q.rst
new file mode 100644
index 00000000000000..a8fd83bca52554
--- /dev/null
+++ b/Misc/NEWS.d/next/Security/2025-09-24-13-39-56.gh-issue-139283.jODz_q.rst
@@ -0,0 +1,4 @@
+:mod:`sqlite3`: correctly handle maximum number of rows to fetch in
+:meth:`Cursor.fetchmany <sqlite3.Cursor.fetchmany>` and reject negative
+values for :attr:`Cursor.arraysize <sqlite3.Cursor.arraysize>`. Patch by
+Bénédikt Tran.
diff --git a/Modules/_sqlite/clinic/cursor.c.h b/Modules/_sqlite/clinic/cursor.c.h
index 350577f488df4b..3cad9f3aef5ecd 100644
--- a/Modules/_sqlite/clinic/cursor.c.h
+++ b/Modules/_sqlite/clinic/cursor.c.h
@@ -6,6 +6,7 @@ preserve
# include "pycore_gc.h" // PyGC_Head
# include "pycore_runtime.h" // _Py_ID()
#endif
+#include "pycore_long.h" // _PyLong_UInt32_Converter()
#include "pycore_modsupport.h" // _PyArg_CheckPositional()
static int
@@ -181,7 +182,7 @@ PyDoc_STRVAR(pysqlite_cursor_fetchmany__doc__,
{"fetchmany", _PyCFunction_CAST(pysqlite_cursor_fetchmany), METH_FASTCALL|METH_KEYWORDS, pysqlite_cursor_fetchmany__doc__},
static PyObject *
-pysqlite_cursor_fetchmany_impl(pysqlite_Cursor *self, int maxrows);
+pysqlite_cursor_fetchmany_impl(pysqlite_Cursor *self, uint32_t maxrows);
static PyObject *
pysqlite_cursor_fetchmany(PyObject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
@@ -216,7 +217,7 @@ pysqlite_cursor_fetchmany(PyObject *self, PyObject *const *args, Py_ssize_t narg
#undef KWTUPLE
PyObject *argsbuf[1];
Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0;
- int maxrows = ((pysqlite_Cursor *)self)->arraysize;
+ uint32_t maxrows = ((pysqlite_Cursor *)self)->arraysize;
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
/*minpos*/ 0, /*maxpos*/ 1, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
@@ -226,8 +227,7 @@ pysqlite_cursor_fetchmany(PyObject *self, PyObject *const *args, Py_ssize_t narg
if (!noptargs) {
goto skip_optional_pos;
}
- maxrows = PyLong_AsInt(args[0]);
- if (maxrows == -1 && PyErr_Occurred()) {
+ if (!_PyLong_UInt32_Converter(args[0], &maxrows)) {
goto exit;
}
skip_optional_pos:
@@ -329,4 +329,46 @@ pysqlite_cursor_close(PyObject *self, PyObject *Py_UNUSED(ignored))
{
return pysqlite_cursor_close_impl((pysqlite_Cursor *)self);
}
-/*[clinic end generated code: output=d05c7cbbc8bcab26 input=a9049054013a1b77]*/
+
+#if !defined(_sqlite3_Cursor_arraysize_DOCSTR)
+# define _sqlite3_Cursor_arraysize_DOCSTR NULL
+#endif
+#if defined(_SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF)
+# undef _SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF
+# define _SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF {"arraysize", (getter)_sqlite3_Cursor_arraysize_get, (setter)_sqlite3_Cursor_arraysize_set, _sqlite3_Cursor_arraysize_DOCSTR},
+#else
+# define _SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF {"arraysize", (getter)_sqlite3_Cursor_arraysize_get, NULL, _sqlite3_Cursor_arraysize_DOCSTR},
+#endif
+
+static PyObject *
+_sqlite3_Cursor_arraysize_get_impl(pysqlite_Cursor *self);
+
+static PyObject *
+_sqlite3_Cursor_arraysize_get(PyObject *self, void *Py_UNUSED(context))
+{
+ return _sqlite3_Cursor_arraysize_get_impl((pysqlite_Cursor *)self);
+}
+
+#if !defined(_sqlite3_Cursor_arraysize_DOCSTR)
+# define _sqlite3_Cursor_arraysize_DOCSTR NULL
+#endif
+#if defined(_SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF)
+# undef _SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF
+# define _SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF {"arraysize", (getter)_sqlite3_Cursor_arraysize_get, (setter)_sqlite3_Cursor_arraysize_set, _sqlite3_Cursor_arraysize_DOCSTR},
+#else
+# define _SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF {"arraysize", NULL, (setter)_sqlite3_Cursor_arraysize_set, NULL},
+#endif
+
+static int
+_sqlite3_Cursor_arraysize_set_impl(pysqlite_Cursor *self, PyObject *value);
+
+static int
+_sqlite3_Cursor_arraysize_set(PyObject *self, PyObject *value, void *Py_UNUSED(context))
+{
+ int return_value;
+
+ return_value = _sqlite3_Cursor_arraysize_set_impl((pysqlite_Cursor *)self, value);
+
+ return return_value;
+}
+/*[clinic end generated code: output=a0e3ebba9e4d0ece input=a9049054013a1b77]*/
diff --git a/Modules/_sqlite/cursor.c b/Modules/_sqlite/cursor.c
index 0c3f43d0e50b43..2bca411bfd9ba2 100644
--- a/Modules/_sqlite/cursor.c
+++ b/Modules/_sqlite/cursor.c
@@ -1159,35 +1159,31 @@ pysqlite_cursor_fetchone_impl(pysqlite_Cursor *self)
/*[clinic input]
_sqlite3.Cursor.fetchmany as pysqlite_cursor_fetchmany
- size as maxrows: int(c_default='((pysqlite_Cursor *)self)->arraysize') = 1
+ size as maxrows: uint32(c_default='((pysqlite_Cursor *)self)->arraysize') = 1
The default value is set by the Cursor.arraysize attribute.
Fetches several rows from the resultset.
[clinic start generated code]*/
static PyObject *
-pysqlite_cursor_fetchmany_impl(pysqlite_Cursor *self, int maxrows)
-/*[clinic end generated code: output=a8ef31fea64d0906 input=035dbe44a1005bf2]*/
+pysqlite_cursor_fetchmany_impl(pysqlite_Cursor *self, uint32_t maxrows)
+/*[clinic end generated code: output=3325f2b477c71baf input=a509c412aa70b27e]*/
{
PyObject* row;
PyObject* list;
- int counter = 0;
list = PyList_New(0);
if (!list) {
return NULL;
}
- while ((row = pysqlite_cursor_iternext((PyObject *)self))) {
- if (PyList_Append(list, row) < 0) {
- Py_DECREF(row);
- break;
- }
+ while (maxrows > 0 && (row = pysqlite_cursor_iternext((PyObject *)self))) {
+ int rc = PyList_Append(list, row);
Py_DECREF(row);
-
- if (++counter == maxrows) {
+ if (rc < 0) {
break;
}
+ maxrows--;
}
if (PyErr_Occurred()) {
@@ -1301,6 +1297,30 @@ pysqlite_cursor_close_impl(pysqlite_Cursor *self)
Py_RETURN_NONE;
}
+/*[clinic input]
+@getter
+_sqlite3.Cursor.arraysize
+[clinic start generated code]*/
+
+static PyObject *
+_sqlite3_Cursor_arraysize_get_impl(pysqlite_Cursor *self)
+/*[clinic end generated code: output=e0919d97175e6c50 input=3278f8d3ecbd90e3]*/
+{
+ return PyLong_FromUInt32(self->arraysize);
+}
+
+/*[clinic input]
+@setter
+_sqlite3.Cursor.arraysize
+[clinic start generated code]*/
+
+static int
+_sqlite3_Cursor_arraysize_set_impl(pysqlite_Cursor *self, PyObject *value)
+/*[clinic end generated code: output=af59a6b09f8cce6e input=ace48cb114e26060]*/
+{
+ return PyLong_AsUInt32(value, &self->arraysize);
+}
+
static PyMethodDef cursor_methods[] = {
PYSQLITE_CURSOR_CLOSE_METHODDEF
PYSQLITE_CURSOR_EXECUTEMANY_METHODDEF
@@ -1318,7 +1338,6 @@ static struct PyMemberDef cursor_members[] =
{
{"connection", _Py_T_OBJECT, offsetof(pysqlite_Cursor, connection), Py_READONLY},
{"description", _Py_T_OBJECT, offsetof(pysqlite_Cursor, description), Py_READONLY},
- {"arraysize", Py_T_INT, offsetof(pysqlite_Cursor, arraysize), 0},
{"lastrowid", _Py_T_OBJECT, offsetof(pysqlite_Cursor, lastrowid), Py_READONLY},
{"rowcount", Py_T_LONG, offsetof(pysqlite_Cursor, rowcount), Py_READONLY},
{"row_factory", _Py_T_OBJECT, offsetof(pysqlite_Cursor, row_factory), 0},
@@ -1326,6 +1345,11 @@ static struct PyMemberDef cursor_members[] =
{NULL}
};
+static struct PyGetSetDef cursor_getsets[] = {
+ _SQLITE3_CURSOR_ARRAYSIZE_GETSETDEF
+ {NULL},
+};
+
static const char cursor_doc[] =
PyDoc_STR("SQLite database cursor class.");
@@ -1336,6 +1360,7 @@ static PyType_Slot cursor_slots[] = {
{Py_tp_iternext, pysqlite_cursor_iternext},
{Py_tp_methods, cursor_methods},
{Py_tp_members, cursor_members},
+ {Py_tp_getset, cursor_getsets},
{Py_tp_init, pysqlite_cursor_init},
{Py_tp_traverse, cursor_traverse},
{Py_tp_clear, cursor_clear},
diff --git a/Modules/_sqlite/cursor.h b/Modules/_sqlite/cursor.h
index 42f817af7c54ad..c840a3d7ed0d15 100644
--- a/Modules/_sqlite/cursor.h
+++ b/Modules/_sqlite/cursor.h
@@ -35,7 +35,7 @@ typedef struct
pysqlite_Connection* connection;
PyObject* description;
PyObject* row_cast_map;
- int arraysize;
+ uint32_t arraysize;
PyObject* lastrowid;
long rowcount;
PyObject* row_factory;
1
0
Sept. 30, 2025
https://github.com/python/cpython/commit/bd1ada6df35f29069fc478ce3b0fee3372…
commit: bd1ada6df35f29069fc478ce3b0fee3372df075a
branch: main
author: AN Long <aisk(a)users.noreply.github.com>
committer: encukou <encukou(a)gmail.com>
date: 2025-09-30T11:08:50+02:00
summary:
gh-138092: Allow calling mmap.flush with offset only (#138093)
files:
A Misc/NEWS.d/next/Library/2025-08-24-02-04-32.gh-issue-138092.V4-wTO.rst
M Doc/library/mmap.rst
M Lib/test/test_mmap.py
M Modules/mmapmodule.c
diff --git a/Doc/library/mmap.rst b/Doc/library/mmap.rst
index bd3f7229bdaf70..b6ffb5cebc020e 100644
--- a/Doc/library/mmap.rst
+++ b/Doc/library/mmap.rst
@@ -212,8 +212,7 @@ To map anonymous memory, -1 should be passed as the fileno along with the length
Writable :term:`bytes-like object` is now accepted.
- .. method:: flush()
- flush(offset, size, /)
+ .. method:: flush([offset[, size]])
Flushes changes made to the in-memory copy of a file back to disk. Without
use of this call there is no guarantee that changes are written back before
@@ -230,6 +229,12 @@ To map anonymous memory, -1 should be passed as the fileno along with the length
on error under Windows. A zero value was returned on success; an
exception was raised on error under Unix.
+ .. versionchanged:: next
+ Allow specifying *offset* without *size*. Previously, both *offset*
+ and *size* parameters were required together. Now *offset* can be
+ specified alone, and the flush operation will extend from *offset*
+ to the end of the mmap.
+
.. method:: madvise(option[, start[, length]])
diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py
index 75ea1a671b67de..0571eed23f72dc 100644
--- a/Lib/test/test_mmap.py
+++ b/Lib/test/test_mmap.py
@@ -1145,6 +1145,18 @@ def test_access_violations(self):
self.assertEqual(stdout.strip(), b'')
self.assertEqual(stderr.strip(), b'')
+ def test_flush_parameters(self):
+ with open(TESTFN, 'wb+') as f:
+ f.write(b'x' * PAGESIZE * 3)
+ f.flush()
+
+ m = mmap.mmap(f.fileno(), PAGESIZE * 3)
+ self.addCleanup(m.close)
+
+ m.flush()
+ m.flush(PAGESIZE)
+ m.flush(PAGESIZE, PAGESIZE)
+
class LargeMmapTests(unittest.TestCase):
diff --git a/Misc/NEWS.d/next/Library/2025-08-24-02-04-32.gh-issue-138092.V4-wTO.rst b/Misc/NEWS.d/next/Library/2025-08-24-02-04-32.gh-issue-138092.V4-wTO.rst
new file mode 100644
index 00000000000000..21c28c49628cba
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-08-24-02-04-32.gh-issue-138092.V4-wTO.rst
@@ -0,0 +1,2 @@
+Fixed a bug in :meth:`mmap.mmap.flush` where calling with only an offset
+parameter would fail.
diff --git a/Modules/mmapmodule.c b/Modules/mmapmodule.c
index 8caadde8ae211b..41d117162882d7 100644
--- a/Modules/mmapmodule.c
+++ b/Modules/mmapmodule.c
@@ -933,11 +933,15 @@ static PyObject *
mmap_flush_method(PyObject *op, PyObject *args)
{
Py_ssize_t offset = 0;
+ Py_ssize_t size = -1;
mmap_object *self = mmap_object_CAST(op);
- Py_ssize_t size = self->size;
CHECK_VALID(NULL);
- if (!PyArg_ParseTuple(args, "|nn:flush", &offset, &size))
+ if (!PyArg_ParseTuple(args, "|nn:flush", &offset, &size)) {
return NULL;
+ }
+ if (size == -1) {
+ size = self->size - offset;
+ }
if (size < 0 || offset < 0 || self->size - offset < size) {
PyErr_SetString(PyExc_ValueError, "flush values out of range");
return NULL;
1
0
Sept. 29, 2025
https://github.com/python/cpython/commit/622f37f9dd0624b97e599ebb44bfa5cbdf…
commit: 622f37f9dd0624b97e599ebb44bfa5cbdf86cfb6
branch: 3.14
author: Miss Islington (bot) <31488909+miss-islington(a)users.noreply.github.com>
committer: hugovk <1324225+hugovk(a)users.noreply.github.com>
date: 2025-09-30T00:12:28+03:00
summary:
[3.14] Remove link to the PDF downloads (GH-139142) (#139426)
Remove link to the PDF downloads (GH-139142)
(cherry picked from commit 6b5f15698a436591f7c305c576a2d366c38d9997)
Co-authored-by: Adam Turner <9087854+AA-Turner(a)users.noreply.github.com>
files:
M Doc/tools/templates/download.html
diff --git a/Doc/tools/templates/download.html b/Doc/tools/templates/download.html
index 47a57eb111ba50..26fc9571d17423 100644
--- a/Doc/tools/templates/download.html
+++ b/Doc/tools/templates/download.html
@@ -40,11 +40,6 @@ <h1>{% trans %}Download Python {{ dl_version }} documentation{% endtrans %}</h1>
<th>{% trans %}Packed as .zip{% endtrans %}</th>
<th>{% trans %}Packed as .tar.bz2{% endtrans %}</th>
</tr>
- <tr>
- <td>{% trans %}PDF{% endtrans %}</td>
- <td>{% trans download_size="17" %}<a href="{{ dl_base }}/python-{{ dl_version }}-docs-pdf-a4.zip">Download</a> (ca. {{ download_size }} MiB){% endtrans %}</td>
- <td>{% trans download_size="17" %}<a href="{{ dl_base }}/python-{{ dl_version }}-docs-pdf-a4.tar.bz2">Download</a> (ca. {{ download_size }} MiB){% endtrans %}</td>
- </tr>
<tr>
<td>{% trans %}HTML{% endtrans %}</td>
<td>{% trans download_size="13" %}<a href="{{ dl_base }}/python-{{ dl_version }}-docs-html.zip">Download</a> (ca. {{ download_size }} MiB){% endtrans %}</td>
@@ -69,6 +64,13 @@ <h1>{% trans %}Download Python {{ dl_version }} documentation{% endtrans %}</h1>
<p>{% trans %}These archives contain all the content in the documentation.{% endtrans %}</p>
+<p>{% trans %}
+We no longer provide pre-built PDFs of the documentation.
+To build a PDF archive, follow the instructions in the
+<a href="https://devguide.python.org/documentation/start-documenting/#building-the-d…">Developer's Guide</a>
+and run <code>make dist-pdf</code> in the <code>Doc/</code> directory of a copy of the CPython repository.
+{% endtrans %}</p>
+
<h2>{% trans %}Unpacking{% endtrans %}</h2>
1
0