[Python-checkins] r54152 - in python/trunk: Doc/lib/libzipfile.tex Lib/test/test_zipfile.py Lib/zipfile.py Misc/NEWS

martin.v.loewis python-checkins at python.org
Tue Mar 6 11:41:26 CET 2007


Author: martin.v.loewis
Date: Tue Mar  6 11:41:24 2007
New Revision: 54152

Modified:
   python/trunk/Doc/lib/libzipfile.tex
   python/trunk/Lib/test/test_zipfile.py
   python/trunk/Lib/zipfile.py
   python/trunk/Misc/NEWS
Log:
Patch #1121142: Implement ZipFile.open.


Modified: python/trunk/Doc/lib/libzipfile.tex
==============================================================================
--- python/trunk/Doc/lib/libzipfile.tex	(original)
+++ python/trunk/Doc/lib/libzipfile.tex	Tue Mar  6 11:41:24 2007
@@ -141,6 +141,32 @@
   Return a list of archive members by name.
 \end{methoddesc}
 
+\begin{methoddesc}{open}{name\optional{, mode\optional{, pwd}}}
+    Extract a member from the archive as a file-like object (ZipExtFile).
+    \var{name} is the name of the file in the archive. The \var{mode}
+    parameter, if included, must be one of the following: \code{'r'} (the 
+    default), \code{'U'}, or \code{'rU'}. Choosing \code{'U'} or 
+    \code{'rU'} will enable universal newline support in the read-only
+    object. \var{pwd} is the password used for encrypted files.
+    \begin{notice}
+        The file-like object is read-only and provides the following methods:
+        \method{read()}, \method{readline()}, \method{readlines()},
+        \method{__iter__()}, \method{next()}. 
+    \end{notice}
+    \begin{notice}
+        If the ZipFile was created by passing in a file-like object as the 
+        first argument to the constructor, then the object returned by
+        \method{open()} shares the ZipFile's file pointer.  Under these 
+        circumstances, the object returned by \method{open()} should not 
+        be used after any additional operations are performed on the 
+        ZipFile object.  If the ZipFile was created by passing in a string
+        (the filename) as the first argument to the constructor, then 
+        \method{open()} will create a new file object that will be held
+        by the ZipExtFile, allowing it to operate independently of the 
+        ZipFile.
+    \end{notice}
+\end{methoddesc}
+
 \begin{methoddesc}{printdir}{}
   Print a table of contents for the archive to \code{sys.stdout}.
 \end{methoddesc}

Modified: python/trunk/Lib/test/test_zipfile.py
==============================================================================
--- python/trunk/Lib/test/test_zipfile.py	(original)
+++ python/trunk/Lib/test/test_zipfile.py	Tue Mar  6 11:41:24 2007
@@ -4,26 +4,29 @@
 except ImportError:
     zlib = None
 
-import zipfile, os, unittest, sys, shutil
+import zipfile, os, unittest, sys, shutil, struct
 
 from StringIO import StringIO
 from tempfile import TemporaryFile
+from random import randint, random
 
 from test.test_support import TESTFN, run_unittest
 
 TESTFN2 = TESTFN + "2"
+FIXEDTEST_SIZE = 10
 
 class TestsWithSourceFile(unittest.TestCase):
     def setUp(self):
-        line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000))
-        self.data = '\n'.join(line_gen)
+        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
+                         for i in xrange(FIXEDTEST_SIZE)]  # list: tests re-iterate it
+        self.data = '\n'.join(self.line_gen) + '\n'
 
         # Make a source file with some lines
         fp = open(TESTFN, "wb")
         fp.write(self.data)
         fp.close()
 
-    def zipTest(self, f, compression):
+    def makeTestArchive(self, f, compression):
         # Create the ZIP archive
         zipfp = zipfile.ZipFile(f, "w", compression)
         zipfp.write(TESTFN, "another"+os.extsep+"name")
@@ -31,6 +34,9 @@
         zipfp.writestr("strfile", self.data)
         zipfp.close()
 
+    def zipTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
         # Read the ZIP archive
         zipfp = zipfile.ZipFile(f, "r", compression)
         self.assertEqual(zipfp.read(TESTFN), self.data)
@@ -85,22 +91,144 @@
 
         # Check that testzip doesn't raise an exception
         zipfp.testzip()
