[pypy-svn] r31106 - in pypy/dist/pypy/module/bz2: . test

rhymes at codespeak.net rhymes at codespeak.net
Mon Aug 7 15:23:08 CEST 2006


Author: rhymes
Date: Mon Aug  7 15:23:04 2006
New Revision: 31106

Modified:
   pypy/dist/pypy/module/bz2/__init__.py
   pypy/dist/pypy/module/bz2/interp_bz2.py
   pypy/dist/pypy/module/bz2/test/test_bz2.py
Log:
one-shot module level compress() function implemented.

Modified: pypy/dist/pypy/module/bz2/__init__.py
==============================================================================
--- pypy/dist/pypy/module/bz2/__init__.py	(original)
+++ pypy/dist/pypy/module/bz2/__init__.py	Mon Aug  7 15:23:04 2006
@@ -5,6 +5,7 @@
         'BZ2File': 'interp_bz2.BZ2File',
         'BZ2Compressor': 'interp_bz2.BZ2Compressor',
         'BZ2Decompressor': 'interp_bz2.BZ2Decompressor',
+        'compress': 'interp_bz2.compress',
     }
 
     appleveldefs = {

Modified: pypy/dist/pypy/module/bz2/interp_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/interp_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/interp_bz2.py	Mon Aug  7 15:23:04 2006
@@ -992,6 +992,69 @@
 )
 
 
+def compress(space, data, compresslevel=9):
+    """compress(data [, compresslevel=9]) -> string
+
+    Compress data in one shot. If you want to compress data sequentially,
+    use an instance of BZ2Compressor instead. The compresslevel parameter, if
+    given, must be a number between 1 and 9."""
+
+    # 'space' is the object space used to build the error and wrap the
+    # result (this is a plain function: there is no 'self' here); 'data' is
+    # the string to compress. Raises ValueError on a bad compresslevel.
+    if compresslevel < 1 or compresslevel > 9:
+        raise OperationError(space.w_ValueError,
+            space.wrap("compresslevel must be between 1 and 9"))
+
+    bzs = bz_stream()
+
+    in_bufsize = len(data)
+    # conforming to bz2 manual, this is large enough to fit compressed
+    # data in one shot. We will check it later anyway.
+    out_bufsize = in_bufsize + (in_bufsize // 100 + 1) + 600
+
+    out_buf = create_string_buffer(out_bufsize)
+    in_buf = create_string_buffer(in_bufsize)
+    in_buf.value = data
+
+    bzs.next_in = in_buf
+    bzs.avail_in = in_bufsize
+    bzs.next_out = out_buf
+    bzs.avail_out = out_bufsize
+
+    bzerror = libbz2.BZ2_bzCompressInit(byref(bzs), compresslevel, 0, 0)
+    if bzerror != BZ_OK:
+        _catch_bz2_error(space, bzerror)
+
+    # completely filled output buffers collected so far, in order
+    temp = []
+    while True:
+        bzerror = libbz2.BZ2_bzCompress(byref(bzs), BZ_FINISH)
+        if bzerror == BZ_STREAM_END:
+            break
+        elif bzerror != BZ_FINISH_OK:
+            # release the stream before reporting the error
+            libbz2.BZ2_bzCompressEnd(byref(bzs))
+            _catch_bz2_error(space, bzerror)
+
+        if bzs.avail_out == 0:
+            # buffer exhausted: keep all of it and go on with a bigger one
+            temp.append("".join([out_buf[i] for i in range(out_bufsize)]))
+            out_bufsize = _new_buffer_size(out_bufsize)
+            out_buf = create_string_buffer(out_bufsize)
+            bzs.next_out = out_buf
+            bzs.avail_out = out_bufsize
+
+    # the last buffer holds only the bytes written before the stream ended
+    size = out_bufsize - bzs.avail_out
+    temp.append("".join([out_buf[i] for i in range(size)]))
+    res = "".join(temp)
+
+    libbz2.BZ2_bzCompressEnd(byref(bzs))
+    return space.wrap(res)
+compress.unwrap_spec = [ObjSpace, str, int]
+
+
 def BZ2Compressor(space, compresslevel=9):
     """BZ2Compressor([compresslevel=9]) -> compressor object
 

Modified: pypy/dist/pypy/module/bz2/test/test_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/test/test_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/test/test_bz2.py	Mon Aug  7 15:23:04 2006
@@ -724,6 +724,30 @@
         bz2d = BZ2Decompressor()
         bz2d.decompress(DATA)
         raises(EOFError, bz2d.decompress, "foo")
+        
+def test_compress_function():
+    def decompress(payload):
+        # Prefer command-line bunzip2; fall back to bz2.decompress() on failure.
+        import bz2
+        import popen2
+        child = popen2.Popen3("bunzip2", capturestderr=1)
+        child.tochild.write(payload)
+        child.tochild.close()
+        output = child.fromchild.read()
+        child.fromchild.close()
+        if child.wait() == 0:
+            return output
+        return bz2.decompress(payload)
+
+    from bz2 import compress
+    TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
+
+    # Invalid arguments: non-string data, out-of-range level, non-int level.
+    raises(TypeError, compress, 123)
+    raises(ValueError, compress, "foo", 10)
+    raises(TypeError, compress, "foo", "foo")
+
+    # Round trip: the compressed output must decompress back to TEXT.
+    assert decompress(compress(TEXT)) == TEXT
 
 # has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx", "riscos")
 # 
@@ -744,14 +768,6 @@
 #     def decompress(self, data):
 #         return bz2.decompress(data)
 #
-# class FuncTest(BaseTest):
-#     "Test module functions"
-# 
-#     def testCompress(self):
-#         # "Test compress() function"
-#         data = bz2.compress(self.TEXT)
-#         self.assertEqual(self.decompress(data), self.TEXT)
-# 
 #     def testDecompress(self):
 #         # "Test decompress() function"
 #         text = bz2.decompress(self.DATA)



More information about the Pypy-commit mailing list