zlib and zip files

Jan Prochazka jena.forum at centrum.cz
Fri Apr 14 11:14:55 EDT 2006


Michael Ekstrand napsal(a):

> Jan Prochazka wrote:
>
>> Hi,
>> I need to decompress zip archive. I wrote a parser of zip file, i obtain
>> the compressed data, but when i call zlib.decompress(data) on them,
>> it throws this error:
>>
>> decbuf = decompressor.decompress(compressed_data)
>>
>> error: Error -3 while decompressing: unknown compression method
>>
>> I tried compressing the same data that is in that file with zlib; the
>> result was 6 bytes bigger, with '\x78\xda' at the beginning and
>> '\xc9\x1f\x87\x0b' at the end.
>> When I added these extra bytes to the data given to the zlib
>> decompressor, it decompressed some files of the zip archive, but it
>> fails when I try to decompress a 100MB file from the archive.
>>
>> could you help, please?
>
>
> The zlib module is for reading and writing the gzip compression 
> format, used by the gzip program; it is not the same as a zip archive 
> a la PKZip.  The zipfile module will let you read and write zip archives.
>
> - Michael
>
Yes, zipfile can read the zip format, but I need to read and write big
files in a zip archive, and a zipfile.ZipFile object can only write or read
strings stored in memory. (I need the ZipFile.read() method to return a
file-like object, not a string.) Also, no variant of the write method
accepts a file-like object.
The gzip compressor and decompressor can work on the fly, but the
format they produce is slightly different from the format of the compressed
data in a zip file (it differs only in the 6 bytes described above, so I
think it should be possible to create and parse a zip file using only the
zlib module).

Here is my module for parsing zip files:

import struct, zlib

class ZipHeaderEntry:
    """Record for one archive member as listed in the zip directory.

    Instances are created empty and then filled in attribute by attribute;
    the class-level values below only serve as defaults.
    """

    comlen = 0    # compressed size of the member, in bytes
    uncomlen = 0  # uncompressed size of the member, in bytes
    offset = 0    # file position of the member's local header
    name = ''     # member file name as stored in the archive
   
class ZipStream:
    """Common state for the zip reader and writer streams.

    Holds the underlying file-like object and the list of directory
    entries belonging to the archive.
    """

    # Class-level defaults; __init__ gives every instance its own values,
    # so the list below is never shared between instances in practice.
    fd = None  # file-like object
    entries = []

    def __init__(self, fd):
        """Remember *fd* and start with an empty entry list."""
        self.fd, self.entries = fd, []

class ZipWriteStream(ZipStream):
    """Placeholder for the writing side; adds nothing to ZipStream yet."""

