[pypy-svn] r64525 - in pypy/trunk/pypy: lang/gameboy/test lib/test2 module/_stackless/test module/unicodedata rlib rlib/test

iko at codespeak.net iko at codespeak.net
Tue Apr 21 17:04:49 CEST 2009


Author: iko
Date: Tue Apr 21 17:04:49 2009
New Revision: 64525

Modified:
   pypy/trunk/pypy/lang/gameboy/test/test_cpu_register.py   (props changed)
   pypy/trunk/pypy/lib/test2/test_collections.py   (props changed)
   pypy/trunk/pypy/module/_stackless/test/test_pickle_infrastructure.py   (props changed)
   pypy/trunk/pypy/module/unicodedata/triegenerator.py   (props changed)
   pypy/trunk/pypy/rlib/streamio.py
   pypy/trunk/pypy/rlib/test/test_streamio.py
Log:
revert r64513



Modified: pypy/trunk/pypy/rlib/streamio.py
==============================================================================
--- pypy/trunk/pypy/rlib/streamio.py	(original)
+++ pypy/trunk/pypy/rlib/streamio.py	Tue Apr 21 17:04:49 2009
@@ -72,18 +72,18 @@
 
 
 def open_file_as_stream(path, mode="r", buffering=-1):
-    os_flags, universal, reading, writing, basemode, binary = decode_mode(mode)
+    os_flags, universal, reading, writing, basemode = decode_mode(mode)
     stream = open_path_helper(path, os_flags, basemode == "a")
     return construct_stream_tower(stream, buffering, universal, reading,
-                                  writing, binary)
+                                  writing)
 
 def fdopen_as_stream(fd, mode, buffering):
     # XXX XXX XXX you want do check whether the modes are compatible
     # otherwise you get funny results
-    os_flags, universal, reading, writing, basemode, binary = decode_mode(mode)
+    os_flags, universal, reading, writing, basemode = decode_mode(mode)
     stream = DiskFile(fd)
     return construct_stream_tower(stream, buffering, universal, reading,
-                                  writing, binary)
+                                  writing)
 
 def open_path_helper(path, os_flags, append):
     # XXX for now always return DiskFile
@@ -116,16 +116,16 @@
             break
 
     flag = OS_MODE[basemode, plus]
-    flag |= O_BINARY
+    if binary or universal:
+        flag |= O_BINARY
 
     reading = basemode == 'r' or plus
     writing = basemode != 'r' or plus
 
-    return flag, universal, reading, writing, basemode, binary
+    return flag, universal, reading, writing, basemode
 
 
-def construct_stream_tower(stream, buffering, universal, reading, writing,
-                           binary):
+def construct_stream_tower(stream, buffering, universal, reading, writing):
     if buffering == 0:   # no buffering
         pass
     elif buffering == 1:   # line-buffering
@@ -147,8 +147,6 @@
             stream = TextOutputFilter(stream)
         if reading:
             stream = TextInputFilter(stream)
-    elif not binary and os.linesep == '\r\n':
-        stream = TextCRLFFilter(stream)
     return stream
 
 
@@ -236,9 +234,6 @@
     def truncate(self, size):
         raise NotImplementedError
 
-    def flush_buffers(self):
-        pass
-
     def flush(self):
         pass
 
@@ -819,72 +814,6 @@
     try_to_find_file_descriptor = PassThrough("try_to_find_file_descriptor",
                                               flush_buffers=False)
 
