[pypy-svn] pypy default: Fixed test_file2k.py, added support for PYTHONIOENCODING.
alex_gaynor
commits-noreply at bitbucket.org
Sat Mar 5 21:37:59 CET 2011
Author: Alex Gaynor <alex.gaynor at gmail.com>
Branch:
Changeset: r42427:327b99e513e5
Date: 2011-03-05 12:37 -0800
http://bitbucket.org/pypy/pypy/changeset/327b99e513e5/
Log: Fixed test_file2k.py, added support for PYTHONIOENCODING.
diff --git a/pypy/module/_file/interp_file.py b/pypy/module/_file/interp_file.py
--- a/pypy/module/_file/interp_file.py
+++ b/pypy/module/_file/interp_file.py
@@ -28,8 +28,10 @@
stream = None
w_name = None
mode = "<uninitialized file>"
+ binary = False
softspace= 0 # Required according to file object docs
- encoding = None # This is not used internally by file objects
+ encoding = None
+ errors = None
fd = -1
newlines = 0 # Updated when the stream is closed
@@ -46,6 +48,7 @@
def fdopenstream(self, stream, fd, mode, w_name=None):
self.fd = fd
self.mode = mode
+ self.binary = "b" in mode
if w_name is not None:
self.w_name = w_name
self.stream = stream
@@ -229,8 +232,11 @@
size = space.r_longlong_w(w_size)
stream.truncate(size)
- @unwrap_spec(data='bufferstr')
- def direct_write(self, data):
+ def direct_write(self, w_data):
+ space = self.space
+ if not self.binary and space.isinstance_w(w_data, space.w_unicode):
+ w_data = space.call_method(w_data, "encode", space.wrap(self.encoding), space.wrap(self.errors))
+ data = space.bufferstr_w(w_data)
self.softspace = 0
self.getstream().write(data)
@@ -423,7 +429,7 @@
if not e.match(space, space.w_StopIteration):
raise
break # done
- self.file_write(space.str_w(w_line))
+ self.file_write(w_line)
def file_readinto(self, w_rwbuffer):
"""readinto() -> Undocumented. Don't use this; it may go away."""
@@ -510,6 +516,7 @@
doc = "file mode ('r', 'U', 'w', 'a', "
"possibly with 'b' or '+' added)"),
encoding = interp_attrproperty('encoding', cls=W_File),
+ errors = interp_attrproperty('errors', cls=W_File),
closed = GetSetProperty(descr_file_closed, cls=W_File,
doc="True if the file is closed"),
newlines = GetSetProperty(descr_file_newlines, cls=W_File,
@@ -538,3 +545,9 @@
def getopenstreams(space):
return space.fromcache(FileState).openstreams
+
+
+ at unwrap_spec(file=W_File, encoding="str_or_None", errors="str_or_None")
+def set_file_encoding(space, file, encoding=None, errors=None):
+ file.encoding = encoding
+ file.errors = errors
\ No newline at end of file
diff --git a/pypy/module/__pypy__/interp_magic.py b/pypy/module/__pypy__/interp_magic.py
--- a/pypy/module/__pypy__/interp_magic.py
+++ b/pypy/module/__pypy__/interp_magic.py
@@ -3,6 +3,8 @@
from pypy.rlib.objectmodel import we_are_translated
from pypy.objspace.std.typeobject import MethodCache
from pypy.objspace.std.mapdict import IndexCache
+from pypy.module._file.interp_file import W_File
+
def internal_repr(space, w_object):
return space.wrap('%r' % (w_object,))
diff --git a/pypy/translator/goal/app_main.py b/pypy/translator/goal/app_main.py
--- a/pypy/translator/goal/app_main.py
+++ b/pypy/translator/goal/app_main.py
@@ -254,6 +254,22 @@
sys.path.append(dir)
_seen[dir] = True
+def set_io_encoding(io_encoding):
+ try:
+ import _file
+ except ImportError:
+ import ctypes # HACK: while running on top of CPython
+ set_file_encoding = ctypes.pythonapi.PyFile_SetEncodingAndErrors
+ set_file_encoding.argtypes = [ctypes.py_object, ctypes.c_char_p, ctypes.c_char_p]
+ else:
+ set_file_encoding = _file.set_file_encoding
+ if ":" in io_encoding:
+ encoding, errors = io_encoding.split(":", 1)
+ else:
+ encoding, errors = io_encoding, None
+ for f in [sys.stdin, sys.stdout, sys.stderr]:
+ set_file_encoding(f, encoding, errors)
+
# Order is significant!
sys_flags = (
"debug",
@@ -447,7 +463,6 @@
elif not sys.stdout.isatty():
set_fully_buffered_io()
-
mainmodule = type(sys)('__main__')
sys.modules['__main__'] = mainmodule
@@ -458,6 +473,10 @@
print >> sys.stderr, "'import site' failed"
readenv = not ignore_environment
+ io_encoding = readenv and os.getenv("PYTHONIOENCODING")
+ if io_encoding:
+ set_io_encoding(io_encoding)
+
pythonwarnings = readenv and os.getenv('PYTHONWARNINGS')
if pythonwarnings:
warnoptions.extend(pythonwarnings.split(','))
diff --git a/pypy/translator/goal/test2/test_app_main.py b/pypy/translator/goal/test2/test_app_main.py
--- a/pypy/translator/goal/test2/test_app_main.py
+++ b/pypy/translator/goal/test2/test_app_main.py
@@ -3,7 +3,7 @@
"""
from __future__ import with_statement
import py
-import sys, os, re, runpy
+import sys, os, re, runpy, subprocess
import autopath
from pypy.tool.udir import udir
from contextlib import contextmanager
@@ -535,11 +535,16 @@
class TestNonInteractive:
def run(self, cmdline, senddata='', expect_prompt=False,
- expect_banner=False, python_flags=''):
+ expect_banner=False, python_flags='', env=None):
cmdline = '%s %s "%s" %s' % (sys.executable, python_flags,
app_main, cmdline)
print 'POPEN:', cmdline
- child_in, child_out_err = os.popen4(cmdline)
+ process = subprocess.Popen(
+ cmdline,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ shell=True, env=env
+ )
+ child_in, child_out_err = process.stdin, process.stdout
child_in.write(senddata)
child_in.close()
data = child_out_err.read()
@@ -728,6 +733,27 @@
data = self.run(p + os.sep)
assert data == '42\n'
+ def test_pythonioencoding(self):
+ if sys.version_info < (2, 7):
+ skip("test requires Python >= 2.7")
+ for encoding, expected in [
+ ("iso-8859-15", "15\xa4"),
+ ("utf-8", '15\xe2\x82\xac'),
+ ("utf-16-le", '1\x005\x00\xac\x20'),
+ ("iso-8859-1:ignore", "15"),
+ ("iso-8859-1:replace", "15?"),
+ ("iso-8859-1:backslashreplace", "15\\u20ac"),
+ ]:
+ p = getscript_in_dir("""
+ import sys
+ sys.stdout.write(u'15\u20ac')
+ sys.stdout.flush()
+ """)
+ env = os.environ.copy()
+ env["PYTHONIOENCODING"] = encoding
+ data = self.run(p, env=env)
+ assert data == expected
+
class AppTestAppMain:
diff --git a/pypy/module/_file/test/test_file.py b/pypy/module/_file/test/test_file.py
--- a/pypy/module/_file/test/test_file.py
+++ b/pypy/module/_file/test/test_file.py
@@ -207,6 +207,32 @@
exc = raises(IOError, self.file, os.curdir, 'w')
assert exc.value.filename == os.curdir
+ def test_encoding_errors(self):
+ import _file
+
+ with self.file(self.temppath, "w") as f:
+ _file.set_file_encoding(f, "utf-8")
+ f.write(u'15\u20ac')
+
+ assert f.encoding == "utf-8"
+ assert f.errors is None
+
+ with self.file(self.temppath, "r") as f:
+ data = f.read()
+ assert data == '15\xe2\x82\xac'
+
+ with self.file(self.temppath, "w") as f:
+ _file.set_file_encoding(f, "iso-8859-1", "ignore")
+ f.write(u'15\u20ac')
+
+ assert f.encoding == "iso-8859-1"
+ assert f.errors == "ignore"
+
+ with self.file(self.temppath, "r") as f:
+ data = f.read()
+ assert data == "15"
+
+
class AppTestConcurrency(object):
# these tests only really make sense on top of a translated pypy-c,
diff --git a/pypy/module/_file/__init__.py b/pypy/module/_file/__init__.py
--- a/pypy/module/_file/__init__.py
+++ b/pypy/module/_file/__init__.py
@@ -9,6 +9,7 @@
interpleveldefs = {
"file": "interp_file.W_File",
+ "set_file_encoding": "interp_file.set_file_encoding",
}
def __init__(self, space, *args):
More information about the Pypy-commit
mailing list