class ZipReadStream(ZipStream):
    """Read members of a zip archive piecewise from a seekable file object.

    The central directory is parsed on construction and its members are
    appended to ``self.entries`` as ZipHeaderEntry objects.  ``open_entry``
    or ``open_file`` selects one member; ``read`` then returns its
    uncompressed data, decompressing the archive in small chunks so that a
    large member never has to be held in memory as a whole.

    Member data in a zip archive is a *raw* deflate stream: it carries
    neither the two-byte zlib header nor the trailing Adler-32 checksum.
    It is therefore decoded with ``zlib.decompressobj(-zlib.MAX_WBITS)``
    rather than by surrounding it with a fabricated header and checksum
    (a fabricated checksum can only ever match one particular file).

    Limitations: only the "stored" (0) and "deflated" (8) methods are
    handled, the CRC-32 of a member is not verified, and only the classic
    32-bit size/offset fields are interpreted.  Member names are the byte
    strings found in the archive.
    """

    cbytesleft = 0  # compressed bytes of the open member still unread in fd
    ubytesleft = 0  # uncompressed bytes of the open member not yet returned
    dec = None  # zlib decompress object, or None for a stored member
    decbuf = None  # decoded data produced but not yet handed to the caller
    writed_footer = False  # True once the member's input is fully consumed

    _CHUNK = 0x1000  # compressed bytes fetched per decompress_next() call
    _LOCAL_SIG = b'PK\x03\x04'  # local file header signature
    _LOCAL_LEN = 0x1E  # fixed part of a local file header
    _CDIR_SIG = 0x02014B50  # central directory header signature ("PK\1\2")
    _CDIR_LEN = 0x2E  # fixed part of a central directory header
    _EOCD_SIG = b'PK\x05\x06'  # end-of-central-directory signature
    _EOCD_LEN = 0x16  # fixed part of the end-of-central-directory record
    _STORED = 0
    _DEFLATED = 8

    def __init__(self, fd):
        """Attach to *fd* (seekable, opened in binary mode) and read the
        archive directory.

        Raises ValueError if the directory cannot be located or parsed.
        """
        ZipStream.__init__(self, fd)
        self.read_directory()

    def open_entry(self, entry):
        """Make *entry* the current member so that read() returns its data.

        *entry* is a ZipHeaderEntry, normally taken from ``self.entries``.
        Raises ValueError if the member's local header is malformed or the
        member uses a compression method other than stored or deflated.
        """
        self.fd.seek(entry.offset)
        self.ubytesleft = entry.uncomlen
        self.cbytesleft = entry.comlen
        # Debug trace kept from the original implementation.
        print('ubytes= %s cbytes= %s' % (self.ubytesleft, self.cbytesleft))
        method = self.read_header()
        if method == self._DEFLATED:
            # Negative wbits selects a raw deflate stream (no zlib header
            # or trailer), which is exactly what a zip member contains.
            self.dec = zlib.decompressobj(-zlib.MAX_WBITS)
        elif method == self._STORED:
            self.dec = None
        else:
            raise ValueError('Unsupported compression method: %r' % (method,))
        self.decbuf = b''
        self.writed_footer = False

    def decompress_next(self):
        """Refill ``self.decbuf`` from the next chunk of member data.

        Reads at most ``_CHUNK`` compressed bytes from the file.  May leave
        ``decbuf`` empty when the chunk produced no output.  Once the last
        compressed byte has been consumed the decompressor is flushed and
        ``writed_footer`` is set.

        Raises ValueError if the file ends inside the member's data.
        """
        assert len(self.decbuf) == 0
        rbytes = min(self._CHUNK, self.cbytesleft)
        cdata = self.fd.read(rbytes)
        if rbytes and not cdata:
            raise ValueError('Unexpected end of file inside member data')
        # Count what was actually delivered so a short read is not mistaken
        # for a full chunk.
        self.cbytesleft -= len(cdata)
        if self.dec is None:
            self.decbuf = cdata
        else:
            self.decbuf = self.dec.decompress(cdata)
        if self.cbytesleft == 0 and not self.writed_footer:
            if self.dec is not None:
                self.decbuf += self.dec.flush()
            self.writed_footer = True

    def read(self, bytes=None):
        """Return up to *bytes* bytes of the current member's data.

        With *bytes* omitted (or larger than what is left) everything that
        remains of the member is returned.  Returns an empty byte string
        once the member is exhausted or when no member is open.

        Raises ValueError if the compressed data runs out before the
        uncompressed size announced by the directory has been delivered.
        """
        if bytes is None or bytes > self.ubytesleft:
            bytes = self.ubytesleft
        pieces = []
        remaining = bytes
        while remaining > 0:
            if not self.decbuf:
                if self.writed_footer:
                    # All input is consumed yet output is still owed: the
                    # directory sizes and the data disagree.  Without this
                    # check the loop would never terminate.
                    raise ValueError(
                        'Member data ends before its declared size')
                self.decompress_next()
                continue
            piece = self.decbuf[:remaining]
            self.decbuf = self.decbuf[remaining:]
            self.ubytesleft -= len(piece)
            remaining -= len(piece)
            pieces.append(piece)
        return b''.join(pieces)

    def open_file(self, filename):
        """Open the member whose name equals *filename*, ignoring case.

        *filename* must be of the same string type as the stored names.
        Raises Exception if no member matches.
        """
        wanted = filename.upper()
        for entry in self.entries:
            if entry.name.upper() == wanted:
                return self.open_entry(entry)
        raise Exception('File not found in archive: %s' % filename)

    def read_header(self):
        """Consume the local file header at the current file position.

        Leaves the file positioned at the first byte of the member's data
        and returns the compression method recorded in the header.

        Raises ValueError if the header is truncated or its signature is
        wrong.
        """
        hdr = self.fd.read(self._LOCAL_LEN)
        if len(hdr) != self._LOCAL_LEN:
            raise ValueError('Truncated local file header')
        # Zip headers are little-endian regardless of the host byte order.
        (sig, ver, flag, method, tm, dt, crc, comsize, uncsize,
         fnlen, extlen) = struct.unpack('<4sHHHHHLLLHH', hdr)
        if sig != self._LOCAL_SIG:
            raise ValueError('Bad local file header signature')
        name = self.fd.read(fnlen)
        self.fd.read(extlen)  # skip the extra field
        print(name)  # debug trace kept from the original implementation
        return method

    def read_directory(self):
        """Parse the central directory and append its members to
        ``self.entries``.

        Raises ValueError if the end-of-central-directory record or a
        central directory header is missing, truncated or malformed.
        """
        self.fd.seek(0, 2)
        size = self.fd.tell()
        # The end record may be followed by a comment of up to 0xFFFF
        # bytes, so search that whole span, not just the last few KiB.
        tail = min(size, self._EOCD_LEN + 0xFFFF)
        self.fd.seek(size - tail)
        enddata = self.fd.read()
        # Take the last occurrence: the signature bytes may also appear by
        # chance inside member data earlier in the window.
        index = enddata.rfind(self._EOCD_SIG)
        if index < 0:
            raise ValueError('End of central directory record not found')
        record = enddata[index:index + self._EOCD_LEN]
        if len(record) != self._EOCD_LEN:
            raise ValueError('Truncated end of central directory record')
        (sig, ndisk, ndiskc, entriesdisk, count, dirsize, dirofs,
         comlen) = struct.unpack('<LHHHHLLH', record)

        self.fd.seek(dirofs)

        for _ in range(count):
            cdirdata = self.fd.read(self._CDIR_LEN)
            if len(cdirdata) != self._CDIR_LEN:
                raise ValueError('Truncated central directory header')
            (sig, vermade, hosts, verex, osver, flag, method, tm, dt, crc,
             csize, uncsize, fnlen, extlen, comlen, disknum, fileattr,
             extattr, fileofs) = struct.unpack(
                 '<LBBBBHHHHLLLHHHHHLL', cdirdata)
            if sig != self._CDIR_SIG:
                raise ValueError('Bad central directory header signature')
            name = self.fd.read(fnlen)
            self.fd.read(extlen)  # skip the extra field
            self.fd.read(comlen)  # skip the member comment
            entry = ZipHeaderEntry()
            entry.name = name
            entry.offset = fileofs
            entry.uncomlen = uncsize
            entry.comlen = csize
            self.entries.append(entry)




More information about the Python-list mailing list