-class TextCRLFFilter(Stream):
-
-    """Filtering stream for universal newlines.
-
-    TextInputFilter is more general, but this is faster when you don't
-    need tell/seek.
-    """
-
-    def __init__(self, base):
-        self.base = base
-        self.do_read = base.read
-        self.do_write = base.write
-        self.do_flush = base.flush_buffers
-        self.lfbuffer = ""
-
-    def read(self, n):
-        data = self.lfbuffer + self.do_read(n)
-        self.lfbuffer = ""
-        if data.endswith("\r"):
-            c = self.do_read(1)
-            if c and c[0] == '\n':
-                data = data + '\n'
-                self.lfbuffer = c[1:]
-            else:
-                self.lfbuffer = c
-
-        result = []
-        offset = 0
-        while True:
-            newoffset = data.find('\r\n', offset)
-            if newoffset < 0:
-                result.append(data[offset:])
-                break
-            result.append(data[offset:newoffset])
-            offset = newoffset + 2
-
-        return '\n'.join(result)
-
-    def tell(self):
-        pos = self.base.tell()
-        return pos - len(self.lfbuffer)
-
-    def seek(self, offset, whence):
-        if whence == 1:
-            offset -= len(self.lfbuffer)   # correct for already-read-ahead character
-        self.base.seek(offset, whence)
-        self.lfbuffer = ""
-
-    def flush_buffers(self):
-        if self.lfbuffer:
-            self.base.seek(-len(self.lfbuffer), 1)
-            self.lfbuffer = ""
-        self.do_flush()
-
-    def write(self, data):
-        data = replace_char_with_str(data, '\n', '\r\n')
-        self.flush_buffers()
-        self.do_write(data)
-
-    truncate = PassThrough("truncate", flush_buffers=True)
-    flush    = PassThrough("flush", flush_buffers=False)
-    flushable= PassThrough("flushable", flush_buffers=False)
-    close    = PassThrough("close", flush_buffers=False)
-    try_to_find_file_descriptor = PassThrough("try_to_find_file_descriptor",
-                                              flush_buffers=False)
-    
 class TextInputFilter(Stream):
 
     """Filtering input stream for universal newline translation."""

Modified: pypy/trunk/pypy/rlib/test/test_streamio.py
==============================================================================
--- pypy/trunk/pypy/rlib/test/test_streamio.py	(original)
+++ pypy/trunk/pypy/rlib/test/test_streamio.py	Tue Apr 21 17:04:49 2009
@@ -590,83 +590,6 @@
 class TestCRLFFilterOOinterp(BaseTestCRLFFilter, OORtypeMixin):
     pass
 
-class BaseTestTextCRLFFilter(BaseRtypingTest):
-    def test_simple(self):
-        packets = ["abc\r\n", "abc\r", "\nd\r\nef\r\ngh", "a\rbc\r", "def\n",
-                   "\r", "\n\r"]
-        expected = ["abc\n", "abc\n", "d\nef\ngh", "a\rbc\r", "def\n", "\n",
-                    "\r"]
-        crlf = streamio.TextCRLFFilter(TSource(packets))
-        def f():
-            blocks = []
-            while True:
-                block = crlf.read(100)
-                if not block:
-                    break
-                blocks.append(block)
-            assert blocks == expected
-        self.interpret(f, [])
-
-    def test_readline_and_seek(self):
-        packets = ["abc\r\n", "abc\r", "\nd\r\nef\r\ngh", "a\rbc\r", "def\n",
-                   "\r", "\n\r"]
-        expected = ["abc\n", "abc\n", "d\n","ef\n", "gha\rbc\rdef\n", "\n",
-                    "\r"]
-        crlf = streamio.TextCRLFFilter(TSource(packets))
-        def f():
-            lines = []
-            while True:
-                pos = crlf.tell()
-                line = crlf.readline()
-                if not line:
-                    break
-                crlf.seek(pos, 0)
-                line2 = crlf.readline()
-                assert line2 == line                         
-                lines.append(line)
-            assert lines == expected
-        self.interpret(f, [])
-
-    def test_seek_relative(self):
-        packets = ["abc\r\n", "abc\r", "\nd\r\nef\r"]
-        expected = ["abc\n", "abc\n", "d\n","ef\r"]
-
-        crlf = streamio.TextCRLFFilter(TSource(packets))
-        def f():
-            lines = []
-            while True:
-                pos = crlf.tell()
-                line = crlf.readline()
-                if not line:
-                    break
-                crlf.seek(0, 1)
-                lines.append(line)
-            assert lines == expected
-        self.interpret(f, [])
-
-    def test_write(self):
-        data = "line1\r\nline2\rline3\r\n"
-        crlf = streamio.TextCRLFFilter(TReaderWriter(data))
-        def f():
-            line = crlf.readline()
-            assert line == 'line1\n'
-            line = crlf.read(6)
-            assert line == 'line2\r'
-            pos = crlf.tell()
-            crlf.write('line3\n')
-            crlf.seek(pos,0)
-            line = crlf.readline()
-            assert line == 'line3\n'
-            line = crlf.readline()
-            assert line == ''
-        self.interpret(f, [])
-
-class TestTextCRLFFilterLLInterp(BaseTestTextCRLFFilter, LLRtypeMixin):
-    pass
-        
-class TestTextCRLFFilterOOInterp(BaseTestTextCRLFFilter, OORtypeMixin):
-    pass
-        
 class TestMMapFile(BaseTestBufferingInputStreamTests):
     tfn = None
     fd = None



More information about the Pypy-commit mailing list