[pypy-svn] r78375 - in pypy/branch/fast-forward/pypy/module/_io: . test

afa at codespeak.net afa at codespeak.net
Wed Oct 27 19:54:45 CEST 2010


Author: afa
Date: Wed Oct 27 19:54:43 2010
New Revision: 78375

Added:
   pypy/branch/fast-forward/pypy/module/_io/test/test_bytesio.py   (contents, props changed)
Modified:
   pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
   pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py
Log:
Start to implement _io.BytesIO


Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bufferedio.py	Wed Oct 27 19:54:43 2010
@@ -17,6 +17,29 @@
     pass
 
 class W_BufferedIOBase(W_IOBase):
+    def _unsupportedoperation(self, space, message):
+        w_exc = space.getattr(space.getbuiltinmodule('_io'),
+                              space.wrap('UnsupportedOperation'))
+        raise OperationError(w_exc, space.wrap(message))
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def read_w(self, space, w_size=None):
+        self._unsupportedoperation(space, "read")
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def write_w(self, space, w_data):
+        self._unsupportedoperation(space, "write")
+
+W_BufferedIOBase.typedef = TypeDef(
+    '_BufferedIOBase', W_IOBase.typedef,
+    __new__ = generic_new_descr(W_BufferedIOBase),
+    read = interp2app(W_BufferedIOBase.read_w),
+    write = interp2app(W_BufferedIOBase.write_w),
+    )
+
+class BufferedMixin:
+    _mixin_ = True
+
     def __init__(self, space):
         W_IOBase.__init__(self, space)
         self.state = STATE_ZERO
@@ -37,19 +60,6 @@
         self.readable = False
         self.writable = False
 
-    def _unsupportedoperation(self, space, message):
-        w_exc = space.getattr(space.getbuiltinmodule('_io'),
-                              space.wrap('UnsupportedOperation'))
-        raise OperationError(w_exc, space.wrap(message))
-
-    @unwrap_spec('self', ObjSpace, W_Root)
-    def read_w(self, space, w_size=None):
-        self._unsupportedoperation(space, "read")
-
-    @unwrap_spec('self', ObjSpace, W_Root)
-    def write_w(self, space, w_size=None):
-        self._unsupportedoperation(space, "write")
-
     def _reader_reset_buf(self):
         self.read_end = -1
 
@@ -57,16 +67,6 @@
         self.write_pos = 0
         self.write_end = -1
 
