[Python-checkins] r57431 - sandbox/trunk/import_in_py/zipimport_/zipimport.py

brett.cannon python-checkins at python.org
Sat Aug 25 01:34:36 CEST 2007


Author: brett.cannon
Date: Sat Aug 25 01:34:36 2007
New Revision: 57431

Modified:
   sandbox/trunk/import_in_py/zipimport_/zipimport.py
Log:
Flesh out rest of the PEP 302 optional extensions.  Tests are currently
lacking, though.

Also do not keep the zip file unnecessarily open.

Lastly, add comments about how path info currently differs.  The original
zipimporter tacked on path info (both for __file__ and __path__) on to the path
to the zip file.  Need to change the implementation to support this.


Modified: sandbox/trunk/import_in_py/zipimport_/zipimport.py
==============================================================================
--- sandbox/trunk/import_in_py/zipimport_/zipimport.py	(original)
+++ sandbox/trunk/import_in_py/zipimport_/zipimport.py	Sat Aug 25 01:34:36 2007
@@ -1,8 +1,11 @@
 """A re-implementation of zipimport to use importlib."""
 import importlib
 
+import contextlib
+import datetime
 import imp
 import os
+import time
 import zipfile
 
 
@@ -14,6 +17,8 @@
 
     """XXX Might need to implement 'archive', 'prefix' attributes."""
 
+    _handler = importlib.handle_code
+
     def __init__(self, archivepath):
         """Open the specified zip file.
 
@@ -21,41 +26,66 @@
         than a zip file is passed in then ZipImportError is raised.
 
         """
+        # XXX Need to tweak to handle zip archive package info like
+        # _pkg.zip/pkg
         if not zipfile.is_zipfile(archivepath):
             raise ZipImportError("the specified path must be a zip file")
-        self._zip = zipfile.ZipFile(archivepath, 'r')
+        # XXX Don't open file; open as needed.
         self._path = archivepath
+        self._path_cache = {}
+
+    def _check_paths(self, base_path):
+        source_suffixes = importlib.suffix_list(imp.PY_SOURCE)
+        bytecode_suffixes = importlib.suffix_list(imp.PY_COMPILED)
+        source, bytecode = None, None
+        with contextlib.closing(zipfile.ZipFile(self._path)) as zip_:
+            for suffix in source_suffixes:
+                path = base_path + suffix
+                try:
+                    zip_.getinfo(path)
+                except KeyError:
+                    continue
+                else:
+                    source = path
+                    break
+            for suffix in bytecode_suffixes:
+                path = base_path + suffix
+                try:
+                    zip_.getinfo(path)
+                except KeyError:
+                    continue
+                else:
+                    bytecode = path
+        if source is not None or bytecode is not None:
+            return source, bytecode
+        else:
+            return None
 
     def find_module(self, fullname, path=None):
         """Check if the specified module is contained within the zip file,
         returning self if it is or None if it is not."""
         path_name = fullname.replace('.', os.sep)
-        suffixes = importlib.suffix_list(imp.PY_COMPILED)
-        suffixes += importlib.suffix_list(imp.PY_SOURCE)
-        for suffix in suffixes:
-            pkg_init_path = os.path.join(path_name, '__init__' + suffix)
-            try:
-                self._zip.getinfo(pkg_init_path)
-            except KeyError:
-                continue
-            else:
-                return self
-        for suffix in suffixes:
-            file_path = path_name + suffix
-            try:
-                self._zip.getinfo(file_path)
-            except KeyError:
-                continue
-            else:
-                return self
-        else:
-            return None
-
+        # Look for a package.
+        base_path = os.path.join(path_name, '__init__')
+        results = self._check_paths(base_path)
+        if results is not None:
+            self._path_cache[fullname] = results + (True,)
+            return self
+        results = self._check_paths(path_name)
+        if results is not None:
+            self._path_cache[fullname] = results + (False,)
+            return self
+        return None
 
     def get_code(self, fullname):
         """Return the code object for the module, raising ZipImportError if the
         module is not found."""