+        zipfp.close()
 
+    def testStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipTest(f, zipfile.ZIP_STORED)
 
+    def zipOpenTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        zipdata1 = []
+        zipopen1 = zipfp.open(TESTFN)
+        while 1:
+            read_data = zipopen1.read(256)
+            if not read_data:
+                break
+            zipdata1.append(read_data)
+
+        zipdata2 = []
+        zipopen2 = zipfp.open("another"+os.extsep+"name")
+        while 1:
+            read_data = zipopen2.read(256)
+            if not read_data:
+                break
+            zipdata2.append(read_data)
+                    
+        self.assertEqual(''.join(zipdata1), self.data)
+        self.assertEqual(''.join(zipdata2), self.data)
         zipfp.close()
+            
+    def testOpenStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipOpenTest(f, zipfile.ZIP_STORED)
 
+    def zipRandomOpenTest(self, f, compression):
+        self.makeTestArchive(f, compression)
 
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        zipdata1 = []
+        zipopen1 = zipfp.open(TESTFN)
+        while 1:
+            read_data = zipopen1.read(randint(1, 1024))
+            if not read_data:
+                break
+            zipdata1.append(read_data)
 
+        self.assertEqual(''.join(zipdata1), self.data)
+        zipfp.close()
+    
+    def testRandomOpenStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
+            
+    def zipReadlineTest(self, f, compression):
+        self.makeTestArchive(f, compression)
 
-    def testStored(self):
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        zipopen = zipfp.open(TESTFN)
+        for line in self.line_gen:
+            linedata = zipopen.readline()
+            self.assertEqual(linedata, line + '\n')
+
+        zipfp.close()
+
+    def zipReadlinesTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        ziplines = zipfp.open(TESTFN).readlines()
+        for line, zipline in zip(self.line_gen, ziplines):
+            self.assertEqual(zipline, line + '\n')
+
+        zipfp.close()
+
+    def zipIterlinesTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
+            self.assertEqual(zipline, line + '\n')
+
+        zipfp.close()
+    
+    def testReadlineStored(self):            
         for f in (TESTFN2, TemporaryFile(), StringIO()):
-            self.zipTest(f, zipfile.ZIP_STORED)
+            self.zipReadlineTest(f, zipfile.ZIP_STORED)
+
+    def testReadlinesStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipReadlinesTest(f, zipfile.ZIP_STORED)
+
+    def testIterlinesStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipIterlinesTest(f, zipfile.ZIP_STORED)
 
     if zlib:
         def testDeflated(self):
             for f in (TESTFN2, TemporaryFile(), StringIO()):
                 self.zipTest(f, zipfile.ZIP_DEFLATED)
 
+        def testOpenDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
+
+        def testRandomOpenDeflated(self):
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
+
+        def testReadlineDeflated(self):            
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
+
+        def testReadlinesDeflated(self):            
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
+
+        def testIterlinesDeflated(self):            
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
+                
+        def testLowCompression(self):
+            # Checks for cases where compressed data is larger than original
+            # Create the ZIP archive
+            zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+            zipfp.writestr("strfile", '12')
+            zipfp.close()
+
+            # Get an open object for strfile
+            zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
+            openobj = zipfp.open("strfile")
+            self.assertEqual(openobj.read(1), '1')
+            self.assertEqual(openobj.read(1), '2')
+
     def testAbsoluteArcnames(self):
         zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
         zipfp.write(TESTFN, "/absolute")
@@ -110,7 +238,6 @@
         self.assertEqual(zipfp.namelist(), ["absolute"])
         zipfp.close()
 
-
     def tearDown(self):
         os.remove(TESTFN)
         os.remove(TESTFN2)
@@ -123,7 +250,7 @@
         self._limit = zipfile.ZIP64_LIMIT
         zipfile.ZIP64_LIMIT = 5
 
-        line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000))
+        line_gen = ("Test of zipfile line %d." % i for i in range(0, FIXEDTEST_SIZE))
         self.data = '\n'.join(line_gen)
 
         # Make a source file with some lines
@@ -344,6 +471,26 @@
         except zipfile.BadZipfile:
             os.unlink(TESTFN)
 
