[pypy-svn] r31056 - in pypy/dist/pypy/module/bz2: . test

rhymes at codespeak.net rhymes at codespeak.net
Sun Aug 6 01:07:13 CEST 2006


Author: rhymes
Date: Sun Aug  6 01:07:10 2006
New Revision: 31056

Modified:
   pypy/dist/pypy/module/bz2/interp_bz2.py
   pypy/dist/pypy/module/bz2/test/test_bz2.py
Log:
write() and s/StringIO/cStringIO in tests. decompress() utility function is not implemented for now

Modified: pypy/dist/pypy/module/bz2/interp_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/interp_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/interp_bz2.py	Sun Aug  6 01:07:10 2006
@@ -108,6 +108,8 @@
 libbz2.BZ2_bzWriteClose.restype = c_void
 libbz2.BZ2_bzRead.argtypes = [POINTER(c_int), POINTER(BZFILE), c_char_p, c_int]
 libbz2.BZ2_bzRead.restype = c_int
+libbz2.BZ2_bzWrite.argtypes = [POINTER(c_int), POINTER(BZFILE), c_char_p, c_int]
+libbz2.BZ2_bzWrite.restype = c_void
 
 libc.strerror.restype = c_char_p
 libc.strerror.argtypes = [c_int]
@@ -319,11 +321,11 @@
         self.f_bufend = c_char_p() # points after last occupied position
         self.f_bufptr = c_char_p() # current buffer position
         
-        self.f_softspace = 0 # flag used by print command
+        self.f_softspace = False # flag used by print command
         
         self.f_univ_newline = False # handle any newline convention
         self.f_newlinetypes = 0 # types of newlines seen
-        self.f_skipnextlf = 0 # skip next \n
+        self.f_skipnextlf = False # skip next \n
         
         self.mode = 0
         self.pos = 0
@@ -639,6 +641,30 @@
         return self.space.wrap(lines)
     readlines.unwrap_spec = ['self', int]
     
+    def write(self, data):
+        """write(data) -> None
+
+        Write the 'data' string to file. Note that due to buffering, close() may
+        be needed before the file on disk reflects the data written."""
+        # Guards run before any data is handed to libbz2.
+        self._check_if_closed()
+        
+        if not self.mode == MODE_WRITE:
+            raise OperationError(self.space.w_IOError,
+                self.space.wrap("file is not ready for writing"))
+        
+        self.f_softspace = False  # reset the print-statement flag on every write
+        # bzerror is an out-parameter: BZ2_bzWrite receives it by reference.
+        bzerror = c_int()
+        bufsize = len(data)
+        buf = c_char_p(data)
+        libbz2.BZ2_bzWrite(byref(bzerror), self.fp, buf, bufsize)
+        self.pos += bufsize  # NOTE: advanced before the status check below
+        # Any status other than BZ_OK is handed to the shared error helper.
+        if bzerror.value != BZ_OK:
+            _catch_bz2_error(self.space, bzerror)
+    write.unwrap_spec = ['self', str]
+    
     # accessors for properties
     def fget_newlines(space, self):
         if self.f_newlinetypes == NEWLINE_UNKNOWN:
@@ -701,6 +727,7 @@
         unwrap_spec=_BZ2File.get_iterator.unwrap_spec),
     xreadlines = interp2app(_BZ2File.xreadlines,
         unwrap_spec=_BZ2File.xreadlines.unwrap_spec),
+    write = interp2app(_BZ2File.write, unwrap_spec=_BZ2File.write.unwrap_spec),
     newlines = get_newlines,
     closed = get_closed,
     name = interp_attrproperty("filename", _BZ2File),

Modified: pypy/dist/pypy/module/bz2/test/test_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/test/test_bz2.py	(original)
+++ pypy/dist/pypy/module/bz2/test/test_bz2.py	Sun Aug  6 01:07:10 2006
@@ -275,7 +275,7 @@
             f.close()
 
         from bz2 import BZ2File
-        from StringIO import StringIO
+        from cStringIO import StringIO
         create_temp_file()
         
         TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -411,7 +411,7 @@
             f.close()
 
         from bz2 import BZ2File
-        from StringIO import StringIO
+        from cStringIO import StringIO
         create_temp_file()
         
         TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -435,7 +435,7 @@
             f.close()
 
         from bz2 import BZ2File
-        from StringIO import StringIO
+        from cStringIO import StringIO
         create_temp_file()
         
         TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -457,7 +457,7 @@
             f.close()
 
         from bz2 import BZ2File
-        from StringIO import StringIO
+        from cStringIO import StringIO
         create_temp_file()
         
         TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -485,6 +485,41 @@
         xlines = list(bz2f.xreadlines())
         bz2f.close()
         assert xlines == ['Test']
+    
+    def test_write(self):
+        def decompress(data):  # helper: decompress via the external bunzip2 tool
+            import popen2
+            import bz2
+            pop = popen2.Popen3("bunzip2", capturestderr=1)
+            pop.tochild.write(data)
+            pop.tochild.close()  # close the child's stdin before reading its output
+            res = pop.fromchild.read()
+            pop.fromchild.close()
+            if pop.wait() != 0:  # non-zero exit status: fall back to bz2.decompress
+                res = bz2.decompress(data)
+            return res
+        # Round trip: write TEXT through BZ2File, then decompress the raw file.
+        from bz2 import BZ2File
+        TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
+
+        bz2f = BZ2File("foo", 'w')
+        raises(TypeError, bz2f.write)  # write() called with no argument
+        bz2f.write(TEXT)
+        bz2f.close()
+        # Read the raw bytes back and check they decompress to TEXT.
+        f = open("foo", "rb")
+        assert decompress(f.read()) == TEXT
+        f.close()
+
+#         def decompress(self, data):
+#             pop = popen2.Popen3("bunzip2", capturestderr=1)
+#             pop.tochild.write(data)
+#             pop.tochild.close()
+#             ret = pop.fromchild.read()
+#             pop.fromchild.close()
+#             if pop.wait() != 0:
+#                 ret = bz2.decompress(data)
+#             return ret
 
 # #!/usr/bin/python
 # from test import test_support



More information about the Pypy-commit mailing list