[pypy-svn] r79409 - in pypy/branch/fast-forward/pypy/module/_io: . test
afa at codespeak.net
afa at codespeak.net
Tue Nov 23 16:24:51 CET 2010
Author: afa
Date: Tue Nov 23 16:24:49 2010
New Revision: 79409
Modified:
pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py
pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
Log:
Add a test and fixes for TextIOWrapper.close() and __del__().
Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py Tue Nov 23 16:24:49 2010
@@ -1,5 +1,5 @@
from pypy.interpreter.typedef import (
- TypeDef, generic_new_descr)
+ TypeDef, generic_new_descr, GetSetProperty)
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.baseobjspace import ObjSpace, W_Root
from pypy.interpreter.error import OperationError, operationerrfmt
@@ -20,11 +20,12 @@
W_BufferedIOBase.__init__(self, space)
self.pos = 0
self.string_size = 0
- self.buf = []
+ self.buf = None
@unwrap_spec('self', ObjSpace, W_Root)
def descr_init(self, space, w_initvalue=None):
# In case __init__ is called multiple times
+ self.buf = []
self.string_size = 0
self.pos = 0
@@ -32,6 +33,12 @@
self.write_w(space, w_initvalue)
self.pos = 0
+ def _check_closed(self, space, message=None):
+ if self.buf is None:
+ if message is None:
+ message = "I/O operation on closed file"
+ raise OperationError(space.w_ValueError, space.wrap(message))
+
@unwrap_spec('self', ObjSpace, W_Root)
def read_w(self, space, w_size=None):
self._check_closed(space)
@@ -169,6 +176,13 @@
def seekable_w(self, space):
return space.w_True
+ @unwrap_spec('self', ObjSpace)
+ def close_w(self, space):
+ self.buf = None
+
+ def closed_get_w(space, self):
+ return space.wrap(self.buf is None)
+
W_BytesIO.typedef = TypeDef(
'BytesIO', W_BufferedIOBase.typedef,
__new__ = generic_new_descr(W_BytesIO),
@@ -185,5 +199,7 @@
readable = interp2app(W_BytesIO.readable_w),
writable = interp2app(W_BytesIO.writable_w),
seekable = interp2app(W_BytesIO.seekable_w),
+ close = interp2app(W_BytesIO.close_w),
+ closed = GetSetProperty(W_BytesIO.closed_get_w),
)
Modified: pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_textio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_textio.py Tue Nov 23 16:24:49 2010
@@ -361,6 +361,10 @@
self._check_init(space)
return space.call_method(self.w_buffer, "fileno")
+ def closed_get_w(space, self):
+ self._check_init(space)
+ return space.getattr(self.w_buffer, space.wrap("closed"))
+
@unwrap_spec('self', ObjSpace)
def flush_w(self, space):
self._check_closed(space)
@@ -371,8 +375,9 @@
@unwrap_spec('self', ObjSpace)
def close_w(self, space):
self._check_init(space)
- if not self._closed(space):
- return space.call_method(self, "flush")
+ if not space.is_true(space.getattr(self.w_buffer,
+ space.wrap("closed"))):
+ space.call_method(self, "flush")
return space.call_method(self.w_buffer, "close")
# _____________________________________________________________
@@ -813,4 +818,5 @@
writable = interp2app(W_TextIOWrapper.writable_w),
seekable = interp2app(W_TextIOWrapper.seekable_w),
fileno = interp2app(W_TextIOWrapper.fileno_w),
+ closed = GetSetProperty(W_TextIOWrapper.closed_get_w),
)
Modified: pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py Tue Nov 23 16:24:49 2010
@@ -106,6 +106,20 @@
assert f.read() == data * 2
assert buf.getvalue() == (data * 2).encode(encoding)
+ def test_destructor(self):
+ import _io
+ l = []
+ class MyBytesIO(_io.BytesIO):
+ def close(self):
+ l.append(self.getvalue())
+ _io.BytesIO.close(self)
+ b = MyBytesIO()
+ t = _io.TextIOWrapper(b, encoding="ascii")
+ t.write(u"abc")
+ del t
+ import gc; gc.collect()
+ assert l == ["abc"]
+
class AppTestIncrementalNewlineDecoder:
def test_newline_decoder(self):
More information about the Pypy-commit mailing list