-        raise NotImplementedError
+        try:
+            info = self._path_cache[fullname]
+        except KeyError:
+            raise ZipImportError('%s is not known' % fullname)
+        # XXX Change the paths to start with the path to the zipfile.
+        return self._handler(fullname, info[0], info[1])[0]
 
     def get_data(self, pathname):
         """Return the data (raw source or bytecode) for the specified module,
@@ -65,37 +95,68 @@
         source or bytecode files.
 
         """
-        try:
-            return self._zip.open(pathname, 'r').read()
-        except KeyError:
-            raise IOError('%s does not exist in the zip file %s' % (pathname,
-                            self._path))
+        with contextlib.closing(zipfile.ZipFile(self._path)) as zip_:
+            try:
+                return zip_.open(pathname, 'r').read()
+            except KeyError:
+                raise IOError('%s does not exist in the zip file %s' % (pathname,
+                                self._path))
 
     def get_source(self, fullname):
         """Get the source code for the specified module, raising ZipImportError
         if the module does not exist, or None if only bytecode exists."""
-        raise NotImplementedError
+        try:
+            info = self._path_cache[fullname]
+        except KeyError:
+            raise ZipImportError("%s is not known" % fullname)
+        if info[0] is None:
+            return None
+        with contextlib.closing(zipfile.ZipFile(self._path)) as zip_:
+            return zip_.open(info[0], 'U').read()
 
-    # XXX Are the PEP 302 optional extensions allowed to assume find_module has
-    # been called before any of the optional methods?
     def is_package(self, fullname):
         """Return True if the module name represents a package, False if it
         does not."""
-        path_name = fullname.replace('.', os.sep)
-        init_path = os.path.join(path_name, '__init__.py')
         try:
-            self._zip.getinfo(init_path)
+            return self._path_cache[fullname][2]
         except KeyError:
-            try:
-                self._zip.getinfo(init_path + ('c' if __debug__ else 'o'))
-            except KeyError:
-                return False
-            else:
-                return True
-        else:
-            return True
+            raise ZipImportError("%s is not known" % fullname)
+
+    def mod_time(self, name):
+        """Return the last modification time of the module's source code from
+        the epoch (based on the local time)."""
+        # Use time.mktime to get timestamp.
+        try:
+            info = self._path_cache[name]
+        except KeyError:
+            raise ZipImportError('%s is not known' % name)
+        with contextlib.closing(zipfile.ZipFile(self._path)) as zip_:
+            file_info = zip_.getinfo(info[0])
+        file_mtime = datetime.datetime(*file_info.date_time)
+        return int(time.mktime(file_mtime.timetuple()))
+
+    def get_bytecode(self, name):
+        """Return the module's bytecode, raising ZipImportError if unknown."""
+        try:
+            bytecode_path = self._path_cache[name][1]
+        except KeyError:
+            raise ZipImportError('%s is not known' % name)
+        with contextlib.closing(zipfile.ZipFile(self._path)) as zip_:
+            return zip_.open(bytecode_path, 'r').read()
+
+    def write_bytecode(self, name, timestamp, data):
+        """Return False as zip files are never modified."""
+        return False
 
     def load_module(self, fullname):
         """Load the specified module, returning the module or raising
         ZipImportError if the module could not be found."""
-        raise NotImplementedError
+        try:
+            info = self._path_cache[fullname]
+        except KeyError:
+            raise ZipImportError('%s is not known' % fullname)
+        # XXX Change paths to be os.path.join(self._path, info[0])
+        code_object, path = self._handler(fullname, info[0], info[1])
+        # XXX __path__ needs to start with the zip file.
+        return importlib.module_init(self, code_object, fullname, path,
+                                        info[2])


More information about the Python-checkins mailing list