[Python-checkins] python/nondist/sandbox/pep262 install_db.py,NONE,1.1

akuchling@users.sourceforge.net akuchling@users.sourceforge.net
Sun, 30 Mar 2003 09:05:16 -0800


Update of /cvsroot/python/python/nondist/sandbox/pep262
In directory sc8-pr-cvs1:/tmp/cvs-serv7316

Added Files:
	install_db.py 
Log Message:
Old installation database code that I have lying around.  I haven't checked 
that it still works, but will begin working on it after lunch.


--- NEW FILE: install_db.py ---
"""distutils.install_db

Code for the database of installed Python packages.

"""

# XXX next steps:
#   1) write test cases for this module
#   2) integrate into install_* commands
# 2.5) take it to the Distutils-SIG
#   3) write a package manager

__revision__ = "$Id: install_db.py,v 1.1 2003/03/30 17:05:13 akuchling Exp $"

import os, sys
import binascii, cStringIO, sha, rfc822

from distutils.dist import DistributionMetadata

# Computed default location of the installation database:
#   <sys.prefix>/lib/pythonX.Y/install
INSTALLDB = os.sep.join([sys.prefix,
                         'lib',
                         'python%i.%i' % tuple(sys.version_info[:2]),
                         'install'])
# The computed value above is immediately replaced by this fixed path.
INSTALLDB = '/tmp/i'

# Module-wide InstallationDatabase instance, created on first request.
_inst_db = None
def get_install_db():
    """get_install_db() : InstallationDatabase
    Return the shared InstallationDatabase object.  It is constructed
    with the default path on the first call and reused afterwards.
    """
    global _inst_db
    if _inst_db is not None:
        return _inst_db
    _inst_db = InstallationDatabase()
    return _inst_db

class InstallationDatabase:
    """Collection of the Package records found below a root path.

    Instance attributes:
    path : string
      Root of the database; handed to the iterator that walks it.
    _cache : {string : Package}
      Packages already returned by get_package(), keyed by name.
    """

    def __init__(self, path=None):
        """InstallationDatabase(path:string)
        Set up a database rooted at 'path'.  When 'path' is None the
        module-level INSTALLDB value is used instead.
        """
        self._cache = {}
        self.path = path
        if self.path is None:
            self.path = INSTALLDB

    def get_package(self, package_name):
        """get_package(package_name:string) : Package
        Return the package whose 'name' attribute equals package_name,
        or None when no such package is found.  A successful lookup is
        remembered in the cache; a failed one is not.
        """
        if package_name in self._cache:
            return self._cache[package_name]

        for candidate in self:
            if candidate.name != package_name:
                continue
            self._cache[package_name] = candidate
            return candidate

        return None

    def list_packages(self):
        """list_packages() : [Package]
        Return every package in the database as a list, in whatever
        order iteration produces them.
        """
        return [package for package in self]

    def find_package(self, path):
        """find_package(path:string) : Package
        Return the first package for which has_file(path) is true, or
        None if no package in this database claims the file.
        XXX should this work for directories?
        """
        owner = None
        for candidate in self:
            if candidate.has_file(path):
                owner = candidate
                break
        return owner

    def __iter__(self):
        # Each iteration starts a fresh walk from self.path.
        return _InstallDBIterator(self)

# class InstallationDatabase


class _InstallDBIterator:
    def __init__ (self, instdb):
        self.instdb = instdb
        self.queue = [instdb.path]
        

    def next (self):
        if len(self.queue) == 0:
            raise StopIteration

        while len(self.queue):
            filename = self.queue.pop(0)
            if os.path.isdir(filename):
                for fn2 in os.listdir(filename):
                    self.queue.append( os.path.join(filename, fn2))
            else:
                break

        return Package(filename)
        

