[pypy-svn] r31056 - in pypy/dist/pypy/module/bz2: . test
rhymes at codespeak.net
rhymes at codespeak.net
Sun Aug 6 01:07:13 CEST 2006
Author: rhymes
Date: Sun Aug 6 01:07:10 2006
New Revision: 31056
Modified:
pypy/dist/pypy/module/bz2/interp_bz2.py
pypy/dist/pypy/module/bz2/test/test_bz2.py
Log:
write() and s/StringIO/cStringIO in tests. decompress() utility function is not implemented for now
Modified: pypy/dist/pypy/module/bz2/interp_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/interp_bz2.py (original)
+++ pypy/dist/pypy/module/bz2/interp_bz2.py Sun Aug 6 01:07:10 2006
@@ -108,6 +108,8 @@
libbz2.BZ2_bzWriteClose.restype = c_void
libbz2.BZ2_bzRead.argtypes = [POINTER(c_int), POINTER(BZFILE), c_char_p, c_int]
libbz2.BZ2_bzRead.restype = c_int
+libbz2.BZ2_bzWrite.argtypes = [POINTER(c_int), POINTER(BZFILE), c_char_p, c_int]
+libbz2.BZ2_bzWrite.restype = c_void
libc.strerror.restype = c_char_p
libc.strerror.argtypes = [c_int]
@@ -319,11 +321,11 @@
self.f_bufend = c_char_p() # points after last occupied position
self.f_bufptr = c_char_p() # current buffer position
- self.f_softspace = 0 # flag used by print command
+ self.f_softspace = False # flag used by print command
self.f_univ_newline = False # handle any newline convention
self.f_newlinetypes = 0 # types of newlines seen
- self.f_skipnextlf = 0 # skip next \n
+ self.f_skipnextlf = False # skip next \n
self.mode = 0
self.pos = 0
@@ -639,6 +641,30 @@
return self.space.wrap(lines)
readlines.unwrap_spec = ['self', int]
+ def write(self, data):
+ """write(data) -> None
+
+ Write the 'data' string to file. Note that due to buffering, close() may
+ be needed before the file on disk reflects the data written."""
+
+ self._check_if_closed()
+
+ if not self.mode == MODE_WRITE:
+ raise OperationError(self.space.w_IOError,
+ self.space.wrap("file is not ready for writing"))
+
+ self.f_softspace = False
+
+ bzerror = c_int()
+ bufsize = len(data)
+ buf = c_char_p(data)
+ libbz2.BZ2_bzWrite(byref(bzerror), self.fp, buf, bufsize)
+ self.pos += bufsize
+
+ if bzerror.value != BZ_OK:
+ _catch_bz2_error(self.space, bzerror)
+ write.unwrap_spec = ['self', str]
+
# accessors for properties
def fget_newlines(space, self):
if self.f_newlinetypes == NEWLINE_UNKNOWN:
@@ -701,6 +727,7 @@
unwrap_spec=_BZ2File.get_iterator.unwrap_spec),
xreadlines = interp2app(_BZ2File.xreadlines,
unwrap_spec=_BZ2File.xreadlines.unwrap_spec),
+ write = interp2app(_BZ2File.write, unwrap_spec=_BZ2File.write.unwrap_spec),
newlines = get_newlines,
closed = get_closed,
name = interp_attrproperty("filename", _BZ2File),
Modified: pypy/dist/pypy/module/bz2/test/test_bz2.py
==============================================================================
--- pypy/dist/pypy/module/bz2/test/test_bz2.py (original)
+++ pypy/dist/pypy/module/bz2/test/test_bz2.py Sun Aug 6 01:07:10 2006
@@ -275,7 +275,7 @@
f.close()
from bz2 import BZ2File
- from StringIO import StringIO
+ from cStringIO import StringIO
create_temp_file()
TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -411,7 +411,7 @@
f.close()
from bz2 import BZ2File
- from StringIO import StringIO
+ from cStringIO import StringIO
create_temp_file()
TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -435,7 +435,7 @@
f.close()
from bz2 import BZ2File
- from StringIO import StringIO
+ from cStringIO import StringIO
create_temp_file()
TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -457,7 +457,7 @@
f.close()
from bz2 import BZ2File
- from StringIO import StringIO
+ from cStringIO import StringIO
create_temp_file()
TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
@@ -485,6 +485,41 @@
xlines = list(bz2f.xreadlines())
bz2f.close()
assert xlines == ['Test']
+
+ def test_write(self):
+ def decompress(data):
+ import popen2
+ import bz2
+ pop = popen2.Popen3("bunzip2", capturestderr=1)
+ pop.tochild.write(data)
+ pop.tochild.close()
+ res = pop.fromchild.read()
+ pop.fromchild.close()
+ if pop.wait() != 0:
+ res = bz2.decompress(data)
+ return res
+
+ from bz2 import BZ2File
+ TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
+
+ bz2f = BZ2File("foo", 'w')
+ raises(TypeError, bz2f.write)
+ bz2f.write(TEXT)
+ bz2f.close()
+
+ f = open("foo", "rb")
+ assert decompress(f.read()) == TEXT
+ f.close()
+
+# def decompress(self, data):
+# pop = popen2.Popen3("bunzip2", capturestderr=1)
+# pop.tochild.write(data)
+# pop.tochild.close()
+# ret = pop.fromchild.read()
+# pop.fromchild.close()
+# if pop.wait() != 0:
+# ret = bz2.decompress(data)
+# return ret
# #!/usr/bin/python
# from test import test_support
More information about the Pypy-commit
mailing list