[pypy-svn] r31077 - in pypy/dist/pypy/module/bz2: . test

rhymes at codespeak.net rhymes at codespeak.net
Sun Aug 6 17:02:40 CEST 2006


Author: rhymes
Date: Sun Aug  6 17:02:36 2006
New Revision: 31077

Modified:
   pypy/dist/pypy/module/bz2/bzlib.py
   pypy/dist/pypy/module/bz2/interp_bz2.py
   pypy/dist/pypy/module/bz2/test/test_bz2.py
Log:
Implement compress() and flush() on the bz2 compressor object (_BZ2Comp), and enable the corresponding test_compress test.

Modified: pypy/dist/pypy/module/bz2/bzlib.py
==============================================================================
--- pypy/dist/pypy/module/bz2/bzlib.py	(original)
+++ pypy/dist/pypy/module/bz2/bzlib.py	Sun Aug  6 17:02:36 2006
@@ -8,11 +8,11 @@
 class bz_stream(Structure):
     pass
 bz_stream._fields_ = [
-    ('next_in', STRING),
+    ('next_in', POINTER(c_char)),
     ('avail_in', c_uint),
     ('total_in_lo32', c_uint),
     ('total_in_hi32', c_uint),
-    ('next_out', STRING),
+    ('next_out', POINTER(c_char)),
     ('avail_out', c_uint),
     ('total_out_lo32', c_uint),
     ('total_out_hi32', c_uint),

Modified: pypy/dist/pypy/module/bz2/interp_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/interp_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/interp_bz2.py	Sun Aug  6 17:02:36 2006
@@ -86,6 +86,17 @@
     
 MAXINT = sys.maxint
 
+if BZ_CONFIG_ERROR:
+    if sizeof(c_long) >= 8 or sizeof(c_longlong) >= 8:
+        def _bzs_total_out(bzs):
+            return long(bzs.total_out_hi32 << 32) + bzs.total_out_lo32
+    else:
+        def _bzs_total_out(bzs):
+            return bzs.total_out_lo32
+else:
+    def _bzs_total_out(bzs):
+        return bzs.total_out
+
 pythonapi.PyFile_FromString.argtypes = [c_char_p, c_char_p]
 pythonapi.PyFile_FromString.restype = POINTER(PyFileObject)
 pythonapi.PyFile_SetBufSize.argtypes = [POINTER(PyFileObject), c_int]
@@ -764,7 +775,7 @@
         if compresslevel < 1 or compresslevel > 9:
             raise OperationError(self.space.w_ValueError,
                 self.space.wrap("compresslevel must be between 1 and 9"))
-        
+                
         bzerror = libbz2.BZ2_bzCompressInit(byref(self.bzs), compresslevel, 0, 0)
         if bzerror != BZ_OK:
             _catch_bz2_error(self.space, bzerror)
@@ -773,9 +784,79 @@
         
     def __del__(self):
         libbz2.BZ2_bzCompressEnd(byref(self.bzs))
+    
+    def compress(self, data):
+        """compress(data) -> string
 
+        Provide more data to the compressor object. It will return chunks of
+        compressed data whenever possible. When you've finished providing data
+        to compress, call the flush() method to finish the compression process,
+        and return what is left in the internal buffers."""
+        
+        datasize = len(data)
+        
+        if datasize == 0:
+            return self.space.wrap("")
+        
+        if not self.running:
+            raise OperationError(self.space.w_ValueError,
+                self.space.wrap("this object was already flushed"))
+        
+        out_bufsize = SMALLCHUNK
+        out_buf = create_string_buffer(out_bufsize)
+        
+        in_bufsize = datasize
+        in_buf = create_string_buffer(in_bufsize)
+        in_buf.value = data
+        
+        self.bzs.next_in = in_buf
+        self.bzs.avail_in = in_bufsize
+        self.bzs.next_out = out_buf
+        self.bzs.avail_out = out_bufsize
+        
+        while True:
+            bzerror = libbz2.BZ2_bzCompress(byref(self.bzs), BZ_RUN)
+            if bzerror != BZ_OK:
+                _catch_bz2_error(self.space, bzerror)
 
-_BZ2Comp.typedef = TypeDef("_BZ2File")
+            if self.bzs.avail_in == 0:
+                break
+
+        total_out = _bzs_total_out(self.bzs)
+        res = "".join([out_buf[i] for i in range(total_out)])
+        return self.space.wrap(res)
+    compress.unwrap_spec = ['self', str]
+    
+    def flush(self):
+        if not self.running:
+            raise OperationError(self.space.w_ValueError,
+                self.space.wrap("this object was already flushed"))
+        self.running = False
+        
+        out_bufsize = SMALLCHUNK
+        out_buf = create_string_buffer(out_bufsize)
+
+        self.bzs.next_out = out_buf
+        self.bzs.avail_out = out_bufsize
+        
+        while True:
+            bzerror = libbz2.BZ2_bzCompress(byref(self.bzs), BZ_FINISH)
+            if bzerror == BZ_STREAM_END:
+                break
+            elif bzerror != BZ_FINISH_OK:
+                _catch_bz2_error(self.space, bzerror)
+
+        total_out = _bzs_total_out(self.bzs)
+        res = "".join([out_buf[i] for i in range(total_out)])
+        return self.space.wrap(res)
+    flush.unwrap_spec = ['self']
+
+
+_BZ2Comp.typedef = TypeDef("_BZ2Comp",
+    compress = interp2app(_BZ2Comp.compress,
+        unwrap_spec=_BZ2Comp.compress.unwrap_spec),
+    flush = interp2app(_BZ2Comp.flush, unwrap_spec=_BZ2Comp.flush.unwrap_spec),
+)
 
 def BZ2Compressor(space, compresslevel=9):
     """BZ2Compressor([compresslevel=9]) -> compressor object

Modified: pypy/dist/pypy/module/bz2/test/test_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/test/test_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/test/test_bz2.py	Sun Aug  6 17:02:36 2006
@@ -582,7 +582,29 @@
         
         BZ2Compressor(1)
         BZ2Compressor(9)
+        
+    def test_compress(self):
+        def decompress(data):
+            import popen2
+            import bz2
+            pop = popen2.Popen3("bunzip2", capturestderr=1)
+            pop.tochild.write(data)
+            pop.tochild.close()
+            res = pop.fromchild.read()
+            pop.fromchild.close()
+            if pop.wait() != 0:
+                res = bz2.decompress(data)
+            return res
 
+        from bz2 import BZ2Compressor            
+        TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
+        
+        bz2c = BZ2Compressor()
+        raises(TypeError, bz2c.compress)
+        data = bz2c.compress(TEXT)
+        data = "%s%s" % (data, bz2c.flush())
+        assert decompress(data) == TEXT
+        
 # has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx", "riscos")
 # 
 # if has_cmdline_bunzip2:
@@ -603,14 +625,6 @@
 #         return bz2.decompress(data)
 #
 # class BZ2CompressorTest(BaseTest):
-#     def testCompress(self):
-#         # "Test BZ2Compressor.compress()/flush()"
-#         bz2c = BZ2Compressor()
-#         self.assertRaises(TypeError, bz2c.compress)
-#         data = bz2c.compress(self.TEXT)
-#         data += bz2c.flush()
-#         self.assertEqual(self.decompress(data), self.TEXT)
-# 
 #     def testCompressChunks10(self):
 #         # "Test BZ2Compressor.compress()/flush() with chunks of 10 bytes"
 #         bz2c = BZ2Compressor()



More information about the Pypy-commit mailing list