class Package(DistributionMetadata):
    """Metadata plus the installed-file manifest of a single package.

    Instance attributes:
    filename : string
      Name of file in which the package's data is stored.
    files : {string : (size:int, perms:int, owner, group, digest:string)}
       Dictionary mapping the path of a file installed by this package
       to information about the file.  'owner' and 'group' are the
       numeric ids from os.stat() when recorded by add_file() and
       strings when loaded by read_file(); check_file() therefore
       compares them by their string form.
    """

    def __init__ (self, filename=None):
        """Package(filename:string)
        Create a package record; if 'filename' is given, its contents
        are loaded immediately with read_file().
        """
        DistributionMetadata.__init__(self)
        self.files = {}
        self.filename = filename
        if filename is not None:
            self.read_file()

    def __repr__ (self):
        return '<Package %s: %s>' % (self.name, self.filename)

    def read_file (self):
        """read_file() : None
        Load the package description stored in self.filename.  The
        first line names the sections present ('PKG-INFO', 'FILES');
        the FILES section is a list of entries terminated by a blank
        line or end of file.
        Raises ValueError for a FILES entry with fewer than six fields
        or with a non-numeric size or mode.
        """
        fp = open(self.filename, 'rt')
        try:
            sections = fp.readline().split()

            if 'PKG-INFO' in sections:
                m = rfc822.Message(fp)
                # NOTE(review): read_pkg_info() is expected to come from
                # DistributionMetadata; it is not defined in this module.
                self.read_pkg_info(m)

            if 'FILES' in sections:
                while 1:
                    line = fp.readline()
                    if line.strip() == "":
                        break
                    path, info = self._parse_file_line(line)
                    self.files[path] = info
        finally:
            # Close the file even when parsing fails part-way through.
            fp.close()

    def _parse_file_line (self, line):
        """_parse_file_line(line:string) : (string, tuple)
        Split one FILES entry into its path and its info tuple.
        """
        # as_text() writes tab-separated fields, so prefer tabs: that
        # keeps paths containing spaces intact.  Fall back to splitting
        # on any whitespace for entries that are not tab-separated.
        fields = line.rstrip('\r\n').split('\t')
        if len(fields) < 6:
            fields = line.split()
        if len(fields) < 6:
            raise ValueError("%s: malformed FILES entry: %r"
                             % (self.filename, line))
        path, size, perms, owner, group, shasum = fields[:6]
        return path, (int(size), int(perms), owner, group, shasum)

    def add_file (self, path):
        """add_file(path:string):None
        Record the size, ownership, &c., information for an installed file.
        Paths that are not regular files are silently ignored.
        XXX as written, this would stat() the file.  Should the size/perms/
        checksum all be provided as parameters to this method instead?
        """
        if not os.path.isfile(path):
            return
        # XXX what to do when hashing: binary or text mode?
        fp = open(path, 'rb')
        try:
            digest = _hash_file(fp)
        finally:
            fp.close()
        stats = os.stat(path)
        self.files[path] = (stats.st_size, stats.st_mode,
                            stats.st_uid, stats.st_gid, digest)

    def has_file (self, path):
        """has_file(path:string) : Boolean
        Returns true if the specified path belongs to a file in this
        package.
        """
        return path in self.files

    def check_file (self, path):
        """check_file(path:string) : [string]
        Checks whether the file's size, mode, ownership, and checksum
        match what was recorded, returning a possibly-empty list of
        mismatches.
        Raises KeyError if 'path' is not recorded in this package; errors
        from opening or stat()ing the file propagate to the caller.
        """
        fp = open(path, 'rb')
        try:
            digest = _hash_file(fp)
        finally:
            fp.close()
        expected = self.files[path]
        stats = os.stat(path)

        L = []
        if stats.st_size != expected[0]:
            L.append('Modified size: %i (expected %i)' %
                     (stats.st_size, expected[0]))
        if stats.st_mode != expected[1]:
            L.append('Modified mode: %i (expected %i)' %
                     (stats.st_mode, expected[1]))
        # Owner and group are ints when they come from add_file() but
        # strings when they come from read_file(); compare string forms
        # so that records loaded from disk do not always mismatch.
        if (expected[2] != 'unknown' and
            str(stats.st_uid) != str(expected[2])):
            L.append('Modified user ownership: %s (expected %s)' %
                     (stats.st_uid, expected[2]))
        if (expected[3] != 'unknown' and
            str(stats.st_gid) != str(expected[3])):
            L.append('Modified group ownership: %s (expected %s)' %
                     (stats.st_gid, expected[3]))
        if digest != expected[4]:
            L.append('Incorrect SHA digest')

        return L

    def as_text (self):
        """as_text() : string
        Return the package description in the layout read_file() parses:
        a section line, the package metadata, a blank line, one
        tab-separated line per recorded file, and a closing blank line.
        """
        output = cStringIO.StringIO()
        output.write('PKG-INFO FILES\n')
        # NOTE(review): _write_pkg_info() is expected to come from
        # DistributionMetadata; it is not defined in this module.
        self._write_pkg_info(output)
        output.write('\n')
        for path, t in self.files.items():
            line = '%s\t%s\t%s\t%s\t%s\t%s' % (path,
                                               t[0], t[1], t[2], t[3], t[4])
            output.write(line + '\n')
        output.write('\n')
        return output.getvalue()

# class Package

def _hash_file (input):
    h = sha.new()
    while 1:
        data = input.read(4096)
        if data == "":
            break
        h.update(data)
    digest = binascii.b2a_hex(h.digest())
    return digest


if __name__ == '__main__':
    # Ad-hoc smoke test: dump every package found under /tmp/i/, then
    # write the last one to /tmp/i2 and verify one of its files.
    db = InstallationDatabase('/tmp/i/')
    p = None
    for p in db:
        print(p.__dict__)
    print(db.list_packages())
    # 'p' stays None when the database holds no packages; skip the
    # remaining steps in that case instead of failing with NameError.
    if p is not None:
        f = open('/tmp/i2', 'wt')
        try:
            f.write(p.as_text())
        finally:
            f.close()
        print(p.check_file('/www/bin/apachectl'))