+    def testIsZipErroneousFile(self):
+        # This test checks that the is_zipfile function correctly identifies 
+        # a file that is not a zip file
+        fp = open(TESTFN, "w")
+        fp.write("this is not a legal zip file\n")
+        fp.close()
+        chk = zipfile.is_zipfile(TESTFN) 
+        os.unlink(TESTFN)
+        self.assert_(chk is False)       
+
+    def testIsZipValidFile(self):
+        # This test checks that the is_zipfile function correctly identifies 
+        # a file that is a zip file
+        zipf = zipfile.ZipFile(TESTFN, mode="w")
+        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+        zipf.close()
+        chk = zipfile.is_zipfile(TESTFN) 
+        os.unlink(TESTFN)
+        self.assert_(chk is True)       
+
     def testNonExistentFileRaisesIOError(self):
         # make sure we don't raise an AttributeError when a partially-constructed
         # ZipFile instance is finalized; this tests for regression on SF tracker
@@ -371,7 +518,6 @@
         # and report that the first file in the archive was corrupt.
         self.assertRaises(RuntimeError, zipf.testzip)
 
-
 class DecryptionTests(unittest.TestCase):
     # This test checks that ZIP decryption works. Since the library does not
     # support encryption at the moment, we use a pre-generated encrypted
@@ -411,9 +557,255 @@
         self.zip.setpassword("python")
         self.assertEquals(self.zip.read("test.txt"), self.plain)
 
