[pypy-svn] r31101 - in pypy/dist/pypy/module/bz2: . test

rhymes at codespeak.net rhymes at codespeak.net
Mon Aug 7 12:49:35 CEST 2006


Author: rhymes
Date: Mon Aug  7 12:49:31 2006
New Revision: 31101

Modified:
   pypy/dist/pypy/module/bz2/interp_bz2.py
   pypy/dist/pypy/module/bz2/test/test_bz2.py
Log:
implemented BZ2Decompressor.decompress()

Modified: pypy/dist/pypy/module/bz2/interp_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/interp_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/interp_bz2.py	Mon Aug  7 12:49:31 2006
@@ -133,6 +133,8 @@
 libbz2.BZ2_bzDecompressInit.restype = c_int
 libbz2.BZ2_bzDecompressEnd.argtypes = [POINTER(bz_stream)]
 libbz2.BZ2_bzDecompressEnd.restype = c_int
+libbz2.BZ2_bzDecompress.argtypes = [POINTER(bz_stream)]
+libbz2.BZ2_bzDecompress.restype = c_int
 
 libc.strerror.restype = c_char_p
 libc.strerror.argtypes = [c_int]
@@ -922,9 +924,71 @@
     def __del__(self):
         libbz2.BZ2_bzDecompressEnd(byref(self.bzs))
     
+    def decompress(self, data):
+        """"decompress(data) -> string
+
+        Provide more data to the decompressor object. It will return chunks
+        of decompressed data whenever possible. If you try to decompress data
+        after the end of stream is found, EOFError will be raised. If any data
+        was found after the end of stream, it'll be ignored and saved in
+        unused_data attribute."""
+        
+        if not self.running:
+            raise OperationError(self.space.w_EOFError,
+                self.space.wrap("end of stream was already found"))
+        
+        in_bufsize = len(data)
+        in_buf = create_string_buffer(in_bufsize)
+        in_buf.value = data
+
+        out_bufsize = SMALLCHUNK
+        out_buf = create_string_buffer(out_bufsize)
+        
+        self.bzs.next_in = in_buf
+        self.bzs.avail_in = in_bufsize
+        self.bzs.next_out = out_buf
+        self.bzs.avail_out = out_bufsize
+        
+        temp = []
+        while True:
+            bzerror = libbz2.BZ2_bzDecompress(byref(self.bzs))
+            if bzerror == BZ_STREAM_END:
+                if self.bzs.avail_in != 0:
+                    unused = [self.bzs.next_in[i] for i in range(self.bzs.avail_in)]
+                    self.unused_data = "".join(unused)
+                self.running = False
+                break
+            if bzerror != BZ_OK:
+                _catch_bz2_error(self.space, bzerror)
+            
+            if self.bzs.avail_in == 0:
+                break
+            elif self.bzs.avail_out == 0:
+                total_out = _bzs_total_out(self.bzs)
+                data = "".join([out_buf[i] for i in range(total_out)])
+                temp.append(data)
+                
+                out_bufsize = _new_buffer_size(out_bufsize)
+                out_buf = create_string_buffer(out_bufsize)
+                self.bzs.next_out = out_buf
+                self.bzs.avail_out = out_bufsize
+                
+        if temp:
+            total_out = _bzs_total_out(self.bzs)
+            data = "".join([out_buf[i] for i in range(total_out - len(temp[0]))])
+            temp.append(data)
+            return self.space.wrap("".join(temp))
+
+        total_out = _bzs_total_out(self.bzs)
+        res = "".join([out_buf[i] for i in range(total_out)])
+        return self.space.wrap(res)
+    decompress.unwrap_spec = ['self', str]
+
 
 _BZ2Decomp.typedef = TypeDef("_BZ2Decomp",
     unused_data = interp_attrproperty("unused_data", _BZ2Decomp),
+    decompress = interp2app(_BZ2Decomp.decompress,
+        unwrap_spec=_BZ2Decomp.decompress.unwrap_spec),
 )
 
 

Modified: pypy/dist/pypy/module/bz2/test/test_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/test/test_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/test/test_bz2.py	Mon Aug  7 12:49:31 2006
@@ -674,6 +674,17 @@
         bz2d = BZ2Decompressor()
         assert bz2d.unused_data == ""
 
+    def test_decompress(self):
+        from bz2 import BZ2Decompressor
+        
+        DATA = 'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
+        TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
+        
+        bz2d = BZ2Decompressor()
+        raises(TypeError, bz2d.decompress)
+        decompressed_data = bz2d.decompress(DATA)
+        assert decompressed_data == TEXT
+
 # has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx", "riscos")
 # 
 # if has_cmdline_bunzip2:
@@ -693,16 +704,6 @@
 #     def decompress(self, data):
 #         return bz2.decompress(data)
 #
-# class BZ2DecompressorTest(BaseTest):
-#     def test_Constructor(self):
-#         self.assertRaises(TypeError, BZ2Decompressor, 42)
-# 
-#     def testDecompress(self):
-#         # "Test BZ2Decompressor.decompress()"
-#         bz2d = BZ2Decompressor()
-#         self.assertRaises(TypeError, bz2d.decompress)
-#         text = bz2d.decompress(self.DATA)
-#         self.assertEqual(text, self.TEXT)
 # 
 #     def testDecompressChunks10(self):
 #         # "Test BZ2Decompressor.decompress() with chunks of 10 bytes"



More information about the Pypy-commit mailing list