[pypy-svn] r78375 - in pypy/branch/fast-forward/pypy/module/_io: . test
afa at codespeak.net
afa at codespeak.net
Wed Oct 27 19:54:45 CEST 2010
Author: afa
Date: Wed Oct 27 19:54:43 2010
New Revision: 78375
Added:
pypy/branch/fast-forward/pypy/module/_io/test/test_bytesio.py (contents, props changed)
Modified:
pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py
Log:
Start to implement _io.BytesIO
Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py Wed Oct 27 19:54:43 2010
@@ -17,6 +17,29 @@
pass
class W_BufferedIOBase(W_IOBase):
+ def _unsupportedoperation(self, space, message):
+ # Fetch the 'UnsupportedOperation' attribute of the builtin '_io' module
+ # and raise it as an OperationError carrying the wrapped `message`.
+ w_exc = space.getattr(space.getbuiltinmodule('_io'),
+ space.wrap('UnsupportedOperation'))
+ raise OperationError(w_exc, space.wrap(message))
+
+ @unwrap_spec('self', ObjSpace, W_Root)
+ def read_w(self, space, w_size=None):
+ # Base-class default: ignores w_size and always raises
+ # UnsupportedOperation with the message "read".
+ self._unsupportedoperation(space, "read")
+
+ @unwrap_spec('self', ObjSpace, W_Root)
+ def write_w(self, space, w_data):
+ # Base-class default: ignores w_data and always raises
+ # UnsupportedOperation with the message "write".
+ self._unsupportedoperation(space, "write")
+
+# Type definition named '_BufferedIOBase', built on W_IOBase.typedef; it
+# exposes read_w and write_w under the names `read` and `write`.
+W_BufferedIOBase.typedef = TypeDef(
+ '_BufferedIOBase', W_IOBase.typedef,
+ __new__ = generic_new_descr(W_BufferedIOBase),
+ read = interp2app(W_BufferedIOBase.read_w),
+ write = interp2app(W_BufferedIOBase.write_w),
+ )
+
+class BufferedMixin:
+ _mixin_ = True
+
def __init__(self, space):
W_IOBase.__init__(self, space)
self.state = STATE_ZERO
@@ -37,19 +60,6 @@
self.readable = False
self.writable = False
- def _unsupportedoperation(self, space, message):
- w_exc = space.getattr(space.getbuiltinmodule('_io'),
- space.wrap('UnsupportedOperation'))
- raise OperationError(w_exc, space.wrap(message))
-
- @unwrap_spec('self', ObjSpace, W_Root)
- def read_w(self, space, w_size=None):
- self._unsupportedoperation(space, "read")
-
- @unwrap_spec('self', ObjSpace, W_Root)
- def write_w(self, space, w_size=None):
- self._unsupportedoperation(space, "write")
-
def _reader_reset_buf(self):
self.read_end = -1
@@ -57,16 +67,6 @@
self.write_pos = 0
self.write_end = -1
-W_BufferedIOBase.typedef = TypeDef(
- '_BufferedIOBase', W_IOBase.typedef,
- __new__ = generic_new_descr(W_BufferedIOBase),
- read = interp2app(W_BufferedIOBase.read_w),
- write = interp2app(W_BufferedIOBase.write_w),
- )
-
-class BufferedMixin:
- _mixin_ = True
-
def _init(self, space):
if self.buffer_size <= 0:
raise OperationError(space.w_ValueError, space.wrap(
@@ -245,11 +245,6 @@
return self._write(space, ''.join(l))
class W_BufferedReader(BufferedMixin, W_BufferedIOBase):
- def __init__(self, space):
- W_BufferedIOBase.__init__(self, space)
- self.ok = False
- self.detached = False
-
@unwrap_spec('self', ObjSpace, W_Root, int)
def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE):
self.state = STATE_ZERO
Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py Wed Oct 27 19:54:43 2010
@@ -1,11 +1,97 @@
-from pypy.module._io.interp_bufferedio import W_BufferedIOBase
from pypy.interpreter.typedef import (
TypeDef, generic_new_descr)
+from pypy.interpreter.gateway import interp2app, unwrap_spec
+from pypy.interpreter.baseobjspace import ObjSpace, W_Root
+from pypy.module._io.interp_bufferedio import W_BufferedIOBase
+from pypy.module._io.interp_iobase import convert_size
+
+def buffer2string(buffer, start, end):
+    """Return a string made of the items buffer[start] .. buffer[end - 1].
+
+    Items are fetched one index at a time and appended to a StringBuilder
+    created with ``end - start`` as its size argument.  Nothing is appended
+    when ``start >= end``.
+    """
+    from pypy.rlib.rstring import StringBuilder
+    result = StringBuilder(end - start)
+    index = start
+    while index < end:
+        result.append(buffer[index])
+        index += 1
+    return result.build()
class W_BytesIO(W_BufferedIOBase):
- pass
+    """Interp-level _io.BytesIO: an in-memory byte stream.
+
+    ``buf`` is a list of single characters that may be longer than the
+    stream itself; ``string_size`` is the logical length of the stream and
+    ``pos`` is the current read/write position.
+    """
+
+    def __init__(self, space):
+        W_BufferedIOBase.__init__(self, space)
+        self.pos = 0
+        self.string_size = 0
+        self.buf = []
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def descr_init(self, space, w_initvalue=None):
+        """Reset the stream and, unless w_initvalue is None, write it."""
+        # In case __init__ is called multiple times
+        self.string_size = 0
+        self.pos = 0
+
+        if not space.is_w(w_initvalue, space.w_None):
+            self.write_w(space, w_initvalue)
+            # Rewind so that the initial value is read from its start.
+            self.pos = 0
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def read_w(self, space, w_size=None):
+        """Return at most ``w_size`` bytes starting at the current position.
+
+        A size that is negative or larger than what remains in the stream
+        is replaced by the number of remaining bytes (never below zero).
+        The position advances by the number of bytes returned.
+        """
+        self._check_closed(space)
+        size = convert_size(space, w_size)
+
+        # adjust invalid sizes: the requested size must lie within
+        # [0, n]; comparing the other way round (0 <= n <= size) would
+        # return everything for a short read and run past the end of the
+        # buffer for an oversized one.
+        n = self.string_size - self.pos
+        if not 0 <= size <= n:
+            size = n
+            if size < 0:
+                # pos is beyond the end of the stream: nothing to read
+                size = 0
+
+        output = buffer2string(self.buf, self.pos, self.pos + size)
+        self.pos += size
+        return space.wrap(output)
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def write_w(self, space, w_data):
+        """Write the buffer contents of w_data at the current position.
+
+        Existing data is overwritten, the stream grows as needed, and the
+        wrapped number of bytes written is returned (0 for empty data).
+        """
+        self._check_closed(space)
+        buf = space.buffer_w(w_data)
+        length = buf.getlength()
+        if length == 0:
+            # Always return an integer; a bare return here would make
+            # write() of empty data yield None instead of 0.
+            return space.wrap(0)
+
+        if self.pos + length > len(self.buf):
+            self.buf.extend(['\0'] * (self.pos + length - len(self.buf)))
+
+        if self.pos > self.string_size:
+            # In case of overseek, pad with null bytes the buffer region
+            # between the end of stream and the current position.
+            #
+            #   0   lo      string_size                           hi
+            #   |   |<---used--->|<----------available----------->|
+            #   |   |            <--to pad-->|<---to write--->    |
+            #   0   buf                   position
+            for i in range(self.string_size, self.pos):
+                self.buf[i] = '\0'
+
+        # Copy the data to the internal buffer, overwriting some of the
+        # existing data if self->pos < self->string_size.
+        for i in range(length):
+            self.buf[self.pos + i] = buf.getitem(i)
+        self.pos += length
+
+        # Set the new length of the internal string if it has changed
+        if self.string_size < self.pos:
+            self.string_size = self.pos
+
+        return space.wrap(length)
+
+    @unwrap_spec('self', ObjSpace)
+    def getvalue_w(self, space):
+        """Return the whole stream contents, regardless of the position."""
+        self._check_closed(space)
+        return space.wrap(buffer2string(self.buf, 0, self.string_size))
+
+
+# Type definition named 'BytesIO', built on W_BufferedIOBase.typedef; it
+# exposes descr_init as __init__ and the read_w / write_w / getvalue_w
+# methods as `read`, `write` and `getvalue`.
W_BytesIO.typedef = TypeDef(
'BytesIO', W_BufferedIOBase.typedef,
__new__ = generic_new_descr(W_BytesIO),
+ __init__ = interp2app(W_BytesIO.descr_init),
+
+ read = interp2app(W_BytesIO.read_w),
+ write = interp2app(W_BytesIO.write_w),
+ getvalue = interp2app(W_BytesIO.getvalue_w),
)
Added: pypy/branch/fast-forward/pypy/module/_io/test/test_bytesio.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_bytesio.py Wed Oct 27 19:54:43 2010
@@ -0,0 +1,25 @@
+from pypy.conftest import gettestobjspace
+
+class AppTestBytesIO:
+ # Tests for _io.BytesIO, run against an object space created with the
+ # '_io' module enabled.
+ def setup_class(cls):
+ cls.space = gettestobjspace(usemodules=['_io'])
+
+ def test_write(self):
+ # write() reports the number of bytes written and getvalue() returns
+ # what was written.
+ import _io
+ f = _io.BytesIO()
+ assert f.write("hello") == 5
+ import gc; gc.collect()
+ assert f.getvalue() == "hello"
+ f.close()
+
+ def test_read(self):
+ # read() with no argument returns the whole initial value.
+ import _io
+ f = _io.BytesIO("hello")
+ assert f.read() == "hello"
+ import gc; gc.collect()
+ f.close()
+
+ # NOTE(review): test_seek below is commented out in this revision;
+ # presumably seek() is not implemented on BytesIO yet -- confirm.
+ ## def test_seek(self):
+ ## import _io
+ ## f = _io.BytesIO("hello")
+ ## assert f.seek(-1, 2) == 4
More information about the Pypy-commit
mailing list