+
+class TestsWithRandomBinaryFiles(unittest.TestCase):
+    def setUp(self):
+        datacount = randint(16, 64)*1024 + randint(1, 1024)
+        self.data = ''.join((struct.pack('<f', random()*randint(-1000, 1000)) for i in xrange(datacount)))
+
+        # Make a source file holding the packed random floats (binary data, not lines)
+        fp = open(TESTFN, "wb")
+        fp.write(self.data)
+        fp.close()
+
+    def makeTestArchive(self, f, compression):
+        # Create the ZIP archive
+        zipfp = zipfile.ZipFile(f, "w", compression)
+        zipfp.write(TESTFN, "another"+os.extsep+"name")
+        zipfp.write(TESTFN, TESTFN)
+        zipfp.close()
+
+    def zipTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        testdata = zipfp.read(TESTFN)
+        self.assertEqual(len(testdata), len(self.data))
+        self.assertEqual(testdata, self.data)
+        self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
+        zipfp.close()
+
+    def testStored(self):
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipTest(f, zipfile.ZIP_STORED)
+            
+    def zipOpenTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        zipdata1 = []
+        zipopen1 = zipfp.open(TESTFN)
+        while 1:
+            read_data = zipopen1.read(256)
+            if not read_data:
+                break
+            zipdata1.append(read_data)
+
+        zipdata2 = []
+        zipopen2 = zipfp.open("another"+os.extsep+"name")
+        while 1:
+            read_data = zipopen2.read(256)
+            if not read_data:
+                break
+            zipdata2.append(read_data)
+        # Both members were written from TESTFN, so each must equal self.data
+        testdata1 = ''.join(zipdata1)
+        self.assertEqual(len(testdata1), len(self.data))
+        self.assertEqual(testdata1, self.data)
+
+        testdata2 = ''.join(zipdata2)
+        self.assertEqual(len(testdata2), len(self.data))
+        self.assertEqual(testdata2, self.data)
+        zipfp.close()
+            
+    def testOpenStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipOpenTest(f, zipfile.ZIP_STORED)
+
+    def zipRandomOpenTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r", compression)
+        zipdata1 = []
+        zipopen1 = zipfp.open(TESTFN)
+        while 1:
+            read_data = zipopen1.read(randint(1, 1024))
+            if not read_data:
+                break
+            zipdata1.append(read_data)
+
+        testdata = ''.join(zipdata1)
+        self.assertEqual(len(testdata), len(self.data))
+        self.assertEqual(testdata, self.data)
+        zipfp.close()
+    
+    def testRandomOpenStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
+
+class TestsWithMultipleOpens(unittest.TestCase):
+    def setUp(self):
+        # Create the ZIP archive
+        zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+        zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
+        zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
+        zipfp.close()
+                
+    def testSameFile(self):
+        # Verify that (when the ZipFile is in control of creating file objects)
+        # multiple open() calls can be made without interfering with each other.
+        zipf = zipfile.ZipFile(TESTFN2, mode="r")
+        zopen1 = zipf.open('ones')
+        zopen2 = zipf.open('ones')
+        data1 = zopen1.read(500)
+        data2 = zopen2.read(500)
+        data1 += zopen1.read(500)
+        data2 += zopen2.read(500)
+        self.assertEqual(data1, data2)
+        zipf.close()
+
+    def testDifferentFile(self):
+        # Verify that (when the ZipFile is in control of creating file objects)
+        # multiple open() calls can be made without interfering with each other.
+        zipf = zipfile.ZipFile(TESTFN2, mode="r")
+        zopen1 = zipf.open('ones')
+        zopen2 = zipf.open('twos')
+        data1 = zopen1.read(500)
+        data2 = zopen2.read(500)
+        data1 += zopen1.read(500)
+        data2 += zopen2.read(500)
+        self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
+        self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
+        zipf.close()
+
+    def testInterleaved(self):
+        # Verify that (when the ZipFile is in control of creating file objects)
+        # multiple open() calls can be made without interfering with each other.
+        zipf = zipfile.ZipFile(TESTFN2, mode="r")
+        zopen1 = zipf.open('ones')
+        data1 = zopen1.read(500)
+        zopen2 = zipf.open('twos')
+        data2 = zopen2.read(500)
+        data1 += zopen1.read(500)
+        data2 += zopen2.read(500)
+        self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
+        self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
+        zipf.close()
+       
+    def tearDown(self):
+        os.remove(TESTFN2)
+        
+
+class UniversalNewlineTests(unittest.TestCase):
+    def setUp(self):
+        self.line_gen = ["Test of zipfile line %d." % i for i in xrange(FIXEDTEST_SIZE)]
+        self.seps = ('\r', '\r\n', '\n')
+        self.arcdata, self.arcfiles = {}, {}
+        for n, s in enumerate(self.seps):
+            self.arcdata[s] = s.join(self.line_gen) + s
+            self.arcfiles[s] = '%s-%d' % (TESTFN, n)
+            file(self.arcfiles[s], "wb").write(self.arcdata[s])
+
+    def makeTestArchive(self, f, compression):
+        # Create the ZIP archive
+        zipfp = zipfile.ZipFile(f, "w", compression)
+        for fn in self.arcfiles.values():
+            zipfp.write(fn, fn)
+        zipfp.close()
+
+    def readTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        for sep, fn in self.arcfiles.items():
+            zipdata = zipfp.open(fn, "rU").read()
+            self.assertEqual(self.arcdata[sep], zipdata)
+
+        zipfp.close()
+        
+    def readlineTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        for sep, fn in self.arcfiles.items():
+            zipopen = zipfp.open(fn, "rU")
+            for line in self.line_gen:
+                linedata = zipopen.readline()
+                self.assertEqual(linedata, line + '\n')
+
+        zipfp.close()
+
+    def readlinesTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        for sep, fn in self.arcfiles.items():
+            ziplines = zipfp.open(fn, "rU").readlines()
+            for line, zipline in zip(self.line_gen, ziplines):
+                self.assertEqual(zipline, line + '\n')
+
+        zipfp.close()
+
+    def iterlinesTest(self, f, compression):
+        self.makeTestArchive(f, compression)
+
+        # Read the ZIP archive
+        zipfp = zipfile.ZipFile(f, "r")
+        for sep, fn in self.arcfiles.items():
+            for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
+                self.assertEqual(zipline, line + '\n')
+
+        zipfp.close()
+
+    def testReadStored(self):         
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.readTest(f, zipfile.ZIP_STORED)
+    
+    def testReadlineStored(self):         
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.readlineTest(f, zipfile.ZIP_STORED)
+
+    def testReadlinesStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.readlinesTest(f, zipfile.ZIP_STORED)
+
+    def testIterlinesStored(self):            
+        for f in (TESTFN2, TemporaryFile(), StringIO()):
+            self.iterlinesTest(f, zipfile.ZIP_STORED)
+       
+    if zlib:
+        def testReadDeflated(self):            
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.readTest(f, zipfile.ZIP_DEFLATED)
+
+        def testReadlineDeflated(self):            
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.readlineTest(f, zipfile.ZIP_DEFLATED)
+
+        def testReadlinesDeflated(self):            
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.readlinesTest(f, zipfile.ZIP_DEFLATED)
+
+        def testIterlinesDeflated(self):            
+            for f in (TESTFN2, TemporaryFile(), StringIO()):
+                self.iterlinesTest(f, zipfile.ZIP_DEFLATED)
+
+    def tearDown(self):
+        for sep, fn in self.arcfiles.items():
+            os.remove(fn)
+
+
 def test_main():
     run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests, 