-W_BufferedIOBase.typedef = TypeDef(
-    '_BufferedIOBase', W_IOBase.typedef,
-    __new__ = generic_new_descr(W_BufferedIOBase),
-    read = interp2app(W_BufferedIOBase.read_w),
-    write = interp2app(W_BufferedIOBase.write_w),
-    )
-
-class BufferedMixin:
-    _mixin_ = True
-
     def _init(self, space):
         if self.buffer_size <= 0:
             raise OperationError(space.w_ValueError, space.wrap(
@@ -245,11 +245,6 @@
         return self._write(space, ''.join(l))
 
 class W_BufferedReader(BufferedMixin, W_BufferedIOBase):
-    def __init__(self, space):
-        W_BufferedIOBase.__init__(self, space)
-        self.ok = False
-        self.detached = False
-
     @unwrap_spec('self', ObjSpace, W_Root, int)
     def descr_init(self, space, w_raw, buffer_size=DEFAULT_BUFFER_SIZE):
         self.state = STATE_ZERO

Modified: pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_bytesio.py	Wed Oct 27 19:54:43 2010
@@ -1,11 +1,97 @@
-from pypy.module._io.interp_bufferedio import W_BufferedIOBase
 from pypy.interpreter.typedef import (
     TypeDef, generic_new_descr)
+from pypy.interpreter.gateway import interp2app, unwrap_spec
+from pypy.interpreter.baseobjspace import ObjSpace, W_Root
+from pypy.module._io.interp_bufferedio import W_BufferedIOBase
+from pypy.module._io.interp_iobase import convert_size
+
+def buffer2string(buffer, start, end):
+    from pypy.rlib.rstring import StringBuilder
+    builder = StringBuilder(end - start)
+    for i in range(start, end):
+        builder.append(buffer[i])
+    return builder.build()
 
 class W_BytesIO(W_BufferedIOBase):
-    pass
+    def __init__(self, space):
+        W_BufferedIOBase.__init__(self, space)
+        self.pos = 0
+        self.string_size = 0
+        self.buf = []
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def descr_init(self, space, w_initvalue=None):
+        # In case __init__ is called multiple times
+        self.string_size = 0
+        self.pos = 0
+
+        if not space.is_w(w_initvalue, space.w_None):
+            self.write_w(space, w_initvalue)
+            self.pos = 0
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def read_w(self, space, w_size=None):
+        self._check_closed(space)
+        size = convert_size(space, w_size)
+
+        # Clamp size to the bytes left; an out-of-range size reads them all.
+        n = self.string_size - self.pos
+        if n < 0:
+            n = 0
+        if not 0 <= size <= n:
+            size = n
+
+        output = buffer2string(self.buf, self.pos, self.pos + size)
+        self.pos += size
+        return space.wrap(output)
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def write_w(self, space, w_data):
+        self._check_closed(space)
+        buf = space.buffer_w(w_data)
+        length = buf.getlength()
+        if length == 0:
+            return space.wrap(0)
+        # Grow the backing list so every index written below already exists.
+        if self.pos + length > len(self.buf):
+            self.buf.extend(['\0'] * (self.pos + length - len(self.buf)))
+
+        if self.pos > self.string_size:
+            # In case of overseek, pad with null bytes the buffer region
+            # between the end of stream and the current position.
+            #
+            # 0   lo      string_size                           hi
+            # |   |<---used--->|<----------available----------->|
+            # |   |            <--to pad-->|<---to write--->    |
+            # 0   buf                   position
+            for i in range(self.string_size, self.pos):
+                self.buf[i] = '\0'
+
+        # Copy the data to the internal buffer, overwriting some of the
+        # existing data if self.pos < self.string_size.
+        for i in range(length):
+            self.buf[self.pos + i] = buf.getitem(i)
+        self.pos += length
+
+        # Set the new length of the internal string if it has changed
+        if self.string_size < self.pos:
+            self.string_size = self.pos
+
+        return space.wrap(length)
+
+    @unwrap_spec('self', ObjSpace)
+    def getvalue_w(self, space):
+        self._check_closed(space)
+        return space.wrap(buffer2string(self.buf, 0, self.string_size))
+
+
 W_BytesIO.typedef = TypeDef(
     'BytesIO', W_BufferedIOBase.typedef,
     __new__ = generic_new_descr(W_BytesIO),
+    __init__  = interp2app(W_BytesIO.descr_init),
+
+    read = interp2app(W_BytesIO.read_w),
+    write = interp2app(W_BytesIO.write_w),
+    getvalue = interp2app(W_BytesIO.getvalue_w),
     )
 

Added: pypy/branch/fast-forward/pypy/module/_io/test/test_bytesio.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_bytesio.py	Wed Oct 27 19:54:43 2010
@@ -0,0 +1,25 @@
+from pypy.conftest import gettestobjspace
+
+class AppTestBytesIO:
+    def setup_class(cls):
+        cls.space = gettestobjspace(usemodules=['_io'])
+
+    def test_write(self):
+        import _io
+        f = _io.BytesIO()
+        assert f.write("hello") == 5
+        import gc; gc.collect()
+        assert f.getvalue() == "hello"
+        f.close()
+
+    def test_read(self):
+        import _io
+        f = _io.BytesIO("hello")
+        assert f.read() == "hello"
+        import gc; gc.collect()
+        f.close()
+
+    ## def test_seek(self):
+    ##     import _io
+    ##     f = _io.BytesIO("hello")
+    ##     assert f.seek(-1, 2) == 4



More information about the Pypy-commit mailing list