-                 PyZipFileTests, DecryptionTests)
+                 PyZipFileTests, DecryptionTests, TestsWithMultipleOpens, 
+                 UniversalNewlineTests, TestsWithRandomBinaryFiles)
+
     #run_unittest(TestZip64InSmallFiles)
 
 if __name__ == "__main__":

Modified: python/trunk/Lib/zipfile.py
==============================================================================
--- python/trunk/Lib/zipfile.py	(original)
+++ python/trunk/Lib/zipfile.py	Tue Mar  6 11:41:24 2007
@@ -355,6 +355,200 @@
         self._UpdateKeys(c)
         return c
 
+class ZipExtFile:
+    """File-like object for reading an archive member.
+       Is returned by ZipFile.open(). 
+    """
+    
+    def __init__(self, fileobj, zipinfo, decrypt=None):
+        self.fileobj = fileobj
+        self.decrypter = decrypt
+        self.bytes_read = 0L
+        self.rawbuffer = ''
+        self.readbuffer = ''
+        self.linebuffer = ''
+        self.eof = False
+        self.univ_newlines = False
+        self.nlSeps = ("\n", )
+        self.lastdiscard = ''
+
+        self.compress_type = zipinfo.compress_type
+        self.compress_size = zipinfo.compress_size
+        
+        self.closed  = False
+        self.mode    = "r"
+        self.name = zipinfo.filename
+
+        # read from compressed files in 64k blocks
+        self.compreadsize = 64*1024
+        if self.compress_type == ZIP_DEFLATED:
+            self.dc = zlib.decompressobj(-15)
+
+    def set_univ_newlines(self, univ_newlines):
+        self.univ_newlines = univ_newlines
+        
+        # pick line separator char(s) based on universal newlines flag
+        self.nlSeps = ("\n", )
+        if self.univ_newlines:
+            self.nlSeps = ("\r\n", "\r", "\n")
+
+    def __iter__(self):
+        return self
+        
+    def next(self):
+        nextline = self.readline()
+        if not nextline:
+            raise StopIteration()
+
+        return nextline
+
+    def close(self):
+        self.closed = True
+
+    def _checkfornewline(self):
+        nl, nllen = -1, -1
+        if self.linebuffer:
+            # ugly check for cases where half of an \r\n pair was
+            # read on the last pass, and the \r was discarded.  In this
+            # case we just throw away the \n at the start of the buffer.
+            if (self.lastdiscard, self.linebuffer[0]) == ('\r','\n'):
+                self.linebuffer = self.linebuffer[1:]
+
+            for sep in self.nlSeps:                
+                nl = self.linebuffer.find(sep)
+                if nl >= 0:
+                    nllen = len(sep)
+                    return nl, nllen
+
+        return nl, nllen
+        
+    def readline(self, size = -1):
+        """Read a line with approx. size. If size is negative,
+           read a whole line. 
+        """
+        if size < 0:
+            size = sys.maxint
+        elif size == 0:
+            return ''
+
+        # check for a newline already in buffer
+        nl, nllen = self._checkfornewline()
+        
+        if nl >= 0:
+            # the next line was already in the buffer
+            nl = min(nl, size)
+        else:
+            # no line break in buffer - try to read more
+            size -= len(self.linebuffer)
+            while nl < 0 and size > 0:
+                buf = self.read(min(size, 100))
+                if not buf:
+                    break
+                self.linebuffer += buf
+                size -= len(buf)
+
+                # check for a newline in buffer
+                nl, nllen = self._checkfornewline()
+                
+            # we either ran out of bytes in the file, or
+            # met the specified size limit without finding a newline,
+            # so return current buffer
+            if nl < 0:
+                s = self.linebuffer
+                self.linebuffer = ''
+                return s
+
+        buf = self.linebuffer[:nl]
+        self.lastdiscard = self.linebuffer[nl:nl + nllen]
+        self.linebuffer = self.linebuffer[nl + nllen:]
+
+        # line is always returned with \n as newline char (except possibly
+        # for a final incomplete line in the file, which is handled above).
+        return buf + "\n"
+
+    def readlines(self, sizehint = -1):
+        """Return a list with all (following) lines. The sizehint parameter
+        is ignored in this implementation.
+        """
+        result = []
+        while True:
+            line = self.readline()
+            if not line: break
+            result.append(line)
+        return result
+
+    def read(self, size = None):
+        """Read and return up to size bytes; None or a negative size reads it all."""
+        if size == 0:
+            return ''
+
+        # determine read size
+        bytesToRead = self.compress_size - self.bytes_read
+
+        # adjust read size for encrypted files since the first 12 bytes
+        # are for the encryption/password information
+        if self.decrypter is not None:
+            bytesToRead -= 12
+
+        if size is not None and size >= 0:
+            if self.compress_type == ZIP_STORED:
+                lr = len(self.readbuffer)
+                bytesToRead = min(bytesToRead, size - lr)
+            elif self.compress_type == ZIP_DEFLATED:
+                if len(self.readbuffer) > size:
+                    # the user has requested fewer bytes than we've already
+                    # pulled through the decompressor; don't read any more
+                    bytesToRead = 0
+                else:
+                    # user will use up the buffer, so read some more
+                    lr = len(self.rawbuffer)
+                    bytesToRead = min(bytesToRead, self.compreadsize - lr)
+
+        # avoid reading past end of file contents
+        if bytesToRead + self.bytes_read > self.compress_size:
+            bytesToRead = self.compress_size - self.bytes_read
+
+        # try to read from file (if necessary)
+        if bytesToRead > 0:
+            bytes = self.fileobj.read(bytesToRead)
+            self.bytes_read += len(bytes)
+            self.rawbuffer += bytes
+
+            # handle contents of raw buffer
+            if self.rawbuffer:
+                newdata = self.rawbuffer
+                self.rawbuffer = ''
+
+                # decrypt new data if we were given an object to handle that
+                if newdata and self.decrypter is not None:
+                    newdata = ''.join(map(self.decrypter, newdata))
+
+                # decompress newly read data if necessary
+                if newdata and self.compress_type == ZIP_DEFLATED:
+                    newdata = self.dc.decompress(newdata)
+                    self.rawbuffer = self.dc.unconsumed_tail
+                    if self.eof and len(self.rawbuffer) == 0:
+                        # out of raw bytes: flush so the decompressor is done
+                        # NOTE(review): nothing visible here ever sets self.eof
+                        # to True, so this branch looks unreachable -- confirm
+                        newdata += self.dc.flush()
+                        # prevent decompressor from being used again
+                        self.dc = None
+
+                self.readbuffer += newdata
+
+
+        # return what the user asked for (None or a negative size: everything)
+        if size is None or size < 0 or len(self.readbuffer) <= size:
+            bytes = self.readbuffer
+            self.readbuffer = ''
+        else:
+            bytes = self.readbuffer[:size]
+            self.readbuffer = self.readbuffer[size:]
+
+        return bytes
+  
+
 class ZipFile:
     """ Class with methods to open, read, write, close, list zip files.
 
@@ -534,73 +728,75 @@
 
     def read(self, name, pwd=None):
         """Return file bytes (as a string) for name."""
-        if self.mode not in ("r", "a"):
-            raise RuntimeError, 'read() requires mode "r" or "a"'
+        return self.open(name, "r", pwd).read()
+
+    def open(self, name, mode="r", pwd=None):
+        """Return file-like object for 'name'."""
+        if mode not in ("r", "U", "rU"):
+            raise RuntimeError, 'open() requires mode "r", "U", or "rU"'
         if not self.fp:
             raise RuntimeError, \
                   "Attempt to read ZIP archive that was already closed"
+
+        # Only open a new file for instances where we were not 
+        # given a file object in the constructor
+        if self._filePassed:
+            zef_file = self.fp
+        else:
+            zef_file = open(self.filename, 'rb')
+
+        # Get info object for name
         zinfo = self.getinfo(name)
-        is_encrypted = zinfo.flag_bits & 0x1
-        if is_encrypted:
-            if not pwd:
-                pwd = self.pwd
-            if not pwd:
-                raise RuntimeError, "File %s is encrypted, " \
-                      "password required for extraction" % name
-        filepos = self.fp.tell()
 
-        self.fp.seek(zinfo.header_offset, 0)
+        filepos = zef_file.tell()
+
+        zef_file.seek(zinfo.header_offset, 0)
 
         # Skip the file header:
-        fheader = self.fp.read(30)
+        fheader = zef_file.read(30)
         if fheader[0:4] != stringFileHeader:
             raise BadZipfile, "Bad magic number for file header"
 
         fheader = struct.unpack(structFileHeader, fheader)
-        fname = self.fp.read(fheader[_FH_FILENAME_LENGTH])
+        fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
         if fheader[_FH_EXTRA_FIELD_LENGTH]:
-            self.fp.read(fheader[_FH_EXTRA_FIELD_LENGTH])
+            zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])
 
         if fname != zinfo.orig_filename:
             raise BadZipfile, \
                       'File name in directory "%s" and header "%s" differ.' % (
                           zinfo.orig_filename, fname)
 
-        bytes = self.fp.read(zinfo.compress_size)
-        # Go with decryption
+        # check for encrypted flag & handle password
+        is_encrypted = zinfo.flag_bits & 0x1
+        zd = None
         if is_encrypted:
+            if not pwd:
+                pwd = self.pwd
+            if not pwd:
+                raise RuntimeError, "File %s is encrypted, " \
+                      "password required for extraction" % name
+
             zd = _ZipDecrypter(pwd)
             # The first 12 bytes in the cypher stream is an encryption header
             #  used to strengthen the algorithm. The first 11 bytes are
             #  completely random, while the 12th contains the MSB of the CRC,
             #  and is used to check the correctness of the password.
+            bytes = zef_file.read(12)
             h = map(zd, bytes[0:12])
             if ord(h[11]) != ((zinfo.CRC>>24)&255):
                 raise RuntimeError, "Bad password for file %s" % name
-            bytes = "".join(map(zd, bytes[12:]))
-        # Go with decompression
-        self.fp.seek(filepos, 0)
-        if zinfo.compress_type == ZIP_STORED:
-            pass
-        elif zinfo.compress_type == ZIP_DEFLATED:
-            if not zlib:
-                raise RuntimeError, \
-                      "De-compression requires the (missing) zlib module"
-            # zlib compress/decompress code by Jeremy Hylton of CNRI
-            dc = zlib.decompressobj(-15)
-            bytes = dc.decompress(bytes)
-            # need to feed in unused pad byte so that zlib won't choke
-            ex = dc.decompress('Z') + dc.flush()
-            if ex:
-                bytes = bytes + ex
-        else:
-            raise BadZipfile, \
-                  "Unsupported compression method %d for file %s" % \
-            (zinfo.compress_type, name)
-        crc = binascii.crc32(bytes)
-        if crc != zinfo.CRC:
-            raise BadZipfile, "Bad CRC-32 for file %s" % name
-        return bytes
+
+        # build and return a ZipExtFile
+        if zd is None:
+            zef = ZipExtFile(zef_file, zinfo)
+        else:
+            zef = ZipExtFile(zef_file, zinfo, zd)
+
+        # set universal newlines on ZipExtFile if necessary
+        if "U" in mode:
+            zef.set_univ_newlines(True)
+        return zef
 
     def _writecheck(self, zinfo):
         """Check for errors before writing a file to the archive."""

Modified: python/trunk/Misc/NEWS
==============================================================================
--- python/trunk/Misc/NEWS	(original)
+++ python/trunk/Misc/NEWS	Tue Mar  6 11:41:24 2007
@@ -139,6 +139,8 @@
 Library
 -------
 
+- Patch #1121142: Implement ZipFile.open.
+
 - Taught setup.py how to locate Berkeley DB on Macs using MacPorts.
 
 - Added heapq.merge() for merging sorted input streams.


More information about the Python-checkins mailing list