[Jython-checkins] jython: Patched tarfile.py and test/test_tarfile.py against CPythonLib2.7
alex.gronholm
jython-checkins at python.org
Thu Mar 15 19:48:14 CET 2012
http://hg.python.org/jython/rev/1f7dce131a96
changeset: 6391:1f7dce131a96
user: Alex Grönholm <alex.gronholm at nextday.fi>
date: Thu Mar 15 10:30:47 2012 -0700
summary:
Patched tarfile.py and test/test_tarfile.py against CPythonLib2.7
files:
Lib/tarfile.py | 1283 +++++++++++-----
Lib/test/test_tarfile.py | 1967 +++++++++++++++++--------
2 files changed, 2186 insertions(+), 1064 deletions(-)
diff --git a/Lib/tarfile.py b/Lib/tarfile.py
--- a/Lib/tarfile.py
+++ b/Lib/tarfile.py
@@ -30,13 +30,13 @@
"""Read from and write to tar format archives.
"""
-__version__ = "$Revision: 60730 $"
+__version__ = "$Revision: 85213 $"
# $Source$
-version = "0.8.0"
+version = "0.9.0"
__author__ = "Lars Gustäbel (lars at gustaebel.de)"
-__date__ = "$Date: 2008-02-11 10:36:07 -0800 (Mon, 11 Feb 2008) $"
-__cvsid__ = "$Id: tarfile.py 60730 2008-02-11 18:36:07Z lars.gustaebel $"
+__date__ = "$Date: 2010-10-04 08:37:53 -0700 (ma, 04 loka  2010) $"
+__cvsid__ = "$Id: tarfile.py 85213 2010-10-04 15:37:53Z lars.gustaebel $"
__credits__ = "Gustavo Niemeyer, Niels Gustäbel, Richard Townsend."
#---------
@@ -50,13 +50,8 @@
import time
import struct
import copy
-
-if sys.platform == 'mac':
- # This module needs work for MacOS9, especially in the area of pathname
- # handling. In many places it is assumed a simple substitution of / by the
- # local os.path.sep is good enough to convert pathnames, but this does not
- # work with the mac rooted:path:name versus :nonrooted:path:name syntax
- raise ImportError, "tarfile does not work for platform==mac"
+import re
+import operator
try:
import grp, pwd
@@ -69,42 +64,71 @@
#---------------------------------------------------------
# tar constants
#---------------------------------------------------------
-NUL = "\0" # the null character
-BLOCKSIZE = 512 # length of processing blocks
+NUL = "\0" # the null character
+BLOCKSIZE = 512 # length of processing blocks
RECORDSIZE = BLOCKSIZE * 20 # length of records
-MAGIC = "ustar" # magic tar string
-VERSION = "00" # version number
+GNU_MAGIC = "ustar  \0" # magic gnu tar string
+POSIX_MAGIC = "ustar\x0000" # magic posix tar string
-LENGTH_NAME = 100 # maximum length of a filename
-LENGTH_LINK = 100 # maximum length of a linkname
-LENGTH_PREFIX = 155 # maximum length of the prefix field
-MAXSIZE_MEMBER = 077777777777L # maximum size of a file (11 octal digits)
+LENGTH_NAME = 100 # maximum length of a filename
+LENGTH_LINK = 100 # maximum length of a linkname
+LENGTH_PREFIX = 155 # maximum length of the prefix field
-REGTYPE = "0" # regular file
+REGTYPE = "0" # regular file
AREGTYPE = "\0" # regular file
-LNKTYPE = "1" # link (inside tarfile)
-SYMTYPE = "2" # symbolic link
-CHRTYPE = "3" # character special device
-BLKTYPE = "4" # block special device
-DIRTYPE = "5" # directory
+LNKTYPE = "1" # link (inside tarfile)
+SYMTYPE = "2" # symbolic link
+CHRTYPE = "3" # character special device
+BLKTYPE = "4" # block special device
+DIRTYPE = "5" # directory
FIFOTYPE = "6" # fifo special device
CONTTYPE = "7" # contiguous file
-GNUTYPE_LONGNAME = "L" # GNU tar extension for longnames
-GNUTYPE_LONGLINK = "K" # GNU tar extension for longlink
-GNUTYPE_SPARSE = "S" # GNU tar extension for sparse file
+GNUTYPE_LONGNAME = "L" # GNU tar longname
+GNUTYPE_LONGLINK = "K" # GNU tar longlink
+GNUTYPE_SPARSE = "S" # GNU tar sparse file
+
+XHDTYPE = "x" # POSIX.1-2001 extended header
+XGLTYPE = "g" # POSIX.1-2001 global header
+SOLARIS_XHDTYPE = "X" # Solaris extended header
+
+USTAR_FORMAT = 0 # POSIX.1-1988 (ustar) format
+GNU_FORMAT = 1 # GNU tar format
+PAX_FORMAT = 2 # POSIX.1-2001 (pax) format
+DEFAULT_FORMAT = GNU_FORMAT
#---------------------------------------------------------
# tarfile constants
#---------------------------------------------------------
-SUPPORTED_TYPES = (REGTYPE, AREGTYPE, LNKTYPE, # file types that tarfile
- SYMTYPE, DIRTYPE, FIFOTYPE, # can cope with.
+# File types that tarfile supports:
+SUPPORTED_TYPES = (REGTYPE, AREGTYPE, LNKTYPE,
+ SYMTYPE, DIRTYPE, FIFOTYPE,
CONTTYPE, CHRTYPE, BLKTYPE,
GNUTYPE_LONGNAME, GNUTYPE_LONGLINK,
GNUTYPE_SPARSE)
-REGULAR_TYPES = (REGTYPE, AREGTYPE, # file types that somehow
- CONTTYPE, GNUTYPE_SPARSE) # represent regular files
+# File types that will be treated as a regular file.
+REGULAR_TYPES = (REGTYPE, AREGTYPE,
+ CONTTYPE, GNUTYPE_SPARSE)
+
+# File types that are part of the GNU tar format.
+GNU_TYPES = (GNUTYPE_LONGNAME, GNUTYPE_LONGLINK,
+ GNUTYPE_SPARSE)
+
+# Fields from a pax header that override a TarInfo attribute.
+PAX_FIELDS = ("path", "linkpath", "size", "mtime",
+ "uid", "gid", "uname", "gname")
+
+# Fields in a pax header that are numbers, all other fields
+# are treated as strings.
+PAX_NUMBER_FIELDS = {
+ "atime": float,
+ "ctime": float,
+ "mtime": float,
+ "uid": int,
+ "gid": int,
+ "size": int
+}
#---------------------------------------------------------
# Bits used in the mode field, values in octal.
@@ -131,6 +155,13 @@
TOEXEC = 0001 # execute/search by other
#---------------------------------------------------------
+# initialization
+#---------------------------------------------------------
+ENCODING = sys.getfilesystemencoding()
+if ENCODING is None:
+ ENCODING = sys.getdefaultencoding()
+
+#---------------------------------------------------------
# Some useful functions
#---------------------------------------------------------
@@ -154,7 +185,10 @@
# There are two possible encodings for a number field, see
# itn() below.
if s[0] != chr(0200):
- n = int(nts(s) or "0", 8)
+ try:
+ n = int(nts(s) or "0", 8)
+ except ValueError:
+ raise InvalidHeaderError("invalid header")
else:
n = 0L
for i in xrange(len(s) - 1):
@@ -162,7 +196,7 @@
n += ord(s[i + 1])
return n
-def itn(n, digits=8, posix=False):
+def itn(n, digits=8, format=DEFAULT_FORMAT):
"""Convert a python number to a number field.
"""
# POSIX 1003.1-1988 requires numbers to be encoded as a string of
@@ -174,7 +208,7 @@
if 0 <= n < 8 ** (digits - 1):
s = "%0*o" % (digits - 1, n) + NUL
else:
- if posix:
+ if format != GNU_FORMAT or n >= 256 ** (digits - 1):
raise ValueError("overflow in number field")
if n < 0:
@@ -189,6 +223,26 @@
s = chr(0200) + s
return s
+def uts(s, encoding, errors):
+ """Convert a unicode object to a string.
+ """
+ if errors == "utf-8":
+ # An extra error handler similar to the -o invalid=UTF-8 option
+ # in POSIX.1-2001. Replace untranslatable characters with their
+ # UTF-8 representation.
+ try:
+ return s.encode(encoding, "strict")
+ except UnicodeEncodeError:
+ x = []
+ for c in s:
+ try:
+ x.append(c.encode(encoding, "strict"))
+ except UnicodeEncodeError:
+ x.append(c.encode("utf8"))
+ return "".join(x)
+ else:
+ return s.encode(encoding, errors)
+
def calc_chksums(buf):
"""Calculate the checksum for a member's header by summing up all
characters except for the chksum field which is treated as if
@@ -269,11 +323,6 @@
perm.append("-")
return "".join(perm)
-if os.sep != "/":
- normpath = lambda path: os.path.normpath(path).replace(os.sep, "/")
-else:
- normpath = os.path.normpath
-
class TarError(Exception):
"""Base exception."""
pass
@@ -289,6 +338,24 @@
class StreamError(TarError):
"""Exception for unsupported operations on stream-like TarFiles."""
pass
+class HeaderError(TarError):
+ """Base exception for header errors."""
+ pass
+class EmptyHeaderError(HeaderError):
+ """Exception for empty headers."""
+ pass
+class TruncatedHeaderError(HeaderError):
+ """Exception for truncated headers."""
+ pass
+class EOFHeaderError(HeaderError):
+ """Exception for end of file headers."""
+ pass
+class InvalidHeaderError(HeaderError):
+ """Exception for invalid headers."""
+ pass
+class SubsequentHeaderError(HeaderError):
+ """Exception for missing and invalid extended headers."""
+ pass
#---------------------------
# internal stream interface
@@ -306,7 +373,7 @@
}[mode]
if hasattr(os, "O_BINARY"):
mode |= os.O_BINARY
- self.fd = os.open(name, mode)
+ self.fd = os.open(name, mode, 0666)
def close(self):
os.close(self.fd)
@@ -357,7 +424,7 @@
except ImportError:
raise CompressionError("zlib module is not available")
self.zlib = zlib
- self.crc = zlib.crc32("")
+ self.crc = zlib.crc32("") & 0xffffffffL
if mode == "r":
self._init_read_gz()
else:
@@ -395,7 +462,7 @@
"""Write string s to the stream.
"""
if self.comptype == "gz":
- self.crc = self.zlib.crc32(s, self.crc)
+ self.crc = self.zlib.crc32(s, self.crc) & 0xffffffffL
self.pos += len(s)
if self.comptype != "tar":
s = self.cmp.compress(s)
@@ -517,7 +584,10 @@
buf = self.__read(self.bufsize)
if not buf:
break
- buf = self.cmp.decompress(buf)
+ try:
+ buf = self.cmp.decompress(buf)
+ except IOError:
+ raise ReadError("invalid compressed data")
t.append(buf)
c += len(buf)
t = "".join(t)
@@ -578,6 +648,7 @@
def __init__(self, fileobj, mode):
self.fileobj = fileobj
self.mode = mode
+ self.name = getattr(self.fileobj, "name", None)
self.init()
def init(self):
@@ -594,12 +665,11 @@
b = [self.buf]
x = len(self.buf)
while x < size:
- try:
- raw = self.fileobj.read(self.blocksize)
- data = self.bz2obj.decompress(raw)
- b.append(data)
- except EOFError:
+ raw = self.fileobj.read(self.blocksize)
+ if not raw:
break
+ data = self.bz2obj.decompress(raw)
+ b.append(data)
x += len(data)
self.buf = "".join(b)
@@ -625,7 +695,6 @@
if self.mode == "w":
raw = self.bz2obj.flush()
self.fileobj.write(raw)
- self.fileobj.close()
# class _BZ2Proxy
#------------------------
@@ -850,8 +919,8 @@
"""Construct a TarInfo object. name is the optional name
of the member.
"""
- self.name = name # member name (dirnames must end with '/')
- self.mode = 0666 # file permissions
+ self.name = name # member name
+ self.mode = 0644 # file permissions
self.uid = 0 # user id
self.gid = 0 # group id
self.size = 0 # file size
@@ -859,147 +928,525 @@
self.chksum = 0 # header checksum
self.type = REGTYPE # member type
self.linkname = "" # link name
- self.uname = "user" # user name
- self.gname = "group" # group name
+ self.uname = "" # user name
+ self.gname = "" # group name
self.devmajor = 0 # device major number
self.devminor = 0 # device minor number
self.offset = 0 # the tar header starts here
self.offset_data = 0 # the file's data starts here
+ self.pax_headers = {} # pax header information
+
+ # In pax headers the "name" and "linkname" field are called
+ # "path" and "linkpath".
+ def _getpath(self):
+ return self.name
+ def _setpath(self, name):
+ self.name = name
+ path = property(_getpath, _setpath)
+
+ def _getlinkpath(self):
+ return self.linkname
+ def _setlinkpath(self, linkname):
+ self.linkname = linkname
+ linkpath = property(_getlinkpath, _setlinkpath)
+
def __repr__(self):
return "<%s %r at %#x>" % (self.__class__.__name__,self.name,id(self))
+ def get_info(self, encoding, errors):
+ """Return the TarInfo's attributes as a dictionary.
+ """
+ info = {
+ "name": self.name,
+ "mode": self.mode & 07777,
+ "uid": self.uid,
+ "gid": self.gid,
+ "size": self.size,
+ "mtime": self.mtime,
+ "chksum": self.chksum,
+ "type": self.type,
+ "linkname": self.linkname,
+ "uname": self.uname,
+ "gname": self.gname,
+ "devmajor": self.devmajor,
+ "devminor": self.devminor
+ }
+
+ if info["type"] == DIRTYPE and not info["name"].endswith("/"):
+ info["name"] += "/"
+
+ for key in ("name", "linkname", "uname", "gname"):
+ if type(info[key]) is unicode:
+ info[key] = info[key].encode(encoding, errors)
+
+ return info
+
+ def tobuf(self, format=DEFAULT_FORMAT, encoding=ENCODING, errors="strict"):
+ """Return a tar header as a string of 512 byte blocks.
+ """
+ info = self.get_info(encoding, errors)
+
+ if format == USTAR_FORMAT:
+ return self.create_ustar_header(info)
+ elif format == GNU_FORMAT:
+ return self.create_gnu_header(info)
+ elif format == PAX_FORMAT:
+ return self.create_pax_header(info, encoding, errors)
+ else:
+ raise ValueError("invalid format")
+
+ def create_ustar_header(self, info):
+ """Return the object as a ustar header block.
+ """
+ info["magic"] = POSIX_MAGIC
+
+ if len(info["linkname"]) > LENGTH_LINK:
+ raise ValueError("linkname is too long")
+
+ if len(info["name"]) > LENGTH_NAME:
+ info["prefix"], info["name"] = self._posix_split_name(info["name"])
+
+ return self._create_header(info, USTAR_FORMAT)
+
+ def create_gnu_header(self, info):
+ """Return the object as a GNU header block sequence.
+ """
+ info["magic"] = GNU_MAGIC
+
+ buf = ""
+ if len(info["linkname"]) > LENGTH_LINK:
+ buf += self._create_gnu_long_header(info["linkname"], GNUTYPE_LONGLINK)
+
+ if len(info["name"]) > LENGTH_NAME:
+ buf += self._create_gnu_long_header(info["name"], GNUTYPE_LONGNAME)
+
+ return buf + self._create_header(info, GNU_FORMAT)
+
+ def create_pax_header(self, info, encoding, errors):
+ """Return the object as a ustar header block. If it cannot be
+ represented this way, prepend a pax extended header sequence
+ with supplement information.
+ """
+ info["magic"] = POSIX_MAGIC
+ pax_headers = self.pax_headers.copy()
+
+ # Test string fields for values that exceed the field length or cannot
+ # be represented in ASCII encoding.
+ for name, hname, length in (
+ ("name", "path", LENGTH_NAME), ("linkname", "linkpath", LENGTH_LINK),
+ ("uname", "uname", 32), ("gname", "gname", 32)):
+
+ if hname in pax_headers:
+ # The pax header has priority.
+ continue
+
+ val = info[name].decode(encoding, errors)
+
+ # Try to encode the string as ASCII.
+ try:
+ val.encode("ascii")
+ except UnicodeEncodeError:
+ pax_headers[hname] = val
+ continue
+
+ if len(info[name]) > length:
+ pax_headers[hname] = val
+
+ # Test number fields for values that exceed the field limit or values
+ # that like to be stored as float.
+ for name, digits in (("uid", 8), ("gid", 8), ("size", 12), ("mtime", 12)):
+ if name in pax_headers:
+ # The pax header has priority. Avoid overflow.
+ info[name] = 0
+ continue
+
+ val = info[name]
+ if not 0 <= val < 8 ** (digits - 1) or isinstance(val, float):
+ pax_headers[name] = unicode(val)
+ info[name] = 0
+
+ # Create a pax extended header if necessary.
+ if pax_headers:
+ buf = self._create_pax_generic_header(pax_headers)
+ else:
+ buf = ""
+
+ return buf + self._create_header(info, USTAR_FORMAT)
+
+ @classmethod
+ def create_pax_global_header(cls, pax_headers):
+ """Return the object as a pax global header block sequence.
+ """
+ return cls._create_pax_generic_header(pax_headers, type=XGLTYPE)
+
+ def _posix_split_name(self, name):
+ """Split a name longer than 100 chars into a prefix
+ and a name part.
+ """
+ prefix = name[:LENGTH_PREFIX + 1]
+ while prefix and prefix[-1] != "/":
+ prefix = prefix[:-1]
+
+ name = name[len(prefix):]
+ prefix = prefix[:-1]
+
+ if not prefix or len(name) > LENGTH_NAME:
+ raise ValueError("name is too long")
+ return prefix, name
+
+ @staticmethod
+ def _create_header(info, format):
+ """Return a header block. info is a dictionary with file
+ information, format must be one of the *_FORMAT constants.
+ """
+ parts = [
+ stn(info.get("name", ""), 100),
+ itn(info.get("mode", 0) & 07777, 8, format),
+ itn(info.get("uid", 0), 8, format),
+ itn(info.get("gid", 0), 8, format),
+ itn(info.get("size", 0), 12, format),
+ itn(info.get("mtime", 0), 12, format),
+ "        ", # checksum field
+ info.get("type", REGTYPE),
+ stn(info.get("linkname", ""), 100),
+ stn(info.get("magic", POSIX_MAGIC), 8),
+ stn(info.get("uname", ""), 32),
+ stn(info.get("gname", ""), 32),
+ itn(info.get("devmajor", 0), 8, format),
+ itn(info.get("devminor", 0), 8, format),
+ stn(info.get("prefix", ""), 155)
+ ]
+
+ buf = struct.pack("%ds" % BLOCKSIZE, "".join(parts))
+ chksum = calc_chksums(buf[-BLOCKSIZE:])[0]
+ buf = buf[:-364] + "%06o\0" % chksum + buf[-357:]
+ return buf
+
+ @staticmethod
+ def _create_payload(payload):
+ """Return the string payload filled with zero bytes
+ up to the next 512 byte border.
+ """
+ blocks, remainder = divmod(len(payload), BLOCKSIZE)
+ if remainder > 0:
+ payload += (BLOCKSIZE - remainder) * NUL
+ return payload
+
+ @classmethod
+ def _create_gnu_long_header(cls, name, type):
+ """Return a GNUTYPE_LONGNAME or GNUTYPE_LONGLINK sequence
+ for name.
+ """
+ name += NUL
+
+ info = {}
+ info["name"] = "././@LongLink"
+ info["type"] = type
+ info["size"] = len(name)
+ info["magic"] = GNU_MAGIC
+
+ # create extended header + name blocks.
+ return cls._create_header(info, USTAR_FORMAT) + \
+ cls._create_payload(name)
+
+ @classmethod
+ def _create_pax_generic_header(cls, pax_headers, type=XHDTYPE):
+ """Return a POSIX.1-2001 extended or global header sequence
+ that contains a list of keyword, value pairs. The values
+ must be unicode objects.
+ """
+ records = []
+ for keyword, value in pax_headers.iteritems():
+ keyword = keyword.encode("utf8")
+ value = value.encode("utf8")
+ l = len(keyword) + len(value) + 3 # ' ' + '=' + '\n'
+ n = p = 0
+ while True:
+ n = l + len(str(p))
+ if n == p:
+ break
+ p = n
+ records.append("%d %s=%s\n" % (p, keyword, value))
+ records = "".join(records)
+
+ # We use a hardcoded "././@PaxHeader" name like star does
+ # instead of the one that POSIX recommends.
+ info = {}
+ info["name"] = "././@PaxHeader"
+ info["type"] = type
+ info["size"] = len(records)
+ info["magic"] = POSIX_MAGIC
+
+ # Create pax header + record blocks.
+ return cls._create_header(info, USTAR_FORMAT) + \
+ cls._create_payload(records)
+
@classmethod
def frombuf(cls, buf):
"""Construct a TarInfo object from a 512 byte string buffer.
"""
+ if len(buf) == 0:
+ raise EmptyHeaderError("empty header")
if len(buf) != BLOCKSIZE:
- raise ValueError("truncated header")
+ raise TruncatedHeaderError("truncated header")
if buf.count(NUL) == BLOCKSIZE:
- raise ValueError("empty header")
+ raise EOFHeaderError("end of file header")
- tarinfo = cls()
- tarinfo.buf = buf
- tarinfo.name = nts(buf[0:100])
- tarinfo.mode = nti(buf[100:108])
- tarinfo.uid = nti(buf[108:116])
- tarinfo.gid = nti(buf[116:124])
- tarinfo.size = nti(buf[124:136])
- tarinfo.mtime = nti(buf[136:148])
- tarinfo.chksum = nti(buf[148:156])
- tarinfo.type = buf[156:157]
- tarinfo.linkname = nts(buf[157:257])
- tarinfo.uname = nts(buf[265:297])
- tarinfo.gname = nts(buf[297:329])
- tarinfo.devmajor = nti(buf[329:337])
- tarinfo.devminor = nti(buf[337:345])
+ chksum = nti(buf[148:156])
+ if chksum not in calc_chksums(buf):
+ raise InvalidHeaderError("bad checksum")
+
+ obj = cls()
+ obj.buf = buf
+ obj.name = nts(buf[0:100])
+ obj.mode = nti(buf[100:108])
+ obj.uid = nti(buf[108:116])
+ obj.gid = nti(buf[116:124])
+ obj.size = nti(buf[124:136])
+ obj.mtime = nti(buf[136:148])
+ obj.chksum = chksum
+ obj.type = buf[156:157]
+ obj.linkname = nts(buf[157:257])
+ obj.uname = nts(buf[265:297])
+ obj.gname = nts(buf[297:329])
+ obj.devmajor = nti(buf[329:337])
+ obj.devminor = nti(buf[337:345])
prefix = nts(buf[345:500])
- if prefix and not tarinfo.issparse():
- tarinfo.name = prefix + "/" + tarinfo.name
+ # Old V7 tar format represents a directory as a regular
+ # file with a trailing slash.
+ if obj.type == AREGTYPE and obj.name.endswith("/"):
+ obj.type = DIRTYPE
- if tarinfo.chksum not in calc_chksums(buf):
- raise ValueError("invalid header")
- return tarinfo
+ # Remove redundant slashes from directories.
+ if obj.isdir():
+ obj.name = obj.name.rstrip("/")
- def tobuf(self, posix=False):
- """Return a tar header as a string of 512 byte blocks.
+ # Reconstruct a ustar longname.
+ if prefix and obj.type not in GNU_TYPES:
+ obj.name = prefix + "/" + obj.name
+ return obj
+
+ @classmethod
+ def fromtarfile(cls, tarfile):
+ """Return the next TarInfo object from TarFile object
+ tarfile.
"""
- buf = ""
- type = self.type
- prefix = ""
+ buf = tarfile.fileobj.read(BLOCKSIZE)
+ obj = cls.frombuf(buf)
+ obj.offset = tarfile.fileobj.tell() - BLOCKSIZE
+ return obj._proc_member(tarfile)
- if self.name.endswith("/"):
- type = DIRTYPE
+ #--------------------------------------------------------------------------
+ # The following are methods that are called depending on the type of a
+ # member. The entry point is _proc_member() which can be overridden in a
+ # subclass to add custom _proc_*() methods. A _proc_*() method MUST
+ # implement the following
+ # operations:
+ # 1. Set self.offset_data to the position where the data blocks begin,
+ # if there is data that follows.
+ # 2. Set tarfile.offset to the position where the next member's header will
+ # begin.
+ # 3. Return self or another valid TarInfo object.
+ def _proc_member(self, tarfile):
+ """Choose the right processing method depending on
+ the type and call it.
+ """
+ if self.type in (GNUTYPE_LONGNAME, GNUTYPE_LONGLINK):
+ return self._proc_gnulong(tarfile)
+ elif self.type == GNUTYPE_SPARSE:
+ return self._proc_sparse(tarfile)
+ elif self.type in (XHDTYPE, XGLTYPE, SOLARIS_XHDTYPE):
+ return self._proc_pax(tarfile)
+ else:
+ return self._proc_builtin(tarfile)
- if type in (GNUTYPE_LONGNAME, GNUTYPE_LONGLINK):
- # Prevent "././@LongLink" from being normalized.
- name = self.name
+ def _proc_builtin(self, tarfile):
+ """Process a builtin type or an unknown type which
+ will be treated as a regular file.
+ """
+ self.offset_data = tarfile.fileobj.tell()
+ offset = self.offset_data
+ if self.isreg() or self.type not in SUPPORTED_TYPES:
+ # Skip the following data blocks.
+ offset += self._block(self.size)
+ tarfile.offset = offset
+
+ # Patch the TarInfo object with saved global
+ # header information.
+ self._apply_pax_info(tarfile.pax_headers, tarfile.encoding, tarfile.errors)
+
+ return self
+
+ def _proc_gnulong(self, tarfile):
+ """Process the blocks that hold a GNU longname
+ or longlink member.
+ """
+ buf = tarfile.fileobj.read(self._block(self.size))
+
+ # Fetch the next header and process it.
+ try:
+ next = self.fromtarfile(tarfile)
+ except HeaderError:
+ raise SubsequentHeaderError("missing or bad subsequent header")
+
+ # Patch the TarInfo object from the next header with
+ # the longname information.
+ next.offset = self.offset
+ if self.type == GNUTYPE_LONGNAME:
+ next.name = nts(buf)
+ elif self.type == GNUTYPE_LONGLINK:
+ next.linkname = nts(buf)
+
+ return next
+
+ def _proc_sparse(self, tarfile):
+ """Process a GNU sparse header plus extra headers.
+ """
+ buf = self.buf
+ sp = _ringbuffer()
+ pos = 386
+ lastpos = 0L
+ realpos = 0L
+ # There are 4 possible sparse structs in the
+ # first header.
+ for i in xrange(4):
+ try:
+ offset = nti(buf[pos:pos + 12])
+ numbytes = nti(buf[pos + 12:pos + 24])
+ except ValueError:
+ break
+ if offset > lastpos:
+ sp.append(_hole(lastpos, offset - lastpos))
+ sp.append(_data(offset, numbytes, realpos))
+ realpos += numbytes
+ lastpos = offset + numbytes
+ pos += 24
+
+ isextended = ord(buf[482])
+ origsize = nti(buf[483:495])
+
+ # If the isextended flag is given,
+ # there are extra headers to process.
+ while isextended == 1:
+ buf = tarfile.fileobj.read(BLOCKSIZE)
+ pos = 0
+ for i in xrange(21):
+ try:
+ offset = nti(buf[pos:pos + 12])
+ numbytes = nti(buf[pos + 12:pos + 24])
+ except ValueError:
+ break
+ if offset > lastpos:
+ sp.append(_hole(lastpos, offset - lastpos))
+ sp.append(_data(offset, numbytes, realpos))
+ realpos += numbytes
+ lastpos = offset + numbytes
+ pos += 24
+ isextended = ord(buf[504])
+
+ if lastpos < origsize:
+ sp.append(_hole(lastpos, origsize - lastpos))
+
+ self.sparse = sp
+
+ self.offset_data = tarfile.fileobj.tell()
+ tarfile.offset = self.offset_data + self._block(self.size)
+ self.size = origsize
+
+ return self
+
+ def _proc_pax(self, tarfile):
+ """Process an extended or global header as described in
+ POSIX.1-2001.
+ """
+ # Read the header information.
+ buf = tarfile.fileobj.read(self._block(self.size))
+
+ # A pax header stores supplemental information for either
+ # the following file (extended) or all following files
+ # (global).
+ if self.type == XGLTYPE:
+ pax_headers = tarfile.pax_headers
else:
- name = normpath(self.name)
+ pax_headers = tarfile.pax_headers.copy()
- if type == DIRTYPE:
- # directories should end with '/'
- name += "/"
+ # Parse pax header information. A record looks like that:
+ # "%d %s=%s\n" % (length, keyword, value). length is the size
+ # of the complete record including the length field itself and
+ # the newline. keyword and value are both UTF-8 encoded strings.
+ regex = re.compile(r"(\d+) ([^=]+)=", re.U)
+ pos = 0
+ while True:
+ match = regex.match(buf, pos)
+ if not match:
+ break
- linkname = self.linkname
- if linkname:
- # if linkname is empty we end up with a '.'
- linkname = normpath(linkname)
+ length, keyword = match.groups()
+ length = int(length)
+ value = buf[match.end(2) + 1:match.start(1) + length - 1]
- if posix:
- if self.size > MAXSIZE_MEMBER:
- raise ValueError("file is too large (>= 8 GB)")
+ keyword = keyword.decode("utf8")
+ value = value.decode("utf8")
- if len(self.linkname) > LENGTH_LINK:
- raise ValueError("linkname is too long (>%d)" % (LENGTH_LINK))
+ pax_headers[keyword] = value
+ pos += length
- if len(name) > LENGTH_NAME:
- prefix = name[:LENGTH_PREFIX + 1]
- while prefix and prefix[-1] != "/":
- prefix = prefix[:-1]
+ # Fetch the next header.
+ try:
+ next = self.fromtarfile(tarfile)
+ except HeaderError:
+ raise SubsequentHeaderError("missing or bad subsequent header")
- name = name[len(prefix):]
- prefix = prefix[:-1]
+ if self.type in (XHDTYPE, SOLARIS_XHDTYPE):
+ # Patch the TarInfo object with the extended header info.
+ next._apply_pax_info(pax_headers, tarfile.encoding, tarfile.errors)
+ next.offset = self.offset
- if not prefix or len(name) > LENGTH_NAME:
- raise ValueError("name is too long")
+ if "size" in pax_headers:
+ # If the extended header replaces the size field,
+ # we need to recalculate the offset where the next
+ # header starts.
+ offset = next.offset_data
+ if next.isreg() or next.type not in SUPPORTED_TYPES:
+ offset += next._block(next.size)
+ tarfile.offset = offset
- else:
- if len(self.linkname) > LENGTH_LINK:
- buf += self._create_gnulong(self.linkname, GNUTYPE_LONGLINK)
+ return next
- if len(name) > LENGTH_NAME:
- buf += self._create_gnulong(name, GNUTYPE_LONGNAME)
+ def _apply_pax_info(self, pax_headers, encoding, errors):
+ """Replace fields with supplemental information from a previous
+ pax extended or global header.
+ """
+ for keyword, value in pax_headers.iteritems():
+ if keyword not in PAX_FIELDS:
+ continue
- parts = [
- stn(name, 100),
- itn(self.mode & 07777, 8, posix),
- itn(self.uid, 8, posix),
- itn(self.gid, 8, posix),
- itn(self.size, 12, posix),
- itn(self.mtime, 12, posix),
- "        ", # checksum field
- type,
- stn(self.linkname, 100),
- stn(MAGIC, 6),
- stn(VERSION, 2),
- stn(self.uname, 32),
- stn(self.gname, 32),
- itn(self.devmajor, 8, posix),
- itn(self.devminor, 8, posix),
- stn(prefix, 155)
- ]
+ if keyword == "path":
+ value = value.rstrip("/")
- buf += "".join(parts).ljust(BLOCKSIZE, NUL)
- chksum = calc_chksums(buf[-BLOCKSIZE:])[0]
- buf = buf[:-364] + "%06o\0" % chksum + buf[-357:]
- self.buf = buf
- return buf
+ if keyword in PAX_NUMBER_FIELDS:
+ try:
+ value = PAX_NUMBER_FIELDS[keyword](value)
+ except ValueError:
+ value = 0
+ else:
+ value = uts(value, encoding, errors)
- def _create_gnulong(self, name, type):
- """Create a GNU longname/longlink header from name.
- It consists of an extended tar header, with the length
- of the longname as size, followed by data blocks,
- which contain the longname as a null terminated string.
+ setattr(self, keyword, value)
+
+ self.pax_headers = pax_headers.copy()
+
+ def _block(self, count):
+ """Round up a byte count by BLOCKSIZE and return it,
+ e.g. _block(834) => 1024.
"""
- name += NUL
-
- tarinfo = self.__class__()
- tarinfo.name = "././@LongLink"
- tarinfo.type = type
- tarinfo.mode = 0
- tarinfo.size = len(name)
-
- # create extended header
- buf = tarinfo.tobuf()
- # create name blocks
- buf += name
- blocks, remainder = divmod(len(name), BLOCKSIZE)
- if remainder > 0:
- buf += (BLOCKSIZE - remainder) * NUL
- return buf
+ blocks, remainder = divmod(count, BLOCKSIZE)
+ if remainder:
+ blocks += 1
+ return blocks * BLOCKSIZE
def isreg(self):
return self.type in REGULAR_TYPES
@@ -1035,16 +1482,23 @@
ignore_zeros = False # If true, skips empty or invalid blocks and
# continues processing.
- errorlevel = 0 # If 0, fatal errors only appear in debug
+ errorlevel = 1 # If 0, fatal errors only appear in debug
# messages (if debug >= 0). If > 0, errors
# are passed to the caller as exceptions.
- posix = False # If True, generates POSIX.1-1990-compliant
- # archives (no GNU extensions!)
+ format = DEFAULT_FORMAT # The format to use when creating an archive.
- fileobject = ExFileObject
+ encoding = ENCODING # Encoding for 8-bit character strings.
- def __init__(self, name=None, mode="r", fileobj=None):
+ errors = None # Error handler for unicode conversion.
+
+ tarinfo = TarInfo # The default TarInfo class to use.
+
+ fileobject = ExFileObject # The default ExFileObject class to use.
+
+ def __init__(self, name=None, mode="r", fileobj=None, format=None,
+ tarinfo=None, dereference=None, ignore_zeros=None, encoding=None,
+ errors=None, pax_headers=None, debug=None, errorlevel=None):
"""Open an (uncompressed) tar archive `name'. `mode' is either 'r' to
read from an existing archive, 'a' to append data to an existing
file or 'w' to create a new file overwriting an existing one. `mode'
@@ -1055,22 +1509,55 @@
"""
if len(mode) > 1 or mode not in "raw":
raise ValueError("mode must be 'r', 'a' or 'w'")
- self._mode = mode
- self.mode = {"r": "rb", "a": "r+b", "w": "wb"}[mode]
+ self.mode = mode
+ self._mode = {"r": "rb", "a": "r+b", "w": "wb"}[mode]
if not fileobj:
- fileobj = file(name, self.mode)
+ if self.mode == "a" and not os.path.exists(name):
+ # Create nonexistent files in append mode.
+ self.mode = "w"
+ self._mode = "wb"
+ fileobj = bltn_open(name, self._mode)
self._extfileobj = False
else:
if name is None and hasattr(fileobj, "name"):
name = fileobj.name
if hasattr(fileobj, "mode"):
- self.mode = fileobj.mode
+ self._mode = fileobj.mode
self._extfileobj = True
self.name = os.path.abspath(name) if name else None
self.fileobj = fileobj
- # Init datastructures
+ # Init attributes.
+ if format is not None:
+ self.format = format
+ if tarinfo is not None:
+ self.tarinfo = tarinfo
+ if dereference is not None:
+ self.dereference = dereference
+ if ignore_zeros is not None:
+ self.ignore_zeros = ignore_zeros
+ if encoding is not None:
+ self.encoding = encoding
+
+ if errors is not None:
+ self.errors = errors
+ elif mode == "r":
+ self.errors = "utf-8"
+ else:
+ self.errors = "strict"
+
+ if pax_headers is not None and self.format == PAX_FORMAT:
+ self.pax_headers = pax_headers
+ else:
+ self.pax_headers = {}
+
+ if debug is not None:
+ self.debug = debug
+ if errorlevel is not None:
+ self.errorlevel = errorlevel
+
+ # Init datastructures.
self.closed = False
self.members = [] # list of members as TarInfo objects
self._loaded = False # flag if all members have been read
@@ -1079,26 +1566,49 @@
self.inodes = {} # dictionary caching the inodes of
# archive members already added
- if self._mode == "r":
- self.firstmember = None
- self.firstmember = self.next()
+ try:
+ if self.mode == "r":
+ self.firstmember = None
+ self.firstmember = self.next()
- if self._mode == "a":
- # Move to the end of the archive,
- # before the first empty block.
- self.firstmember = None
- while True:
- try:
- tarinfo = self.next()
- except ReadError:
- self.fileobj.seek(0)
- break
- if tarinfo is None:
- self.fileobj.seek(- BLOCKSIZE, 1)
- break
+ if self.mode == "a":
+ # Move to the end of the archive,
+ # before the first empty block.
+ while True:
+ self.fileobj.seek(self.offset)
+ try:
+ tarinfo = self.tarinfo.fromtarfile(self)
+ self.members.append(tarinfo)
+ except EOFHeaderError:
+ self.fileobj.seek(self.offset)
+ break
+ except HeaderError, e:
+ raise ReadError(str(e))
- if self._mode in "aw":
- self._loaded = True
+ if self.mode in "aw":
+ self._loaded = True
+
+ if self.pax_headers:
+ buf = self.tarinfo.create_pax_global_header(self.pax_headers.copy())
+ self.fileobj.write(buf)
+ self.offset += len(buf)
+ except:
+ if not self._extfileobj:
+ self.fileobj.close()
+ self.closed = True
+ raise
+
+ def _getposix(self):
+ return self.format == USTAR_FORMAT
+ def _setposix(self, value):
+ import warnings
+ warnings.warn("use the format attribute instead", DeprecationWarning,
+ 2)
+ if value:
+ self.format = USTAR_FORMAT
+ else:
+ self.format = GNU_FORMAT
+ posix = property(_getposix, _setposix)
#--------------------------------------------------------------------------
# Below are the classmethods which act as alternate constructors to the
@@ -1112,7 +1622,7 @@
# by adding it to the mapping in OPEN_METH.
@classmethod
- def open(cls, name=None, mode="r", fileobj=None, bufsize=20*512):
+ def open(cls, name=None, mode="r", fileobj=None, bufsize=RECORDSIZE, **kwargs):
"""Open a tar archive for reading, writing or appending. Return
an appropriate TarFile class.
@@ -1121,7 +1631,7 @@
'r:' open for reading exclusively uncompressed
'r:gz' open for reading with gzip compression
'r:bz2' open for reading with bzip2 compression
- 'a' or 'a:' open for appending
+ 'a' or 'a:' open for appending, creating the file if necessary
'w' or 'w:' open for writing without compression
'w:gz' open for writing with gzip compression
'w:bz2' open for writing with bzip2 compression
@@ -1145,8 +1655,8 @@
if fileobj is not None:
saved_pos = fileobj.tell()
try:
- return func(name, "r", fileobj)
- except (ReadError, CompressionError):
+ return func(name, "r", fileobj, **kwargs)
+ except (ReadError, CompressionError), e:
if fileobj is not None:
fileobj.seek(saved_pos)
continue
@@ -1163,7 +1673,7 @@
func = getattr(cls, cls.OPEN_METH[comptype])
else:
raise CompressionError("unknown compression type %r" % comptype)
- return func(name, filemode, fileobj)
+ return func(name, filemode, fileobj, **kwargs)
elif "|" in mode:
filemode, comptype = mode.split("|", 1)
@@ -1174,25 +1684,26 @@
raise ValueError("mode must be 'r' or 'w'")
t = cls(name, filemode,
- _Stream(name, filemode, comptype, fileobj, bufsize))
+ _Stream(name, filemode, comptype, fileobj, bufsize),
+ **kwargs)
t._extfileobj = False
return t
elif mode in "aw":
- return cls.taropen(name, mode, fileobj)
+ return cls.taropen(name, mode, fileobj, **kwargs)
raise ValueError("undiscernible mode")
@classmethod
- def taropen(cls, name, mode="r", fileobj=None):
+ def taropen(cls, name, mode="r", fileobj=None, **kwargs):
"""Open uncompressed tar archive name for reading or writing.
"""
if len(mode) > 1 or mode not in "raw":
raise ValueError("mode must be 'r', 'a' or 'w'")
- return cls(name, mode, fileobj)
+ return cls(name, mode, fileobj, **kwargs)
@classmethod
- def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9):
+ def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
"""Open gzip compressed tar archive name for reading or writing.
Appending is not allowed.
"""
@@ -1208,7 +1719,7 @@
fileobj = gzip.GzipFile(name, mode, compresslevel, fileobj)
try:
- t = cls.taropen(name, mode, fileobj)
+ t = cls.taropen(name, mode, fileobj, **kwargs)
except IOError:
fileobj.close()
raise ReadError("not a gzip file")
@@ -1216,7 +1727,7 @@
return t
@classmethod
- def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9):
+ def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
"""Open bzip2 compressed tar archive name for reading or writing.
Appending is not allowed.
"""
@@ -1236,8 +1747,8 @@
extfileobj = False
try:
- t = cls.taropen(name, mode, fileobj)
- except IOError:
+ t = cls.taropen(name, mode, fileobj, **kwargs)
+ except (IOError, EOFError):
if not extfileobj:
fileobj.close()
raise ReadError("not a bzip2 file")
@@ -1261,7 +1772,7 @@
if self.closed:
return
- if self._mode in "aw":
+ if self.mode in "aw":
self.fileobj.write(NUL * (BLOCKSIZE * 2))
self.offset += (BLOCKSIZE * 2)
# fill up the end with zero-blocks
@@ -1277,7 +1788,7 @@
def getmember(self, name):
"""Return a TarInfo object for member `name'. If `name' can not be
found in the archive, KeyError is raised. If a member occurs more
- than once in the archive, its last occurence is assumed to be the
+ than once in the archive, its last occurrence is assumed to be the
most up-to-date version.
"""
tarinfo = self._getmember(name)
@@ -1320,14 +1831,14 @@
# Absolute paths are turned to relative paths.
if arcname is None:
arcname = name
- arcname = normpath(arcname)
drv, arcname = os.path.splitdrive(arcname)
- while arcname[0:1] == "/":
- arcname = arcname[1:]
+ arcname = arcname.replace(os.sep, "/")
+ arcname = arcname.lstrip("/")
# Now, fill the TarInfo object with
# information specific for the file.
- tarinfo = TarInfo()
+ tarinfo = self.tarinfo()
+ tarinfo.tarfile = self
# Use os.stat or os.lstat, depending on platform
# and if symlinks shall be resolved.
@@ -1336,18 +1847,15 @@
statres = os.lstat(name)
else:
statres = os.stat(name)
- elif hasattr(os, 'fstat'):
+ else:
statres = os.fstat(fileobj.fileno())
- else:
- raise NotImplementedError('fileobj argument not supported on this '
- 'platform (no os.fstat)')
linkname = ""
stmd = statres.st_mode
if stat.S_ISREG(stmd):
inode = (statres.st_ino, statres.st_dev)
- if not self.dereference and \
- statres.st_nlink > 1 and inode in self.inodes:
+ if not self.dereference and statres.st_nlink > 1 and \
+ inode in self.inodes and arcname != self.inodes[inode]:
# Is it a hardlink to an already
# archived file?
type = LNKTYPE
@@ -1360,8 +1868,6 @@
self.inodes[inode] = arcname
elif stat.S_ISDIR(stmd):
type = DIRTYPE
- if arcname[-1:] != "/":
- arcname += "/"
elif stat.S_ISFIFO(stmd):
type = FIFOTYPE
elif stat.S_ISLNK(stmd):
@@ -1380,7 +1886,7 @@
tarinfo.mode = stmd
tarinfo.uid = statres.st_uid
tarinfo.gid = statres.st_gid
- if stat.S_ISREG(stmd):
+ if type == REGTYPE:
tarinfo.size = statres.st_size
else:
tarinfo.size = 0L
@@ -1424,7 +1930,7 @@
print "%d-%02d-%02d %02d:%02d:%02d" \
% time.localtime(tarinfo.mtime)[:6],
- print tarinfo.name,
+ print tarinfo.name + ("/" if tarinfo.isdir() else ""),
if verbose:
if tarinfo.issym():
@@ -1433,33 +1939,36 @@
print "link to", tarinfo.linkname,
print
- def add(self, name, arcname=None, recursive=True):
+ def add(self, name, arcname=None, recursive=True, exclude=None, filter=None):
"""Add the file `name' to the archive. `name' may be any type of file
(directory, fifo, symbolic link, etc.). If given, `arcname'
specifies an alternative name for the file in the archive.
Directories are added recursively by default. This can be avoided by
- setting `recursive' to False.
+ setting `recursive' to False. `exclude' is a function that should
+ return True for each filename to be excluded. `filter' is a function
+ that expects a TarInfo object argument and returns the changed
+ TarInfo object, if it returns None the TarInfo object will be
+ excluded from the archive.
"""
self._check("aw")
if arcname is None:
arcname = name
+ # Exclude pathnames.
+ if exclude is not None:
+ import warnings
+ warnings.warn("use the filter argument instead",
+ DeprecationWarning, 2)
+ if exclude(name):
+ self._dbg(2, "tarfile: Excluded %r" % name)
+ return
+
# Skip if somebody tries to archive the archive...
if self.name is not None and os.path.abspath(name) == self.name:
self._dbg(2, "tarfile: Skipped %r" % name)
return
- # Special case: The user wants to add the current
- # working directory.
- if name == ".":
- if recursive:
- if arcname == ".":
- arcname = ""
- for f in os.listdir("."):
- self.add(f, os.path.join(arcname, f))
- return
-
self._dbg(1, name)
# Create a TarInfo object from the file.
@@ -1469,9 +1978,16 @@
self._dbg(1, "tarfile: Unsupported type %r" % name)
return
+ # Change or exclude the TarInfo object.
+ if filter is not None:
+ tarinfo = filter(tarinfo)
+ if tarinfo is None:
+ self._dbg(2, "tarfile: Excluded %r" % name)
+ return
+
# Append the tar header and data to the archive.
if tarinfo.isreg():
- f = file(name, "rb")
+ f = bltn_open(name, "rb")
self.addfile(tarinfo, f)
f.close()
@@ -1479,7 +1995,8 @@
self.addfile(tarinfo)
if recursive:
for f in os.listdir(name):
- self.add(os.path.join(name, f), os.path.join(arcname, f))
+ self.add(os.path.join(name, f), os.path.join(arcname, f),
+ recursive, exclude, filter)
else:
self.addfile(tarinfo)
@@ -1495,7 +2012,7 @@
tarinfo = copy.copy(tarinfo)
- buf = tarinfo.tobuf(self.posix)
+ buf = tarinfo.tobuf(self.format, self.encoding, self.errors)
self.fileobj.write(buf)
self.offset += len(buf)
@@ -1531,7 +2048,7 @@
self.extract(tarinfo, path)
# Reverse sort directories.
- directories.sort(lambda a, b: cmp(a.name, b.name))
+ directories.sort(key=operator.attrgetter('name'))
directories.reverse()
# Set correct owner, mtime and filemode on directories.
@@ -1555,10 +2072,10 @@
"""
self._check("r")
- if isinstance(member, TarInfo):
+ if isinstance(member, basestring):
+ tarinfo = self.getmember(member)
+ else:
tarinfo = member
- else:
- tarinfo = self.getmember(member)
# Prepare the link target for makelink().
if tarinfo.islnk():
@@ -1591,10 +2108,10 @@
"""
self._check("r")
- if isinstance(member, TarInfo):
+ if isinstance(member, basestring):
+ tarinfo = self.getmember(member)
+ else:
tarinfo = member
- else:
- tarinfo = self.getmember(member)
if tarinfo.isreg():
return self.fileobject(self, tarinfo)
@@ -1612,8 +2129,7 @@
raise StreamError("cannot extract (sym)link as file object")
else:
# A (sym)link's file object is its target's file object.
- return self.extractfile(self._getmember(tarinfo.linkname,
- tarinfo))
+ return self.extractfile(self._find_link_target(tarinfo))
else:
# If there's no data associated with the member (directory, chrdev,
# blkdev, etc.), return None instead of a file object.
@@ -1626,9 +2142,8 @@
# Fetch the TarInfo object for the given name
# and build the destination pathname, replacing
# forward slashes to platform specific separators.
- if targetpath[-1:] == "/":
- targetpath = targetpath[:-1]
- targetpath = os.path.normpath(targetpath)
+ targetpath = targetpath.rstrip("/")
+ targetpath = targetpath.replace("/", os.sep)
# Create all upper directories.
upperdirs = os.path.dirname(targetpath)
@@ -1682,7 +2197,7 @@
"""Make a file called targetpath.
"""
source = self.extractfile(tarinfo)
- target = file(targetpath, "wb")
+ target = bltn_open(targetpath, "wb")
copyfileobj(source, target)
source.close()
target.close()
@@ -1723,27 +2238,21 @@
(platform limitation), we try to make a copy of the referenced file
instead of a link.
"""
- linkpath = tarinfo.linkname
- try:
+ if hasattr(os, "symlink") and hasattr(os, "link"):
+ # For systems that support symbolic and hard links.
if tarinfo.issym():
- os.symlink(linkpath, targetpath)
+ os.symlink(tarinfo.linkname, targetpath)
else:
# See extract().
- os.link(tarinfo._link_target, targetpath)
- except AttributeError:
- if tarinfo.issym():
- linkpath = os.path.join(os.path.dirname(tarinfo.name),
- linkpath)
- linkpath = normpath(linkpath)
-
+ if os.path.exists(tarinfo._link_target):
+ os.link(tarinfo._link_target, targetpath)
+ else:
+ self._extract_member(self._find_link_target(tarinfo), targetpath)
+ else:
try:
- self._extract_member(self.getmember(linkpath), targetpath)
- except (EnvironmentError, KeyError), e:
- linkpath = os.path.normpath(linkpath)
- try:
- shutil.copy2(linkpath, targetpath)
- except EnvironmentError, e:
- raise IOError("link could not be created")
+ self._extract_member(self._find_link_target(tarinfo), targetpath)
+ except KeyError:
+ raise ExtractError("unable to resolve link inside archive")
def chown(self, tarinfo, targetpath):
"""Set owner of targetpath according to tarinfo.
@@ -1787,10 +2296,6 @@
"""
if not hasattr(os, 'utime'):
return
- if sys.platform == "win32" and tarinfo.isdir():
- # According to msdn.microsoft.com, it is an error (EACCES)
- # to use utime() on directories.
- return
try:
os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
except EnvironmentError, e:
@@ -1810,193 +2315,64 @@
# Read the next block.
self.fileobj.seek(self.offset)
+ tarinfo = None
while True:
- buf = self.fileobj.read(BLOCKSIZE)
- if not buf:
- return None
-
try:
- tarinfo = TarInfo.frombuf(buf)
-
- # Set the TarInfo object's offset to the current position of the
- # TarFile and set self.offset to the position where the data blocks
- # should begin.
- tarinfo.offset = self.offset
- self.offset += BLOCKSIZE
-
- tarinfo = self.proc_member(tarinfo)
-
- except ValueError, e:
+ tarinfo = self.tarinfo.fromtarfile(self)
+ except EOFHeaderError, e:
if self.ignore_zeros:
- self._dbg(2, "0x%X: empty or invalid block: %s" %
- (self.offset, e))
+ self._dbg(2, "0x%X: %s" % (self.offset, e))
self.offset += BLOCKSIZE
continue
- else:
- if self.offset == 0:
- raise ReadError("empty, unreadable or compressed "
- "file: %s" % e)
- return None
+ except InvalidHeaderError, e:
+ if self.ignore_zeros:
+ self._dbg(2, "0x%X: %s" % (self.offset, e))
+ self.offset += BLOCKSIZE
+ continue
+ elif self.offset == 0:
+ raise ReadError(str(e))
+ except EmptyHeaderError:
+ if self.offset == 0:
+ raise ReadError("empty file")
+ except TruncatedHeaderError, e:
+ if self.offset == 0:
+ raise ReadError(str(e))
+ except SubsequentHeaderError, e:
+ raise ReadError(str(e))
break
- # Some old tar programs represent a directory as a regular
- # file with a trailing slash.
- if tarinfo.isreg() and tarinfo.name.endswith("/"):
- tarinfo.type = DIRTYPE
-
- # Directory names should have a '/' at the end.
- if tarinfo.isdir() and not tarinfo.name.endswith("/"):
- tarinfo.name += "/"
-
- self.members.append(tarinfo)
- return tarinfo
-
- #--------------------------------------------------------------------------
- # The following are methods that are called depending on the type of a
- # member. The entry point is proc_member() which is called with a TarInfo
- # object created from the header block from the current offset. The
- # proc_member() method can be overridden in a subclass to add custom
- # proc_*() methods. A proc_*() method MUST implement the following
- # operations:
- # 1. Set tarinfo.offset_data to the position where the data blocks begin,
- # if there is data that follows.
- # 2. Set self.offset to the position where the next member's header will
- # begin.
- # 3. Return tarinfo or another valid TarInfo object.
- def proc_member(self, tarinfo):
- """Choose the right processing method for tarinfo depending
- on its type and call it.
- """
- if tarinfo.type in (GNUTYPE_LONGNAME, GNUTYPE_LONGLINK):
- return self.proc_gnulong(tarinfo)
- elif tarinfo.type == GNUTYPE_SPARSE:
- return self.proc_sparse(tarinfo)
+ if tarinfo is not None:
+ self.members.append(tarinfo)
else:
- return self.proc_builtin(tarinfo)
-
- def proc_builtin(self, tarinfo):
- """Process a builtin type member or an unknown member
- which will be treated as a regular file.
- """
- tarinfo.offset_data = self.offset
- if tarinfo.isreg() or tarinfo.type not in SUPPORTED_TYPES:
- # Skip the following data blocks.
- self.offset += self._block(tarinfo.size)
- return tarinfo
-
- def proc_gnulong(self, tarinfo):
- """Process the blocks that hold a GNU longname
- or longlink member.
- """
- buf = ""
- count = tarinfo.size
- while count > 0:
- block = self.fileobj.read(BLOCKSIZE)
- buf += block
- self.offset += BLOCKSIZE
- count -= BLOCKSIZE
-
- # Fetch the next header and process it.
- b = self.fileobj.read(BLOCKSIZE)
- t = TarInfo.frombuf(b)
- t.offset = self.offset
- self.offset += BLOCKSIZE
- next = self.proc_member(t)
-
- # Patch the TarInfo object from the next header with
- # the longname information.
- next.offset = tarinfo.offset
- if tarinfo.type == GNUTYPE_LONGNAME:
- next.name = nts(buf)
- elif tarinfo.type == GNUTYPE_LONGLINK:
- next.linkname = nts(buf)
-
- return next
-
- def proc_sparse(self, tarinfo):
- """Process a GNU sparse header plus extra headers.
- """
- buf = tarinfo.buf
- sp = _ringbuffer()
- pos = 386
- lastpos = 0L
- realpos = 0L
- # There are 4 possible sparse structs in the
- # first header.
- for i in xrange(4):
- try:
- offset = nti(buf[pos:pos + 12])
- numbytes = nti(buf[pos + 12:pos + 24])
- except ValueError:
- break
- if offset > lastpos:
- sp.append(_hole(lastpos, offset - lastpos))
- sp.append(_data(offset, numbytes, realpos))
- realpos += numbytes
- lastpos = offset + numbytes
- pos += 24
-
- isextended = ord(buf[482])
- origsize = nti(buf[483:495])
-
- # If the isextended flag is given,
- # there are extra headers to process.
- while isextended == 1:
- buf = self.fileobj.read(BLOCKSIZE)
- self.offset += BLOCKSIZE
- pos = 0
- for i in xrange(21):
- try:
- offset = nti(buf[pos:pos + 12])
- numbytes = nti(buf[pos + 12:pos + 24])
- except ValueError:
- break
- if offset > lastpos:
- sp.append(_hole(lastpos, offset - lastpos))
- sp.append(_data(offset, numbytes, realpos))
- realpos += numbytes
- lastpos = offset + numbytes
- pos += 24
- isextended = ord(buf[504])
-
- if lastpos < origsize:
- sp.append(_hole(lastpos, origsize - lastpos))
-
- tarinfo.sparse = sp
-
- tarinfo.offset_data = self.offset
- self.offset += self._block(tarinfo.size)
- tarinfo.size = origsize
+ self._loaded = True
return tarinfo
#--------------------------------------------------------------------------
# Little helper methods:
- def _block(self, count):
- """Round up a byte count by BLOCKSIZE and return it,
- e.g. _block(834) => 1024.
- """
- blocks, remainder = divmod(count, BLOCKSIZE)
- if remainder:
- blocks += 1
- return blocks * BLOCKSIZE
-
- def _getmember(self, name, tarinfo=None):
+ def _getmember(self, name, tarinfo=None, normalize=False):
"""Find an archive member by name from bottom to top.
If tarinfo is given, it is used as the starting point.
"""
# Ensure that all members have been loaded.
members = self.getmembers()
- if tarinfo is None:
- end = len(members)
- else:
- end = members.index(tarinfo)
+ # Limit the member search list up to tarinfo.
+ if tarinfo is not None:
+ members = members[:members.index(tarinfo)]
- for i in xrange(end - 1, -1, -1):
- if name == members[i].name:
- return members[i]
+ if normalize:
+ name = os.path.normpath(name)
+
+ for member in reversed(members):
+ if normalize:
+ member_name = os.path.normpath(member.name)
+ else:
+ member_name = member.name
+
+ if name == member_name:
+ return member
def _load(self):
"""Read through the entire archive file and look for readable
@@ -2014,8 +2390,27 @@
"""
if self.closed:
raise IOError("%s is closed" % self.__class__.__name__)
- if mode is not None and self._mode not in mode:
- raise IOError("bad operation for mode %r" % self._mode)
+ if mode is not None and self.mode not in mode:
+ raise IOError("bad operation for mode %r" % self.mode)
+
+ def _find_link_target(self, tarinfo):
+ """Find the target member of a symlink or hardlink member in the
+ archive.
+ """
+ if tarinfo.issym():
+ # Always search the entire archive.
+ linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
+ limit = None
+ else:
+ # Search the archive before the link, because a hard link is
+ # just a reference to an already archived file.
+ linkname = tarinfo.linkname
+ limit = tarinfo
+
+ member = self._getmember(linkname, tarinfo=limit, normalize=True)
+ if member is None:
+ raise KeyError("linkname %r not found" % linkname)
+ return member
def __iter__(self):
"""Provide an iterator object.
@@ -2030,6 +2425,20 @@
"""
if level <= self.debug:
print >> sys.stderr, msg
+
+ def __enter__(self):
+ self._check()
+ return self
+
+ def __exit__(self, type, value, traceback):
+ if type is None:
+ self.close()
+ else:
+ # An exception occurred. We must not call close() because
+ # it would try to write end-of-archive blocks and padding.
+ if not self._extfileobj:
+ self.fileobj.close()
+ self.closed = True
# class TarFile
class TarIter:
@@ -2121,6 +2530,9 @@
ZipFile class.
"""
def __init__(self, file, mode="r", compression=TAR_PLAIN):
+ from warnings import warnpy3k
+ warnpy3k("the TarFileCompat class has been removed in Python 3.0",
+ stacklevel=2)
if compression == TAR_PLAIN:
self.tarfile = TarFile.taropen(file, mode)
elif compression == TAR_GZIPPED:
@@ -2154,10 +2566,10 @@
except ImportError:
from StringIO import StringIO
import calendar
- zinfo.name = zinfo.filename
- zinfo.size = zinfo.file_size
- zinfo.mtime = calendar.timegm(zinfo.date_time)
- self.tarfile.addfile(zinfo, StringIO(bytes))
+ tinfo = TarInfo(zinfo.filename)
+ tinfo.size = len(bytes)
+ tinfo.mtime = calendar.timegm(zinfo.date_time)
+ self.tarfile.addfile(tinfo, StringIO(bytes))
def close(self):
self.tarfile.close()
#class TarFileCompat
@@ -2176,4 +2588,5 @@
except TarError:
return False
+bltn_open = open
open = TarFile.open
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -1,8 +1,11 @@
+# -*- coding: iso-8859-15 -*-
+
import sys
import os
import shutil
-import tempfile
import StringIO
+from hashlib import md5
+import errno
import unittest
import tarfile
@@ -20,484 +23,887 @@
except ImportError:
bz2 = None
-def path(path):
- return test_support.findfile(path)
+def md5sum(data):
+ return md5(data).hexdigest()
-testtar = path("testtar.tar")
-tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir")
-tempname = test_support.TESTFN
-membercount = 13
+TEMPDIR = os.path.abspath(test_support.TESTFN)
+tarname = test_support.findfile("testtar.tar")
+gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
+bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
+tmpname = os.path.join(TEMPDIR, "tmp.tar")
-def tarname(comp=""):
- if not comp:
- return testtar
- return os.path.join(dirname(), "%s%s%s" % (testtar, os.extsep, comp))
+md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
+md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
-def dirname():
- if not os.path.exists(tempdir):
- os.mkdir(tempdir)
- return tempdir
-def tmpname():
- return tempname
+class ReadTest(unittest.TestCase):
-
-class BaseTest(unittest.TestCase):
- comp = ''
- mode = 'r'
- sep = ':'
+ tarname = tarname
+ mode = "r:"
def setUp(self):
- mode = self.mode + self.sep + self.comp
- self.tar = tarfile.open(tarname(self.comp), mode)
+ self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
def tearDown(self):
self.tar.close()
-class ReadTest(BaseTest):
- def test(self):
- """Test member extraction.
- """
- members = 0
- for tarinfo in self.tar:
- members += 1
- if not tarinfo.isreg():
- continue
- f = self.tar.extractfile(tarinfo)
- self.assert_(len(f.read()) == tarinfo.size,
- "size read does not match expected size")
- f.close()
+class UstarReadTest(ReadTest):
- self.assert_(members == membercount,
- "could not find all members")
+ def test_fileobj_regular_file(self):
+ tarinfo = self.tar.getmember("ustar/regtype")
+ fobj = self.tar.extractfile(tarinfo)
+ data = fobj.read()
+ self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
+ "regular file extraction failed")
- def test_sparse(self):
- """Test sparse member extraction.
- """
- if self.sep != "|":
- f1 = self.tar.extractfile("S-SPARSE")
- f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
- self.assert_(f1.read() == f2.read(),
- "_FileObject failed on sparse file member")
+ def test_fileobj_readlines(self):
+ self.tar.extract("ustar/regtype", TEMPDIR)
+ tarinfo = self.tar.getmember("ustar/regtype")
+ fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
+ fobj2 = self.tar.extractfile(tarinfo)
- def test_readlines(self):
- """Test readlines() method of _FileObject.
- """
- if self.sep != "|":
- filename = "0-REGTYPE-TEXT"
- self.tar.extract(filename, dirname())
- f = open(os.path.join(dirname(), filename), "rU")
- lines1 = f.readlines()
- f.close()
- lines2 = self.tar.extractfile(filename).readlines()
- self.assert_(lines1 == lines2,
- "_FileObject.readline() does not work correctly")
+ lines1 = fobj1.readlines()
+ lines2 = fobj2.readlines()
+ self.assertTrue(lines1 == lines2,
+ "fileobj.readlines() failed")
+ self.assertTrue(len(lines2) == 114,
+ "fileobj.readlines() failed")
+ self.assertTrue(lines2[83] ==
+ "I will gladly admit that Python is not the fastest running scripting language.\n",
+ "fileobj.readlines() failed")
- def test_iter(self):
- # Test iteration over ExFileObject.
- if self.sep != "|":
- filename = "0-REGTYPE-TEXT"
- self.tar.extract(filename, dirname())
- f = open(os.path.join(dirname(), filename), "rU")
- lines1 = f.readlines()
- f.close()
- lines2 = [line for line in self.tar.extractfile(filename)]
- self.assert_(lines1 == lines2,
- "ExFileObject iteration does not work correctly")
+ def test_fileobj_iter(self):
+ self.tar.extract("ustar/regtype", TEMPDIR)
+ tarinfo = self.tar.getmember("ustar/regtype")
+ fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
+ fobj2 = self.tar.extractfile(tarinfo)
+ lines1 = fobj1.readlines()
+ lines2 = [line for line in fobj2]
+ self.assertTrue(lines1 == lines2,
+ "fileobj.__iter__() failed")
- def test_seek(self):
- """Test seek() method of _FileObject, incl. random reading.
- """
- if self.sep != "|":
- filename = "0-REGTYPE-TEXT"
- self.tar.extract(filename, dirname())
- f = open(os.path.join(dirname(), filename), "rb")
- data = f.read()
- f.close()
+ def test_fileobj_seek(self):
+ self.tar.extract("ustar/regtype", TEMPDIR)
+ fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb")
+ data = fobj.read()
+ fobj.close()
- tarinfo = self.tar.getmember(filename)
- fobj = self.tar.extractfile(tarinfo)
+ tarinfo = self.tar.getmember("ustar/regtype")
+ fobj = self.tar.extractfile(tarinfo)
- text = fobj.read()
- fobj.seek(0)
- self.assert_(0 == fobj.tell(),
- "seek() to file's start failed")
- fobj.seek(2048, 0)
- self.assert_(2048 == fobj.tell(),
- "seek() to absolute position failed")
- fobj.seek(-1024, 1)
- self.assert_(1024 == fobj.tell(),
- "seek() to negative relative position failed")
- fobj.seek(1024, 1)
- self.assert_(2048 == fobj.tell(),
- "seek() to positive relative position failed")
- s = fobj.read(10)
- self.assert_(s == data[2048:2058],
- "read() after seek failed")
- fobj.seek(0, 2)
- self.assert_(tarinfo.size == fobj.tell(),
- "seek() to file's end failed")
- self.assert_(fobj.read() == "",
- "read() at file's end did not return empty string")
- fobj.seek(-tarinfo.size, 2)
- self.assert_(0 == fobj.tell(),
- "relative seek() to file's start failed")
- fobj.seek(512)
- s1 = fobj.readlines()
- fobj.seek(512)
- s2 = fobj.readlines()
- self.assert_(s1 == s2,
- "readlines() after seek failed")
- fobj.seek(0)
- self.assert_(len(fobj.readline()) == fobj.tell(),
- "tell() after readline() failed")
- fobj.seek(512)
- self.assert_(len(fobj.readline()) + 512 == fobj.tell(),
- "tell() after seek() and readline() failed")
- fobj.seek(0)
- line = fobj.readline()
- self.assert_(fobj.read() == data[len(line):],
- "read() after readline() failed")
+ text = fobj.read()
+ fobj.seek(0)
+ self.assertTrue(0 == fobj.tell(),
+ "seek() to file's start failed")
+ fobj.seek(2048, 0)
+ self.assertTrue(2048 == fobj.tell(),
+ "seek() to absolute position failed")
+ fobj.seek(-1024, 1)
+ self.assertTrue(1024 == fobj.tell(),
+ "seek() to negative relative position failed")
+ fobj.seek(1024, 1)
+ self.assertTrue(2048 == fobj.tell(),
+ "seek() to positive relative position failed")
+ s = fobj.read(10)
+ self.assertTrue(s == data[2048:2058],
+ "read() after seek failed")
+ fobj.seek(0, 2)
+ self.assertTrue(tarinfo.size == fobj.tell(),
+ "seek() to file's end failed")
+ self.assertTrue(fobj.read() == "",
+ "read() at file's end did not return empty string")
+ fobj.seek(-tarinfo.size, 2)
+ self.assertTrue(0 == fobj.tell(),
+ "relative seek() to file's start failed")
+ fobj.seek(512)
+ s1 = fobj.readlines()
+ fobj.seek(512)
+ s2 = fobj.readlines()
+ self.assertTrue(s1 == s2,
+ "readlines() after seek failed")
+ fobj.seek(0)
+ self.assertTrue(len(fobj.readline()) == fobj.tell(),
+ "tell() after readline() failed")
+ fobj.seek(512)
+ self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
+ "tell() after seek() and readline() failed")
+ fobj.seek(0)
+ line = fobj.readline()
+ self.assertTrue(fobj.read() == data[len(line):],
+ "read() after readline() failed")
+ fobj.close()
+
+ # Test if symbolic and hard links are resolved by extractfile(). The
+ # test link members each point to a regular member whose data is
+ # supposed to be exported.
+ def _test_fileobj_link(self, lnktype, regtype):
+ a = self.tar.extractfile(lnktype)
+ b = self.tar.extractfile(regtype)
+ self.assertEqual(a.name, b.name)
+
+ def test_fileobj_link1(self):
+ self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
+
+ def test_fileobj_link2(self):
+ self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")
+
+ def test_fileobj_symlink1(self):
+ self._test_fileobj_link("ustar/symtype", "ustar/regtype")
+
+ def test_fileobj_symlink2(self):
+ self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
+
+
+class CommonReadTest(ReadTest):
+
+ def test_empty_tarfile(self):
+ # Test for issue6123: Allow opening empty archives.
+ # This test checks if tarfile.open() is able to open an empty tar
+ # archive successfully. Note that an empty tar archive is not the
+ # same as an empty file!
+ tarfile.open(tmpname, self.mode.replace("r", "w")).close()
+ try:
+ tar = tarfile.open(tmpname, self.mode)
+ tar.getnames()
+ except tarfile.ReadError:
+ self.fail("tarfile.open() failed on empty archive")
+ self.assertListEqual(tar.getmembers(), [])
+
+ def test_null_tarfile(self):
+ # Test for issue6123: Allow opening empty archives.
+ # This test guarantees that tarfile.open() does not treat an empty
+ # file as an empty tar archive.
+ open(tmpname, "wb").close()
+ self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
+ self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
+
+ def test_ignore_zeros(self):
+ # Test TarFile's ignore_zeros option.
+ if self.mode.endswith(":gz"):
+ _open = gzip.GzipFile
+ elif self.mode.endswith(":bz2"):
+ _open = bz2.BZ2File
+ else:
+ _open = open
+
+ for char in ('\0', 'a'):
+ # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
+ # are ignored correctly.
+ fobj = _open(tmpname, "wb")
+ fobj.write(char * 1024)
+ fobj.write(tarfile.TarInfo("foo").tobuf())
fobj.close()
- def test_old_dirtype(self):
- """Test old style dirtype member (bug #1336623).
- """
- # Old tars create directory members using a REGTYPE
- # header with a "/" appended to the filename field.
+ tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
+ self.assertListEqual(tar.getnames(), ["foo"],
+ "ignore_zeros=True should have skipped the %r-blocks" % char)
+ tar.close()
- # Create an old tar style directory entry.
- filename = tmpname()
- tarinfo = tarfile.TarInfo("directory/")
- tarinfo.type = tarfile.REGTYPE
- fobj = open(filename, "w")
- fobj.write(tarinfo.tobuf())
+class MiscReadTest(CommonReadTest):
+
+ def test_no_name_argument(self):
+ fobj = open(self.tarname, "rb")
+ self.tar.close()
+ self.tar = tarfile.open(fileobj=fobj, mode="r")
+ self.assertEqual(self.tar.name, os.path.abspath(fobj.name))
fobj.close()
+ def test_no_name_attribute(self):
+ fp = open(self.tarname, "rb")
+ data = fp.read()
+ fp.close()
+ fobj = StringIO.StringIO(data)
+ self.assertRaises(AttributeError, getattr, fobj, "name")
+ self.tar.close()
+ self.tar = tarfile.open(fileobj=fobj, mode="r")
+ self.assertEqual(self.tar.name, None)
+
+ def test_empty_name_attribute(self):
+ fp = open(self.tarname, "rb")
+ data = fp.read()
+ fp.close()
+ fobj = StringIO.StringIO(data)
+ fobj.name = ""
+ self.tar.close()
+ self.tar = tarfile.open(fileobj=fobj, mode="r")
+ self.assertEqual(self.tar.name, None)
+
+ def test_fileobj_with_offset(self):
+ # Skip the first member and store values from the second member
+ # of the testtar.
+ tar = tarfile.open(self.tarname, mode=self.mode)
+ tar.next()
+ t = tar.next()
+ name = t.name
+ offset = t.offset
+ data = tar.extractfile(t).read()
+ tar.close()
+
+ # Open the testtar and seek to the offset of the second member.
+ if self.mode.endswith(":gz"):
+ _open = gzip.GzipFile
+ elif self.mode.endswith(":bz2"):
+ _open = bz2.BZ2File
+ else:
+ _open = open
+ fobj = _open(self.tarname, "rb")
+ fobj.seek(offset)
+
+ # Test if the tarfile starts with the second member.
+ tar.close()
+ tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
+ t = tar.next()
+ self.assertEqual(t.name, name)
+ # Read to the end of fileobj and test if seeking back to the
+ # beginning works.
+ tar.getmembers()
+ self.assertEqual(tar.extractfile(t).read(), data,
+ "seek back did not work")
+ tar.close()
+ fobj.close()
+
+ def test_fail_comp(self):
+ # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
+ if self.mode == "r:":
+ return
+ self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
+ fobj = open(tarname, "rb")
+ self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)
+
+ def test_v7_dirtype(self):
+ # Test old style dirtype member (bug #1336623):
+ # Old V7 tars create directory members using an AREGTYPE
+ # header with a "/" appended to the filename field.
+ tarinfo = self.tar.getmember("misc/dirtype-old-v7")
+ self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
+ "v7 dirtype failed")
+
+ def test_xstar_type(self):
+ # The xstar format stores extra atime and ctime fields inside the
+ # space reserved for the prefix field. The prefix field must be
+ # ignored in this case, otherwise it will mess up the name.
try:
- # Test if it is still a directory entry when
- # read back.
- tar = tarfile.open(filename)
- tarinfo = tar.getmembers()[0]
- tar.close()
+ self.tar.getmember("misc/regtype-xstar")
+ except KeyError:
+ self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
- self.assert_(tarinfo.type == tarfile.DIRTYPE)
- self.assert_(tarinfo.name.endswith("/"))
- finally:
- try:
- os.unlink(filename)
- except:
- pass
+ def test_check_members(self):
+ for tarinfo in self.tar:
+ self.assertTrue(int(tarinfo.mtime) == 07606136617,
+ "wrong mtime for %s" % tarinfo.name)
+ if not tarinfo.name.startswith("ustar/"):
+ continue
+ self.assertTrue(tarinfo.uname == "tarfile",
+ "wrong uname for %s" % tarinfo.name)
- def test_dirtype(self):
- for tarinfo in self.tar:
- if tarinfo.isdir():
- self.assert_(tarinfo.name.endswith("/"))
- self.assert_(not tarinfo.name[:-1].endswith("/"))
+ def test_find_members(self):
+ self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
+ "could not find all members")
+
+ def test_extract_hardlink(self):
+ # Test hardlink extraction (e.g. bug #857297).
+ tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
+
+ tar.extract("ustar/regtype", TEMPDIR)
+ try:
+ tar.extract("ustar/lnktype", TEMPDIR)
+ except EnvironmentError, e:
+ if e.errno == errno.ENOENT:
+ self.fail("hardlink not extracted properly")
+
+ data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read()
+ self.assertEqual(md5sum(data), md5_regtype)
+
+ try:
+ tar.extract("ustar/symtype", TEMPDIR)
+ except EnvironmentError, e:
+ if e.errno == errno.ENOENT:
+ self.fail("symlink not extracted properly")
+
+ data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read()
+ self.assertEqual(md5sum(data), md5_regtype)
def test_extractall(self):
# Test if extractall() correctly restores directory permissions
# and times (see issue1735).
- if (sys.platform == "win32" or
- test_support.is_jython and os._name == 'nt'):
- # Win32 has no support for utime() on directories or
- # fine grained permissions.
- return
-
- fobj = StringIO.StringIO()
- tar = tarfile.open(fileobj=fobj, mode="w:")
- for name in ("foo", "foo/bar"):
- tarinfo = tarfile.TarInfo(name)
- tarinfo.type = tarfile.DIRTYPE
- tarinfo.mtime = 07606136617
- tarinfo.mode = 0755
- tar.addfile(tarinfo)
- tar.close()
- fobj.seek(0)
-
- TEMPDIR = os.path.join(dirname(), "extract-test")
- tar = tarfile.open(fileobj=fobj)
- tar.extractall(TEMPDIR)
- for tarinfo in tar.getmembers():
+ tar = tarfile.open(tarname, encoding="iso8859-1")
+ directories = [t for t in tar if t.isdir()]
+ tar.extractall(TEMPDIR, directories)
+ for tarinfo in directories:
path = os.path.join(TEMPDIR, tarinfo.name)
- self.assertEqual(tarinfo.mode, os.stat(path).st_mode & 0777)
+ if (sys.platform == "win32" or
+ test_support.is_jython and os._name == 'nt'):
+ # Win32 has no support for fine grained permissions.
+ self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777)
self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
tar.close()
- def test_star(self):
+ def test_init_close_fobj(self):
+ # Issue #7341: Close the internal file object in the TarFile
+ # constructor in case of an error. For the test we rely on
+ # the fact that opening an empty file raises a ReadError.
+ empty = os.path.join(TEMPDIR, "empty")
+ open(empty, "wb").write("")
+
try:
- self.tar.getmember("7-STAR")
- except KeyError:
- self.fail("finding 7-STAR member failed (mangled prefix?)")
+ tar = object.__new__(tarfile.TarFile)
+ try:
+ tar.__init__(empty)
+ except tarfile.ReadError:
+ self.assertTrue(tar.fileobj.closed)
+ else:
+ self.fail("ReadError not raised")
+ finally:
+ os.remove(empty)
-class ReadStreamTest(ReadTest):
- sep = "|"
+class StreamReadTest(CommonReadTest):
- def test(self):
- """Test member extraction, and for StreamError when
- seeking backwards.
- """
- ReadTest.test(self)
- tarinfo = self.tar.getmembers()[0]
- f = self.tar.extractfile(tarinfo)
+ mode="r|"
+
+ def test_fileobj_regular_file(self):
+ tarinfo = self.tar.next() # get "regtype" (can't use getmember)
+ fobj = self.tar.extractfile(tarinfo)
+ data = fobj.read()
+ self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
+ "regular file extraction failed")
+
+ def test_provoke_stream_error(self):
+ tarinfos = self.tar.getmembers()
+ f = self.tar.extractfile(tarinfos[0]) # read the first member
self.assertRaises(tarfile.StreamError, f.read)
- def test_stream(self):
- """Compare the normal tar and the stream tar.
- """
- stream = self.tar
- tar = tarfile.open(tarname(), 'r')
+ def test_compare_members(self):
+ tar1 = tarfile.open(tarname, encoding="iso8859-1")
+ tar2 = self.tar
- while 1:
- t1 = tar.next()
- t2 = stream.next()
+ while True:
+ t1 = tar1.next()
+ t2 = tar2.next()
if t1 is None:
break
- self.assert_(t2 is not None, "stream.next() failed.")
+ self.assertTrue(t2 is not None, "stream.next() failed.")
if t2.islnk() or t2.issym():
- self.assertRaises(tarfile.StreamError, stream.extractfile, t2)
+ self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
continue
- v1 = tar.extractfile(t1)
- v2 = stream.extractfile(t2)
+
+ v1 = tar1.extractfile(t1)
+ v2 = tar2.extractfile(t2)
if v1 is None:
continue
- self.assert_(v2 is not None, "stream.extractfile() failed")
- self.assert_(v1.read() == v2.read(), "stream extraction failed")
+ self.assertTrue(v2 is not None, "stream.extractfile() failed")
+ self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
+
+ tar1.close()
+
+
+class DetectReadTest(unittest.TestCase):
+
+ def _testfunc_file(self, name, mode):
+ try:
+ tarfile.open(name, mode)
+ except tarfile.ReadError:
+ self.fail()
+
+ def _testfunc_fileobj(self, name, mode):
+ try:
+ tarfile.open(name, mode, fileobj=open(name, "rb"))
+ except tarfile.ReadError:
+ self.fail()
+
+ def _test_modes(self, testfunc):
+ testfunc(tarname, "r")
+ testfunc(tarname, "r:")
+ testfunc(tarname, "r:*")
+ testfunc(tarname, "r|")
+ testfunc(tarname, "r|*")
+
+ if gzip:
+ self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
+ self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
+ self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
+ self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")
+
+ testfunc(gzipname, "r")
+ testfunc(gzipname, "r:*")
+ testfunc(gzipname, "r:gz")
+ testfunc(gzipname, "r|*")
+ testfunc(gzipname, "r|gz")
+
+ if bz2:
+ self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
+ self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
+ self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
+ self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")
+
+ testfunc(bz2name, "r")
+ testfunc(bz2name, "r:*")
+ testfunc(bz2name, "r:bz2")
+ testfunc(bz2name, "r|*")
+ testfunc(bz2name, "r|bz2")
+
+ def test_detect_file(self):
+ self._test_modes(self._testfunc_file)
+
+ def test_detect_fileobj(self):
+ self._test_modes(self._testfunc_fileobj)
+
+
+class MemberReadTest(ReadTest):
+
+ def _test_member(self, tarinfo, chksum=None, **kwargs):
+ if chksum is not None:
+ self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
+ "wrong md5sum for %s" % tarinfo.name)
+
+ kwargs["mtime"] = 07606136617
+ kwargs["uid"] = 1000
+ kwargs["gid"] = 100
+ if "old-v7" not in tarinfo.name:
+ # V7 tar can't handle alphabetic owners.
+ kwargs["uname"] = "tarfile"
+ kwargs["gname"] = "tarfile"
+ for k, v in kwargs.iteritems():
+ self.assertTrue(getattr(tarinfo, k) == v,
+ "wrong value in %s field of %s" % (k, tarinfo.name))
+
+ def test_find_regtype(self):
+ tarinfo = self.tar.getmember("ustar/regtype")
+ self._test_member(tarinfo, size=7011, chksum=md5_regtype)
+
+ def test_find_conttype(self):
+ tarinfo = self.tar.getmember("ustar/conttype")
+ self._test_member(tarinfo, size=7011, chksum=md5_regtype)
+
+ def test_find_dirtype(self):
+ tarinfo = self.tar.getmember("ustar/dirtype")
+ self._test_member(tarinfo, size=0)
+
+ def test_find_dirtype_with_size(self):
+ tarinfo = self.tar.getmember("ustar/dirtype-with-size")
+ self._test_member(tarinfo, size=255)
+
+ def test_find_lnktype(self):
+ tarinfo = self.tar.getmember("ustar/lnktype")
+ self._test_member(tarinfo, size=0, linkname="ustar/regtype")
+
+ def test_find_symtype(self):
+ tarinfo = self.tar.getmember("ustar/symtype")
+ self._test_member(tarinfo, size=0, linkname="regtype")
+
+ def test_find_blktype(self):
+ tarinfo = self.tar.getmember("ustar/blktype")
+ self._test_member(tarinfo, size=0, devmajor=3, devminor=0)
+
+ def test_find_chrtype(self):
+ tarinfo = self.tar.getmember("ustar/chrtype")
+ self._test_member(tarinfo, size=0, devmajor=1, devminor=3)
+
+ def test_find_fifotype(self):
+ tarinfo = self.tar.getmember("ustar/fifotype")
+ self._test_member(tarinfo, size=0)
+
+ def test_find_sparse(self):
+ tarinfo = self.tar.getmember("ustar/sparse")
+ self._test_member(tarinfo, size=86016, chksum=md5_sparse)
+
+ def test_find_umlauts(self):
+ tarinfo = self.tar.getmember("ustar/umlauts-ÄÖÜäöüß")
+ self._test_member(tarinfo, size=7011, chksum=md5_regtype)
+
+ def test_find_ustar_longname(self):
+ name = "ustar/" + "12345/" * 39 + "1234567/longname"
+ self.assertIn(name, self.tar.getnames())
+
+ def test_find_regtype_oldv7(self):
+ tarinfo = self.tar.getmember("misc/regtype-old-v7")
+ self._test_member(tarinfo, size=7011, chksum=md5_regtype)
+
+ def test_find_pax_umlauts(self):
+ self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
+ tarinfo = self.tar.getmember("pax/umlauts-ÄÖÜäöüß")
+ self._test_member(tarinfo, size=7011, chksum=md5_regtype)
+
+
+class LongnameTest(ReadTest):
+
+ def test_read_longname(self):
+ # Test reading of longname (bug #1471427).
+ longname = self.subdir + "/" + "123/" * 125 + "longname"
+ try:
+ tarinfo = self.tar.getmember(longname)
+ except KeyError:
+ self.fail("longname not found")
+ self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
+
+ def test_read_longlink(self):
+ longname = self.subdir + "/" + "123/" * 125 + "longname"
+ longlink = self.subdir + "/" + "123/" * 125 + "longlink"
+ try:
+ tarinfo = self.tar.getmember(longlink)
+ except KeyError:
+ self.fail("longlink not found")
+ self.assertTrue(tarinfo.linkname == longname, "linkname wrong")
+
+ def test_truncated_longname(self):
+ longname = self.subdir + "/" + "123/" * 125 + "longname"
+ tarinfo = self.tar.getmember(longname)
+ offset = tarinfo.offset
+ self.tar.fileobj.seek(offset)
+ fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
+ self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)
+
+ def test_header_offset(self):
+ # Test if the start offset of the TarInfo object includes
+ # the preceding extended header.
+ longname = self.subdir + "/" + "123/" * 125 + "longname"
+ offset = self.tar.getmember(longname).offset
+ fobj = open(tarname)
+ fobj.seek(offset)
+ tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
+ self.assertEqual(tarinfo.type, self.longnametype)
+
+
+class GNUReadTest(LongnameTest):
+
+ subdir = "gnu"
+ longnametype = tarfile.GNUTYPE_LONGNAME
+
+ def test_sparse_file(self):
+ tarinfo1 = self.tar.getmember("ustar/sparse")
+ fobj1 = self.tar.extractfile(tarinfo1)
+ tarinfo2 = self.tar.getmember("gnu/sparse")
+ fobj2 = self.tar.extractfile(tarinfo2)
+ self.assertTrue(fobj1.read() == fobj2.read(),
+ "sparse file extraction failed")
+
+
+class PaxReadTest(LongnameTest):
+
+ subdir = "pax"
+ longnametype = tarfile.XHDTYPE
+
+ def test_pax_global_headers(self):
+ tar = tarfile.open(tarname, encoding="iso8859-1")
+
+ tarinfo = tar.getmember("pax/regtype1")
+ self.assertEqual(tarinfo.uname, "foo")
+ self.assertEqual(tarinfo.gname, "bar")
+ self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß")
+
+ tarinfo = tar.getmember("pax/regtype2")
+ self.assertEqual(tarinfo.uname, "")
+ self.assertEqual(tarinfo.gname, "bar")
+ self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß")
+
+ tarinfo = tar.getmember("pax/regtype3")
+ self.assertEqual(tarinfo.uname, "tarfile")
+ self.assertEqual(tarinfo.gname, "tarfile")
+ self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß")
+
+ def test_pax_number_fields(self):
+ # All following number fields are read from the pax header.
+ tar = tarfile.open(tarname, encoding="iso8859-1")
+ tarinfo = tar.getmember("pax/regtype4")
+ self.assertEqual(tarinfo.size, 7011)
+ self.assertEqual(tarinfo.uid, 123)
+ self.assertEqual(tarinfo.gid, 123)
+ self.assertEqual(tarinfo.mtime, 1041808783.0)
+ self.assertEqual(type(tarinfo.mtime), float)
+ self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
+ self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
+
+
+class WriteTestBase(unittest.TestCase):
+ # Put all write tests in here that are supposed to be tested
+ # in all possible mode combinations.
+
+ def test_fileobj_no_close(self):
+ fobj = StringIO.StringIO()
+ tar = tarfile.open(fileobj=fobj, mode=self.mode)
+ tar.addfile(tarfile.TarInfo("foo"))
+ tar.close()
+ self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
+
+
+class WriteTest(WriteTestBase):
+
+ mode = "w:"
+
+ def test_100_char_name(self):
+ # The name field in a tar header stores strings of at most 100 chars.
+ # If a string is shorter than 100 chars it has to be padded with '\0',
+ # which implies that a string of exactly 100 chars is stored without
+ # a trailing '\0'.
+ name = "0123456789" * 10
+ tar = tarfile.open(tmpname, self.mode)
+ t = tarfile.TarInfo(name)
+ tar.addfile(t)
+ tar.close()
+
+ tar = tarfile.open(tmpname)
+ self.assertTrue(tar.getnames()[0] == name,
+ "failed to store 100 char filename")
+ tar.close()
+
+ def test_tar_size(self):
+ # Test for bug #1013882.
+ tar = tarfile.open(tmpname, self.mode)
+ path = os.path.join(TEMPDIR, "file")
+ fobj = open(path, "wb")
+ fobj.write("aaa")
+ fobj.close()
+ tar.add(path)
+ tar.close()
+ self.assertTrue(os.path.getsize(tmpname) > 0,
+ "tarfile is empty")
+
+ # The test_*_size tests test for bug #1167128.
+ def test_file_size(self):
+ tar = tarfile.open(tmpname, self.mode)
+
+ path = os.path.join(TEMPDIR, "file")
+ fobj = open(path, "wb")
+ fobj.close()
+ tarinfo = tar.gettarinfo(path)
+ self.assertEqual(tarinfo.size, 0)
+
+ fobj = open(path, "wb")
+ fobj.write("aaa")
+ fobj.close()
+ tarinfo = tar.gettarinfo(path)
+ self.assertEqual(tarinfo.size, 3)
tar.close()
- stream.close()
-class ReadDetectTest(ReadTest):
+ def test_directory_size(self):
+ path = os.path.join(TEMPDIR, "directory")
+ os.mkdir(path)
+ try:
+ tar = tarfile.open(tmpname, self.mode)
+ tarinfo = tar.gettarinfo(path)
+ self.assertEqual(tarinfo.size, 0)
+ finally:
+ os.rmdir(path)
- def setUp(self):
- self.tar = tarfile.open(tarname(self.comp), self.mode)
+ def test_link_size(self):
+ if hasattr(os, "link"):
+ link = os.path.join(TEMPDIR, "link")
+ target = os.path.join(TEMPDIR, "link_target")
+ fobj = open(target, "wb")
+ fobj.write("aaa")
+ fobj.close()
+ os.link(target, link)
+ try:
+ tar = tarfile.open(tmpname, self.mode)
+ # Record the link target in the inodes list.
+ tar.gettarinfo(target)
+ tarinfo = tar.gettarinfo(link)
+ self.assertEqual(tarinfo.size, 0)
+ finally:
+ os.remove(target)
+ os.remove(link)
- def tearDown(self):
- self.tar.close()
-
-class ReadDetectFileobjTest(ReadTest):
-
- def setUp(self):
- name = tarname(self.comp)
- self.fileobj = open(name, "rb")
- self.tar = tarfile.open(name, mode=self.mode,
- fileobj=self.fileobj)
-
- def tearDown(self):
- self.tar.close()
- self.fileobj.close()
-
-class ReadAsteriskTest(ReadTest):
-
- def setUp(self):
- mode = self.mode + self.sep + "*"
- self.tar = tarfile.open(tarname(self.comp), mode)
-
-class ReadStreamAsteriskTest(ReadStreamTest):
-
- def setUp(self):
- mode = self.mode + self.sep + "*"
- self.tar = tarfile.open(tarname(self.comp), mode)
-
-class ReadFileobjTest(BaseTest):
-
- def test_fileobj_with_offset(self):
- # Skip the first member and store values from the second member
- # of the testtar.
- self.tar.next()
- t = self.tar.next()
- name = t.name
- offset = t.offset
- data = self.tar.extractfile(t).read()
- self.tar.close()
-
- # Open the testtar and seek to the offset of the second member.
- if self.comp == "gz":
- _open = gzip.GzipFile
- elif self.comp == "bz2":
- _open = bz2.BZ2File
- else:
- _open = open
- fobj = _open(tarname(self.comp), "rb")
- fobj.seek(offset)
-
- # Test if the tarfile starts with the second member.
- self.tar.close()
- self.tar = tarfile.open(tarname(self.comp), "r:", fileobj=fobj)
- t = self.tar.next()
- self.assertEqual(t.name, name)
- # Read to the end of fileobj and test if seeking back to the
- # beginning works.
- self.tar.getmembers()
- self.assertEqual(self.tar.extractfile(t).read(), data,
- "seek back did not work")
- self.tar.close()
- fobj.close()
-
-class WriteTest(BaseTest):
- mode = 'w'
-
- def setUp(self):
- mode = self.mode + self.sep + self.comp
- self.src = tarfile.open(tarname(self.comp), 'r')
- self.dstname = tmpname()
- self.dst = tarfile.open(self.dstname, mode)
-
- def tearDown(self):
- self.src.close()
- self.dst.close()
-
- def test_posix(self):
- self.dst.posix = 1
- self._test()
-
- def test_nonposix(self):
- self.dst.posix = 0
- self._test()
-
- def test_small(self):
- self.dst.add(os.path.join(os.path.dirname(__file__),"cfgparser.1"))
- self.dst.close()
- self.assertNotEqual(os.stat(self.dstname).st_size, 0)
-
- def _test(self):
- for tarinfo in self.src:
- if not tarinfo.isreg():
- continue
- f = self.src.extractfile(tarinfo)
- if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME and "/" not in tarinfo.name:
- self.assertRaises(ValueError, self.dst.addfile,
- tarinfo, f)
- else:
- self.dst.addfile(tarinfo, f)
+ def test_symlink_size(self):
+ if hasattr(os, "symlink"):
+ path = os.path.join(TEMPDIR, "symlink")
+ os.symlink("link_target", path)
+ try:
+ tar = tarfile.open(tmpname, self.mode)
+ tarinfo = tar.gettarinfo(path)
+ self.assertEqual(tarinfo.size, 0)
+ finally:
+ os.remove(path)
def test_add_self(self):
- dstname = os.path.abspath(self.dstname)
+ # Test for #1257255.
+ dstname = os.path.abspath(tmpname)
- self.assertEqual(self.dst.name, dstname, "archive name must be absolute")
+ tar = tarfile.open(tmpname, self.mode)
+ self.assertTrue(tar.name == dstname, "archive name must be absolute")
- self.dst.add(dstname)
- self.assertEqual(self.dst.getnames(), [], "added the archive to itself")
+ tar.add(dstname)
+ self.assertTrue(tar.getnames() == [], "added the archive to itself")
cwd = os.getcwd()
- os.chdir(dirname())
- self.dst.add(dstname)
+ os.chdir(TEMPDIR)
+ tar.add(dstname)
os.chdir(cwd)
- self.assertEqual(self.dst.getnames(), [], "added the archive to itself")
+ self.assertTrue(tar.getnames() == [], "added the archive to itself")
+ def test_exclude(self):
+ tempdir = os.path.join(TEMPDIR, "exclude")
+ os.mkdir(tempdir)
+ try:
+ for name in ("foo", "bar", "baz"):
+ name = os.path.join(tempdir, name)
+ open(name, "wb").close()
-class Write100Test(BaseTest):
- # The name field in a tar header stores strings of at most 100 chars.
- # If a string is shorter than 100 chars it has to be padded with '\0',
- # which implies that a string of exactly 100 chars is stored without
- # a trailing '\0'.
+ exclude = os.path.isfile
- def setUp(self):
- self.name = "01234567890123456789012345678901234567890123456789"
- self.name += "01234567890123456789012345678901234567890123456789"
+ tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
+ with test_support.check_warnings(("use the filter argument",
+ DeprecationWarning)):
+ tar.add(tempdir, arcname="empty_dir", exclude=exclude)
+ tar.close()
- self.tar = tarfile.open(tmpname(), "w")
- t = tarfile.TarInfo(self.name)
- self.tar.addfile(t)
- self.tar.close()
+ tar = tarfile.open(tmpname, "r")
+ self.assertEqual(len(tar.getmembers()), 1)
+ self.assertEqual(tar.getnames()[0], "empty_dir")
+ finally:
+ shutil.rmtree(tempdir)
- self.tar = tarfile.open(tmpname())
+ def test_filter(self):
+ tempdir = os.path.join(TEMPDIR, "filter")
+ os.mkdir(tempdir)
+ try:
+ for name in ("foo", "bar", "baz"):
+ name = os.path.join(tempdir, name)
+ open(name, "wb").close()
- def tearDown(self):
- self.tar.close()
+ def filter(tarinfo):
+ if os.path.basename(tarinfo.name) == "bar":
+ return
+ tarinfo.uid = 123
+ tarinfo.uname = "foo"
+ return tarinfo
- def test(self):
- self.assertEqual(self.tar.getnames()[0], self.name,
- "failed to store 100 char filename")
+ tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
+ tar.add(tempdir, arcname="empty_dir", filter=filter)
+ tar.close()
+ tar = tarfile.open(tmpname, "r")
+ for tarinfo in tar:
+ self.assertEqual(tarinfo.uid, 123)
+ self.assertEqual(tarinfo.uname, "foo")
+ self.assertEqual(len(tar.getmembers()), 3)
+ tar.close()
+ finally:
+ shutil.rmtree(tempdir)
-class WriteSize0Test(BaseTest):
- mode = 'w'
+ # Guarantee that stored pathnames are not modified. Don't
+ # remove ./ or ../ or double slashes. Still make absolute
+ # pathnames relative.
+ # For details see bug #6054.
+ def _test_pathname(self, path, cmp_path=None, dir=False):
+ # Create a tarfile with an empty member named path
+ # and compare the stored name with the original.
+ foo = os.path.join(TEMPDIR, "foo")
+ if not dir:
+ open(foo, "w").close()
+ else:
+ os.mkdir(foo)
- def setUp(self):
- self.tmpdir = dirname()
- self.dstname = tmpname()
- self.dst = tarfile.open(self.dstname, "w")
+ tar = tarfile.open(tmpname, self.mode)
+ tar.add(foo, arcname=path)
+ tar.close()
- def tearDown(self):
- self.dst.close()
+ tar = tarfile.open(tmpname, "r")
+ t = tar.next()
+ tar.close()
- def test_file(self):
- path = os.path.join(self.tmpdir, "file")
- f = open(path, "w")
- f.close()
- tarinfo = self.dst.gettarinfo(path)
- self.assertEqual(tarinfo.size, 0)
- f = open(path, "w")
- f.write("aaa")
- f.close()
- tarinfo = self.dst.gettarinfo(path)
- self.assertEqual(tarinfo.size, 3)
+ if not dir:
+ os.remove(foo)
+ else:
+ os.rmdir(foo)
- def test_directory(self):
- path = os.path.join(self.tmpdir, "directory")
- if os.path.exists(path):
- # This shouldn't be necessary, but is <wink> if a previous
- # run was killed in mid-stream.
- shutil.rmtree(path)
- os.mkdir(path)
- tarinfo = self.dst.gettarinfo(path)
- self.assertEqual(tarinfo.size, 0)
+ self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
- def test_symlink(self):
- if hasattr(os, "symlink"):
- path = os.path.join(self.tmpdir, "symlink")
- os.symlink("link_target", path)
- tarinfo = self.dst.gettarinfo(path)
- self.assertEqual(tarinfo.size, 0)
+ def test_pathnames(self):
+ self._test_pathname("foo")
+ self._test_pathname(os.path.join("foo", ".", "bar"))
+ self._test_pathname(os.path.join("foo", "..", "bar"))
+ self._test_pathname(os.path.join(".", "foo"))
+ self._test_pathname(os.path.join(".", "foo", "."))
+ self._test_pathname(os.path.join(".", "foo", ".", "bar"))
+ self._test_pathname(os.path.join(".", "foo", "..", "bar"))
+ self._test_pathname(os.path.join(".", "foo", "..", "bar"))
+ self._test_pathname(os.path.join("..", "foo"))
+ self._test_pathname(os.path.join("..", "foo", ".."))
+ self._test_pathname(os.path.join("..", "foo", ".", "bar"))
+ self._test_pathname(os.path.join("..", "foo", "..", "bar"))
+ self._test_pathname("foo" + os.sep + os.sep + "bar")
+ self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
-class WriteStreamTest(WriteTest):
- sep = '|'
+ def test_abs_pathnames(self):
+ if sys.platform == "win32":
+ self._test_pathname("C:\\foo", "foo")
+ else:
+ self._test_pathname("/foo", "foo")
+ self._test_pathname("///foo", "foo")
- def test_padding(self):
- self.dst.close()
+ def test_cwd(self):
+ # Test adding the current working directory.
+ cwd = os.getcwd()
+ os.chdir(TEMPDIR)
+ try:
+ open("foo", "w").close()
- if self.comp == "gz":
- f = gzip.GzipFile(self.dstname)
- s = f.read()
- f.close()
- elif self.comp == "bz2":
- b = bz2.BZ2Decompressor()
- f = file(self.dstname)
- s = f.read()
- f.close()
- s = b.decompress(s)
- self.assertEqual(len(f.unused_data), 0, "trailing data")
+ tar = tarfile.open(tmpname, self.mode)
+ tar.add(".")
+ tar.close()
+
+ tar = tarfile.open(tmpname, "r")
+ for t in tar:
+ self.assert_(t.name == "." or t.name.startswith("./"))
+ tar.close()
+ finally:
+ os.chdir(cwd)
+
+
+class StreamWriteTest(WriteTestBase):
+
+ mode = "w|"
+
+ def test_stream_padding(self):
+ # Test for bug #1543303.
+ tar = tarfile.open(tmpname, self.mode)
+ tar.close()
+
+ if self.mode.endswith("gz"):
+ fobj = gzip.GzipFile(tmpname)
+ data = fobj.read()
+ fobj.close()
+ elif self.mode.endswith("bz2"):
+ dec = bz2.BZ2Decompressor()
+ data = open(tmpname, "rb").read()
+ data = dec.decompress(data)
+ self.assertTrue(len(dec.unused_data) == 0,
+ "found trailing data")
else:
- f = file(self.dstname)
- s = f.read()
- f.close()
+ fobj = open(tmpname, "rb")
+ data = fobj.read()
+ fobj.close()
- self.assertEqual(s.count("\0"), tarfile.RECORDSIZE,
+ self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
"incorrect zero padding")
+ def test_file_mode(self):
+ # Test for issue #8464: Create files with correct
+ # permissions.
+ if sys.platform == "win32" or not hasattr(os, "umask"):
+ return
-class WriteGNULongTest(unittest.TestCase):
- """This testcase checks for correct creation of GNU Longname
- and Longlink extensions.
+ if os.path.exists(tmpname):
+ os.remove(tmpname)
- It creates a tarfile and adds empty members with either
- long names, long linknames or both and compares the size
- of the tarfile with the expected size.
+ original_umask = os.umask(0022)
+ try:
+ tar = tarfile.open(tmpname, self.mode)
+ tar.close()
+ mode = os.stat(tmpname).st_mode & 0777
+ self.assertEqual(mode, 0644, "wrong file permissions")
+ finally:
+ os.umask(original_umask)
- It checks for SF bug #812325 in TarFile._create_gnulong().
- While I was writing this testcase, I noticed a second bug
- in the same method:
- Long{names,links} weren't null-terminated which lead to
- bad tarfiles when their length was a multiple of 512. This
- is tested as well.
- """
+class GNUWriteTest(unittest.TestCase):
+ # This testcase checks for correct creation of GNU Longname
+ # and Longlink extended headers (cp. bug #812325).
def _length(self, s):
blocks, remainder = divmod(len(s) + 1, 512)
@@ -506,19 +912,17 @@
return blocks * 512
def _calc_size(self, name, link=None):
- # initial tar header
+ # Initial tar header
count = 512
if len(name) > tarfile.LENGTH_NAME:
- # gnu longname extended header + longname
+ # GNU longname extended header + longname
count += 512
count += self._length(name)
-
if link is not None and len(link) > tarfile.LENGTH_LINK:
- # gnu longlink extended header + longlink
+ # GNU longlink extended header + longlink
count += 512
count += self._length(link)
-
return count
def _test(self, name, link=None):
@@ -527,22 +931,24 @@
tarinfo.linkname = link
tarinfo.type = tarfile.LNKTYPE
- tar = tarfile.open(tmpname(), "w")
- tar.posix = False
+ tar = tarfile.open(tmpname, "w")
+ tar.format = tarfile.GNU_FORMAT
tar.addfile(tarinfo)
v1 = self._calc_size(name, link)
v2 = tar.offset
- self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
+ self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
tar.close()
- tar = tarfile.open(tmpname())
+ tar = tarfile.open(tmpname)
member = tar.next()
- self.failIf(member is None, "unable to read longname member")
- self.assert_(tarinfo.name == member.name and \
- tarinfo.linkname == member.linkname, \
- "unable to read longname member")
+ self.assertIsNotNone(member,
+ "unable to read longname member")
+ self.assertEqual(tarinfo.name, member.name,
+ "unable to read longname member")
+ self.assertEqual(tarinfo.linkname, member.linkname,
+ "unable to read longname member")
tar.close()
def test_longname_1023(self):
@@ -575,278 +981,581 @@
self._test(("longnam/" * 127) + "longname_",
("longlnk/" * 127) + "longlink_")
-class ReadGNULongTest(unittest.TestCase):
+
+class HardlinkTest(unittest.TestCase):
+ # Test the creation of LNKTYPE (hardlink) members in an archive.
def setUp(self):
- self.tar = tarfile.open(tarname())
+ self.foo = os.path.join(TEMPDIR, "foo")
+ self.bar = os.path.join(TEMPDIR, "bar")
+
+ fobj = open(self.foo, "wb")
+ fobj.write("foo")
+ fobj.close()
+
+ os.link(self.foo, self.bar)
+
+ self.tar = tarfile.open(tmpname, "w")
+ self.tar.add(self.foo)
def tearDown(self):
self.tar.close()
-
- def test_1471427(self):
- """Test reading of longname (bug #1471427).
- """
- name = "test/" * 20 + "0-REGTYPE"
- try:
- tarinfo = self.tar.getmember(name)
- except KeyError:
- tarinfo = None
- self.assert_(tarinfo is not None, "longname not found")
- self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
-
- def test_read_name(self):
- name = ("0-LONGNAME-" * 10)[:101]
- try:
- tarinfo = self.tar.getmember(name)
- except KeyError:
- tarinfo = None
- self.assert_(tarinfo is not None, "longname not found")
-
- def test_read_link(self):
- link = ("1-LONGLINK-" * 10)[:101]
- name = ("0-LONGNAME-" * 10)[:101]
- try:
- tarinfo = self.tar.getmember(link)
- except KeyError:
- tarinfo = None
- self.assert_(tarinfo is not None, "longlink not found")
- self.assert_(tarinfo.linkname == name, "linkname wrong")
-
- def test_truncated_longname(self):
- f = open(tarname())
- fobj = StringIO.StringIO(f.read(1024))
- f.close()
- tar = tarfile.open(name="foo.tar", fileobj=fobj)
- self.assert_(len(tar.getmembers()) == 0, "")
- tar.close()
-
-
-class ExtractHardlinkTest(BaseTest):
-
- def test_hardlink(self):
- """Test hardlink extraction (bug #857297)
- """
- # Prevent errors from being caught
- self.tar.errorlevel = 1
-
- self.tar.extract("0-REGTYPE", dirname())
- try:
- # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE
- self.tar.extract("1-LNKTYPE", dirname())
- except EnvironmentError, e:
- import errno
- if e.errno == errno.ENOENT:
- self.fail("hardlink not extracted properly")
-
-class CreateHardlinkTest(BaseTest):
- """Test the creation of LNKTYPE (hardlink) members in an archive.
- In this respect tarfile.py mimics the behaviour of GNU tar: If
- a file has a st_nlink > 1, it will be added a REGTYPE member
- only the first time.
- """
-
- def setUp(self):
- self.tar = tarfile.open(tmpname(), "w")
-
- self.foo = os.path.join(dirname(), "foo")
- self.bar = os.path.join(dirname(), "bar")
-
- if os.path.exists(self.foo):
- os.remove(self.foo)
- if os.path.exists(self.bar):
- os.remove(self.bar)
-
- f = open(self.foo, "w")
- f.write("foo")
- f.close()
- self.tar.add(self.foo)
+ os.remove(self.foo)
+ os.remove(self.bar)
def test_add_twice(self):
- # If st_nlink == 1 then the same file will be added as
- # REGTYPE every time.
+ # The same name will be added as a REGTYPE every
+ # time regardless of st_nlink.
tarinfo = self.tar.gettarinfo(self.foo)
- self.assertEqual(tarinfo.type, tarfile.REGTYPE,
+ self.assertTrue(tarinfo.type == tarfile.REGTYPE,
"add file as regular failed")
def test_add_hardlink(self):
- # If st_nlink > 1 then the same file will be added as
- # LNKTYPE.
- os.link(self.foo, self.bar)
- tarinfo = self.tar.gettarinfo(self.foo)
- self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
- "add file as hardlink failed")
-
tarinfo = self.tar.gettarinfo(self.bar)
- self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
+ self.assertTrue(tarinfo.type == tarfile.LNKTYPE,
"add file as hardlink failed")
def test_dereference_hardlink(self):
+ # With dereference enabled on the TarFile, gettarinfo() for self.bar
+ # must report a REGTYPE member.
self.tar.dereference = True
- os.link(self.foo, self.bar)
tarinfo = self.tar.gettarinfo(self.bar)
- self.assertEqual(tarinfo.type, tarfile.REGTYPE,
+ self.assertTrue(tarinfo.type == tarfile.REGTYPE,
"dereferencing hardlink failed")
-# Gzip TestCases
-class ReadTestGzip(ReadTest):
- comp = "gz"
-class ReadStreamTestGzip(ReadStreamTest):
- comp = "gz"
-class WriteTestGzip(WriteTest):
- comp = "gz"
-class WriteStreamTestGzip(WriteStreamTest):
- comp = "gz"
-class ReadDetectTestGzip(ReadDetectTest):
- comp = "gz"
-class ReadDetectFileobjTestGzip(ReadDetectFileobjTest):
- comp = "gz"
-class ReadAsteriskTestGzip(ReadAsteriskTest):
- comp = "gz"
-class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest):
- comp = "gz"
-class ReadFileobjTestGzip(ReadFileobjTest):
- comp = "gz"
+class PaxWriteTest(GNUWriteTest):
+ # Subclass of GNUWriteTest whose _test() helper writes archives with
+ # format=tarfile.PAX_FORMAT; also adds pax-header specific tests.
+ # NOTE(review): presumably the inherited test methods call _test() --
+ # see GNUWriteTest (not shown in this part of the patch).
-# Filemode test cases
+ def _test(self, name, link=None):
+ # See GNUWriteTest.
+ # Writes a single member (a LNKTYPE member when `link` is given) to
+ # tmpname, re-opens the archive and compares the stored linkname or
+ # name with the value that was written.
+ tarinfo = tarfile.TarInfo(name)
+ if link:
+ tarinfo.linkname = link
+ tarinfo.type = tarfile.LNKTYPE
-class FileModeTest(unittest.TestCase):
- def test_modes(self):
- self.assertEqual(tarfile.filemode(0755), '-rwxr-xr-x')
- self.assertEqual(tarfile.filemode(07111), '---s--s--t')
+ tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
+ tar.addfile(tarinfo)
+ tar.close()
-class OpenFileobjTest(BaseTest):
+ # NOTE(review): the archive opened for reading here is not closed
+ # explicitly anywhere in this method.
+ tar = tarfile.open(tmpname)
+ if link:
+ l = tar.getmembers()[0].linkname
+ self.assertTrue(link == l, "PAX longlink creation failed")
+ else:
+ n = tar.getmembers()[0].name
+ self.assertTrue(name == n, "PAX longname creation failed")
- def test_opener(self):
- # Test for SF bug #1496501.
- fobj = StringIO.StringIO("foo\n")
+ def test_pax_global_header(self):
+ # Passes pax_headers to tarfile.open() when writing, then checks
+ # that both the TarFile and its first member expose the same
+ # headers after re-reading the archive.
+ pax_headers = {
+ u"foo": u"bar",
+ u"uid": u"0",
+ u"mtime": u"1.23",
+ u"test": u"äöü",
+ u"äöü": u"test"}
+
+ tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
+ pax_headers=pax_headers)
+ tar.addfile(tarfile.TarInfo("test"))
+ tar.close()
+
+ # Test if the global header was written correctly.
+ # NOTE(review): this read-side archive is not closed explicitly.
+ tar = tarfile.open(tmpname, encoding="iso8859-1")
+ self.assertEqual(tar.pax_headers, pax_headers)
+ self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
+
+ # Test if all the fields are unicode.
+ for key, val in tar.pax_headers.iteritems():
+ self.assertTrue(type(key) is unicode)
+ self.assertTrue(type(val) is unicode)
+ # Fields listed in PAX_NUMBER_FIELDS must also be convertible by
+ # the converter registered for them.
+ if key in tarfile.PAX_NUMBER_FIELDS:
+ try:
+ tarfile.PAX_NUMBER_FIELDS[key](val)
+ except (TypeError, ValueError):
+ self.fail("unable to convert pax header field")
+
+ def test_pax_extended_header(self):
+ # The fields from the pax header have priority over the
+ # TarInfo.
+ pax_headers = {u"path": u"foo", u"uid": u"123"}
+
+ tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
+ t = tarfile.TarInfo()
+ t.name = u"äöü" # non-ASCII
+ t.uid = 8**8 # too large
+ t.pax_headers = pax_headers
+ tar.addfile(t)
+ tar.close()
+
+ # After re-reading, name and uid must come from pax_headers
+ # ("foo" / 123), not from the TarInfo attributes set above.
+ # NOTE(review): this read-side archive is not closed explicitly.
+ tar = tarfile.open(tmpname, encoding="iso8859-1")
+ t = tar.getmembers()[0]
+ self.assertEqual(t.pax_headers, pax_headers)
+ self.assertEqual(t.name, "foo")
+ self.assertEqual(t.uid, 123)
+
+class UstarUnicodeTest(unittest.TestCase):
+ # All *UnicodeTests FIXME
+ # Unicode handling tests, parametrised through the `format` class
+ # attribute; GNUUnicodeTest and PaxUnicodeTest override it.
+
+ format = tarfile.USTAR_FORMAT
+
+ def test_iso8859_1_filename(self):
+ self._test_unicode_filename("iso8859-1")
+
+ def test_utf7_filename(self):
+ self._test_unicode_filename("utf7")
+
+ def test_utf8_filename(self):
+ self._test_unicode_filename("utf8")
+
+ def _test_unicode_filename(self, encoding):
+ # Writes a member named u"äöü" with the given encoding, re-opens the
+ # archive with the same encoding and expects the name back as a
+ # non-unicode object equal to name.encode(encoding).
+ tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
+ name = u"äöü"
+ tar.addfile(tarfile.TarInfo(name))
+ tar.close()
+
+ tar = tarfile.open(tmpname, encoding=encoding)
+ self.assertTrue(type(tar.getnames()[0]) is not unicode)
+ self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
+ tar.close()
+
+ def test_unicode_filename_error(self):
+ # With encoding="ascii" and errors="strict", adding members whose
+ # name or uname is non-ASCII unicode must raise UnicodeError.
+ # NOTE(review): the archive opened here is not closed explicitly.
+ tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
+ tarinfo = tarfile.TarInfo()
+
+ # A non-ASCII byte string name is only rejected for PAX_FORMAT;
+ # for the other formats addfile() is expected to succeed.
+ tarinfo.name = "äöü"
+ if self.format == tarfile.PAX_FORMAT:
+ self.assertRaises(UnicodeError, tar.addfile, tarinfo)
+ else:
+ tar.addfile(tarinfo)
+
+ tarinfo.name = u"äöü"
+ self.assertRaises(UnicodeError, tar.addfile, tarinfo)
+
+ tarinfo.name = "foo"
+ tarinfo.uname = u"äöü"
+ self.assertRaises(UnicodeError, tar.addfile, tarinfo)
+
+ def test_unicode_argument(self):
+ # Every member read from tarname must expose name, linkname, uname
+ # and gname as str objects.
+ tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
+ for t in tar:
+ self.assertTrue(type(t.name) is str)
+ self.assertTrue(type(t.linkname) is str)
+ self.assertTrue(type(t.uname) is str)
+ self.assertTrue(type(t.gname) is str)
+ tar.close()
+
+ def test_uname_unicode(self):
+ # Round-trips uname/gname through an in-memory archive, once given
+ # as a unicode object and once as a byte string; both must read
+ # back as the byte string "äöü".
+ for name in (u"äöü", "äöü"):
+ t = tarfile.TarInfo("foo")
+ t.uname = name
+ t.gname = name
+
+ fobj = StringIO.StringIO()
+ tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
+ tar.addfile(t)
+ tar.close()
+ fobj.seek(0)
+
+ # NOTE(review): this read-side archive is not closed explicitly.
+ tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
+ t = tar.getmember("foo")
+ self.assertEqual(t.uname, "äöü")
+ self.assertEqual(t.gname, "äöü")
+
+
+class GNUUnicodeTest(UstarUnicodeTest):
+ # Runs the UstarUnicodeTest cases with GNU_FORMAT archives.
+
+ format = tarfile.GNU_FORMAT
+
+
+class PaxUnicodeTest(UstarUnicodeTest):
+ # Runs the UstarUnicodeTest cases with PAX_FORMAT archives and adds
+ # tests for the `errors` argument of tarfile.open().
+
+ format = tarfile.PAX_FORMAT
+
+ def _create_unicode_name(self, name):
+ # Writes an archive at tmpname holding one member whose "path" pax
+ # header is set to `name`.
+ tar = tarfile.open(tmpname, "w", format=self.format)
+ t = tarfile.TarInfo()
+ t.pax_headers["path"] = name
+ tar.addfile(t)
+ tar.close()
+
+ def test_error_handlers(self):
+ # Test if the unicode error handlers work correctly for characters
+ # that cannot be expressed in a given encoding.
+ self._create_unicode_name(u"äöü")
+
+ # Each pair is (errors handler, member name expected when the
+ # archive is read with encoding="ascii").
+ # NOTE(review): the archives opened in this loop are not closed
+ # explicitly.
+ for handler, name in (("utf-8", u"äöü".encode("utf8")),
+ ("replace", "???"), ("ignore", "")):
+ tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
+ errors=handler)
+ self.assertEqual(tar.getnames()[0], name)
+
+ self.assertRaises(UnicodeError, tarfile.open, tmpname,
+ encoding="ascii", errors="strict")
+
+ def test_error_handler_utf8(self):
+ # Create a pathname that has one component representable using
+ # iso8859-1 and the other only in iso8859-15.
+ self._create_unicode_name(u"äöü/¤")
+
+ # NOTE(review): this archive is not closed explicitly.
+ tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
+ errors="utf-8")
+ self.assertEqual(tar.getnames()[0], "äöü/" + u"¤".encode("utf8"))
+
+
+class AppendTest(unittest.TestCase):
+ # Test append mode (cp. patch #1652681).
+
+ def setUp(self):
+ # Each test starts without an existing file at tmpname.
+ self.tarname = tmpname
+ if os.path.exists(self.tarname):
+ os.remove(self.tarname)
+
+ def _add_testfile(self, fileobj=None):
+ # Opens self.tarname (or fileobj) in append mode and adds an empty
+ # member named "bar".
+ tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
+ tar.addfile(tarfile.TarInfo("bar"))
+ tar.close()
+
+ def _create_testtar(self, mode="w:"):
+ # Creates self.tarname with the given write mode, containing the
+ # "ustar/regtype" member of the reference archive renamed to "foo".
+ # NOTE(review): `src` is not closed explicitly.
+ src = tarfile.open(tarname, encoding="iso8859-1")
+ t = src.getmember("ustar/regtype")
+ t.name = "foo"
+ f = src.extractfile(t)
+ tar = tarfile.open(self.tarname, mode)
+ tar.addfile(t, f)
+ tar.close()
+
+ # NOTE(review): `names` has a mutable list default; it is only compared
+ # here, never mutated. The archive opened below is not closed explicitly.
+ def _test(self, names=["bar"], fileobj=None):
+ tar = tarfile.open(self.tarname, fileobj=fileobj)
+ self.assertEqual(tar.getnames(), names)
+
+ def test_non_existing(self):
+ self._add_testfile()
+ self._test()
+
+ def test_empty(self):
+ tarfile.open(self.tarname, "w:").close()
+ self._add_testfile()
+ self._test()
+
+ def test_empty_fileobj(self):
+ # Appends to an in-memory file object that initially holds only
+ # 1024 NUL bytes.
+ fobj = StringIO.StringIO("\0" * 1024)
+ self._add_testfile(fobj)
+ fobj.seek(0)
+ self._test(fileobj=fobj)
+
+ def test_fileobj(self):
+ self._create_testtar()
+ # NOTE(review): the archive is read in text mode (no "b") and the
+ # file object is not closed explicitly.
+ data = open(self.tarname).read()
+ fobj = StringIO.StringIO(data)
+ self._add_testfile(fobj)
+ fobj.seek(0)
+ self._test(names=["foo", "bar"], fileobj=fobj)
+
+ def test_existing(self):
+ self._create_testtar()
+ self._add_testfile()
+ self._test(names=["foo", "bar"])
+
+ def test_append_gz(self):
+ # Opening a gzip-compressed archive in "a" mode must raise
+ # ReadError; the test returns early when gzip is unavailable.
+ if gzip is None:
+ return
+ self._create_testtar("w:gz")
+ self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
+
+ def test_append_bz2(self):
+ # Same as test_append_gz, for bz2.
+ if bz2 is None:
+ return
+ self._create_testtar("w:bz2")
+ self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
+
+ # Append mode is supposed to fail if the tarfile to append to
+ # does not end with a zero block.
+ def _test_error(self, data):
+ # NOTE(review): the file object used for writing is not closed
+ # explicitly.
+ open(self.tarname, "wb").write(data)
+ self.assertRaises(tarfile.ReadError, self._add_testfile)
+
+ def test_null(self):
+ self._test_error("")
+
+ def test_incomplete(self):
+ self._test_error("\0" * 13)
+
+ def test_premature_eof(self):
+ data = tarfile.TarInfo("foo").tobuf()
+ self._test_error(data)
+
+ def test_trailing_garbage(self):
+ data = tarfile.TarInfo("foo").tobuf()
+ self._test_error(data + "\0" * 13)
+
+ def test_invalid(self):
+ self._test_error("a" * 512)
+
+
+class LimitsTest(unittest.TestCase):
+ # Checks which name, linkname and uid values TarInfo.tobuf() accepts
+ # for each archive format, and which make it raise ValueError.
+
+ def test_ustar_limits(self):
+ # 100 char name
+ tarinfo = tarfile.TarInfo("0123456789" * 10)
+ tarinfo.tobuf(tarfile.USTAR_FORMAT)
+
+ # 101 char name that cannot be stored
+ tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
+ self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
+
+ # 256 char name with a slash at pos 156
+ tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
+ tarinfo.tobuf(tarfile.USTAR_FORMAT)
+
+ # 256 char name that cannot be stored
+ tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
+ self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
+
+ # 512 char name
+ tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
+ self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
+
+ # 512 char linkname
+ tarinfo = tarfile.TarInfo("longlink")
+ tarinfo.linkname = "123/" * 126 + "longname"
+ self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
+
+ # uid > 8 digits
+ # (Python 2 octal literal, equal to 8 ** 7)
+ tarinfo = tarfile.TarInfo("name")
+ tarinfo.uid = 010000000
+ self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
+
+ def test_gnu_limits(self):
+ # The 512 char name and linkname rejected for USTAR_FORMAT above
+ # must be accepted for GNU_FORMAT.
+ tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
+ tarinfo.tobuf(tarfile.GNU_FORMAT)
+
+ tarinfo = tarfile.TarInfo("longlink")
+ tarinfo.linkname = "123/" * 126 + "longname"
+ tarinfo.tobuf(tarfile.GNU_FORMAT)
+
+ # uid >= 256 ** 7
+ # (Python 2 octal long literal, equal to 256 ** 7)
+ tarinfo = tarfile.TarInfo("name")
+ tarinfo.uid = 04000000000000000000L
+ self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
+
+ def test_pax_limits(self):
+ # None of the values below may make tobuf() raise for PAX_FORMAT,
+ # including the uid that GNU_FORMAT rejects.
+ tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
+ tarinfo.tobuf(tarfile.PAX_FORMAT)
+
+ tarinfo = tarfile.TarInfo("longlink")
+ tarinfo.linkname = "123/" * 126 + "longname"
+ tarinfo.tobuf(tarfile.PAX_FORMAT)
+
+ tarinfo = tarfile.TarInfo("name")
+ tarinfo.uid = 04000000000000000000L
+ tarinfo.tobuf(tarfile.PAX_FORMAT)
+
+
+class ContextManagerTest(unittest.TestCase):
+ # Tests for using a TarFile object in a `with` statement.
+
+ def test_basic(self):
+ # tar.closed must be false while the context is active and true
+ # once it has been left.
+ with tarfile.open(tarname) as tar:
+ self.assertFalse(tar.closed, "closed inside runtime context")
+ self.assertTrue(tar.closed, "context manager failed")
+
+ def test_closed(self):
+ # The __enter__() method is supposed to raise IOError
+ # if the TarFile object is already closed.
+ tar = tarfile.open(tarname)
+ tar.close()
+ with self.assertRaises(IOError):
+ with tar:
+ pass
+
+ def test_exception(self):
+ # Test if the IOError exception is passed through properly.
+ with self.assertRaises(Exception) as exc:
+ with tarfile.open(tarname) as tar:
+ raise IOError
+ self.assertIsInstance(exc.exception, IOError,
+ "wrong exception raised in context manager")
+ self.assertTrue(tar.closed, "context manager failed")
+
+ def test_no_eof(self):
+ # __exit__() must not write end-of-archive blocks if an
+ # exception was raised.
+ # NOTE(review): the bare `except:` below swallows the Exception
+ # raised on purpose, but would also hide any other error.
try:
- tarfile.open("", mode="r", fileobj=fobj)
- except tarfile.ReadError:
- self.assertEqual(fobj.tell(), 0, "fileobj's position has moved")
+ with tarfile.open(tmpname, "w") as tar:
+ raise Exception
+ except:
+ pass
+ self.assertEqual(os.path.getsize(tmpname), 0,
+ "context manager wrote an end-of-archive block")
+ self.assertTrue(tar.closed, "context manager failed")
- def test_no_name_argument(self):
- fobj = open(testtar, "rb")
- self.tar.close()
- self.tar = tarfile.open(fileobj=fobj, mode="r")
- self.assertEqual(self.tar.name, os.path.abspath(fobj.name))
+ def test_eof(self):
+ # __exit__() must write end-of-archive blocks, i.e. call
+ # TarFile.close() if there was no error.
+ with tarfile.open(tmpname, "w"):
+ pass
+ self.assertNotEqual(os.path.getsize(tmpname), 0,
+ "context manager wrote no end-of-archive block")
+
+ def test_fileobj(self):
+ # Test that __exit__() did not close the external file
+ # object.
+ # NOTE(review): as in test_no_eof, the bare `except:` swallows the
+ # Exception raised on purpose.
+ fobj = open(tmpname, "wb")
+ try:
+ with tarfile.open(fileobj=fobj, mode="w") as tar:
+ raise Exception
+ except:
+ pass
+ self.assertFalse(fobj.closed, "external file object was closed")
+ self.assertTrue(tar.closed, "context manager failed")
fobj.close()
- def test_no_name_attribute(self):
- fp = open(testtar, "rb")
- data = fp.read()
- fp.close()
- fobj = StringIO.StringIO(data)
- self.assertRaises(AttributeError, getattr, fobj, "name")
- self.tar.close()
- self.tar = tarfile.open(fileobj=fobj, mode="r")
- self.assertEqual(self.tar.name, None)
- def test_empty_name_attribute(self):
- fp = open(testtar, "rb")
- data = fp.read()
- fp.close()
- fobj = StringIO.StringIO(data)
- fobj.name = ""
- self.tar.close()
- self.tar = tarfile.open(fileobj=fobj, mode="r")
- self.assertEqual(self.tar.name, None)
+class LinkEmulationTest(ReadTest):
+ # Test for issue #8741 regression. On platforms that do not support
+ # symbolic or hard links tarfile tries to extract these types of members as
+ # the regular files they point to.
+ def _test_link_extraction(self, name):
+ # Extracts member `name` into TEMPDIR and compares the md5 sum of
+ # the extracted file's content with md5_regtype.
+ # NOTE(review): the file object opened for reading is not closed
+ # explicitly.
+ self.tar.extract(name, TEMPDIR)
+ data = open(os.path.join(TEMPDIR, name), "rb").read()
+ self.assertEqual(md5sum(data), md5_regtype)
-if bz2:
- # Bzip2 TestCases
- class ReadTestBzip2(ReadTestGzip):
- comp = "bz2"
- class ReadStreamTestBzip2(ReadStreamTestGzip):
- comp = "bz2"
- class WriteTestBzip2(WriteTest):
- comp = "bz2"
- class WriteStreamTestBzip2(WriteStreamTestGzip):
- comp = "bz2"
- class ReadDetectTestBzip2(ReadDetectTest):
- comp = "bz2"
- class ReadDetectFileobjTestBzip2(ReadDetectFileobjTest):
- comp = "bz2"
- class ReadAsteriskTestBzip2(ReadAsteriskTest):
- comp = "bz2"
- class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest):
- comp = "bz2"
- class ReadFileobjTestBzip2(ReadFileobjTest):
- comp = "bz2"
+ def test_hardlink_extraction1(self):
+ self._test_link_extraction("ustar/lnktype")
-# If importing gzip failed, discard the Gzip TestCases.
-if not gzip:
- del ReadTestGzip
- del ReadStreamTestGzip
- del WriteTestGzip
- del WriteStreamTestGzip
+ def test_hardlink_extraction2(self):
+ self._test_link_extraction("./ustar/linktest2/lnktype")
+
+ def test_symlink_extraction1(self):
+ self._test_link_extraction("ustar/symtype")
+
+ def test_symlink_extraction2(self):
+ self._test_link_extraction("./ustar/linktest2/symtype")
+
+
+class GzipMiscReadTest(MiscReadTest):
+ # The Gzip* classes below re-run existing test classes with only the
+ # class attributes overridden: the read tests use the gzip archive
+ # (gzipname) with a gzip read mode, the write tests a gzip write mode.
+ tarname = gzipname
+ mode = "r:gz"
+class GzipUstarReadTest(UstarReadTest):
+ tarname = gzipname
+ mode = "r:gz"
+class GzipStreamReadTest(StreamReadTest):
+ tarname = gzipname
+ mode = "r|gz"
+class GzipWriteTest(WriteTest):
+ mode = "w:gz"
+class GzipStreamWriteTest(StreamWriteTest):
+ mode = "w|gz"
+
+
+class Bz2MiscReadTest(MiscReadTest):
+ # The Bz2* classes below re-run existing test classes with only the
+ # class attributes overridden: the read tests use the bz2 archive
+ # (bz2name) with a bz2 read mode, the write tests a bz2 write mode.
+ tarname = bz2name
+ mode = "r:bz2"
+class Bz2UstarReadTest(UstarReadTest):
+ tarname = bz2name
+ mode = "r:bz2"
+class Bz2StreamReadTest(StreamReadTest):
+ tarname = bz2name
+ mode = "r|bz2"
+class Bz2WriteTest(WriteTest):
+ mode = "w:bz2"
+class Bz2StreamWriteTest(StreamWriteTest):
+ mode = "w|bz2"
+
+class Bz2PartialReadTest(unittest.TestCase):
+ # Issue5068: The _BZ2Proxy.read() method loops forever
+ # on an empty or partial bzipped file.
+
+ def _test_partial_input(self, mode):
+ # Feeds every prefix of a small bz2-compressed tar header to
+ # tarfile.open() in the given mode. ReadError is an acceptable
+ # outcome; the AssertionError raised by MyStringIO is not caught.
+ class MyStringIO(StringIO.StringIO):
+ # hit_eof is set when read() is called with the position already
+ # at the end of the buffer. Another read() without a seek() in
+ # between then raises AssertionError.
+ hit_eof = False
+ def read(self, n):
+ if self.hit_eof:
+ raise AssertionError("infinite loop detected in tarfile.open()")
+ self.hit_eof = self.pos == self.len
+ return StringIO.StringIO.read(self, n)
+ def seek(self, *args):
+ self.hit_eof = False
+ return StringIO.StringIO.seek(self, *args)
+
+ data = bz2.compress(tarfile.TarInfo("foo").tobuf())
+ # x runs from 0 (empty input) up to len(data) (complete input).
+ for x in range(len(data) + 1):
+ try:
+ tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
+ except tarfile.ReadError:
+ pass # we have no interest in ReadErrors
+
+ def test_partial_input(self):
+ self._test_partial_input("r")
+
+ def test_partial_input_bz2(self):
+ self._test_partial_input("r:bz2")
+
def test_main():
- # Create archive.
- f = open(tarname(), "rb")
- fguts = f.read()
- f.close()
- if gzip:
- # create testtar.tar.gz
- tar = gzip.open(tarname("gz"), "wb")
- tar.write(fguts)
- tar.close()
- if bz2:
- # create testtar.tar.bz2
- tar = bz2.BZ2File(tarname("bz2"), "wb")
- tar.write(fguts)
- tar.close()
+ # Creates TEMPDIR, collects the test classes that apply to this
+ # platform and the available compression modules, runs them and
+ # removes TEMPDIR afterwards.
+ # NOTE(review): os.makedirs() raises OSError if TEMPDIR already
+ # exists, e.g. left over from an earlier interrupted run -- confirm
+ # this is intended.
+ os.makedirs(TEMPDIR)
tests = [
- FileModeTest,
- OpenFileobjTest,
- ReadTest,
- ReadStreamTest,
- ReadDetectTest,
- ReadDetectFileobjTest,
- ReadAsteriskTest,
- ReadStreamAsteriskTest,
- ReadFileobjTest,
+ UstarReadTest,
+ MiscReadTest,
+ StreamReadTest,
+ DetectReadTest,
+ MemberReadTest,
+ GNUReadTest,
+ PaxReadTest,
WriteTest,
- Write100Test,
- WriteSize0Test,
- WriteStreamTest,
- WriteGNULongTest,
- ReadGNULongTest,
+ StreamWriteTest,
+ GNUWriteTest,
+ PaxWriteTest,
+ UstarUnicodeTest,
+ GNUUnicodeTest,
+ PaxUnicodeTest,
+ AppendTest,
+ LimitsTest,
+ ContextManagerTest,
]
+ # Platforms with os.link get the real hard link tests, all others the
+ # link emulation tests.
if hasattr(os, "link"):
- tests.append(ExtractHardlinkTest)
- tests.append(CreateHardlinkTest)
+ tests.append(HardlinkTest)
+ else:
+ tests.append(LinkEmulationTest)
+
+ # Raw bytes of the reference archive; re-compressed below to create
+ # the gzip and bz2 test archives.
+ fobj = open(tarname, "rb")
+ data = fobj.read()
+ fobj.close()
if gzip:
- tests.extend([
- ReadTestGzip, ReadStreamTestGzip,
- WriteTestGzip, WriteStreamTestGzip,
- ReadDetectTestGzip, ReadDetectFileobjTestGzip,
- ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip,
- ReadFileobjTestGzip
- ])
+ # Create testtar.tar.gz and add gzip-specific tests.
+ tar = gzip.open(gzipname, "wb")
+ tar.write(data)
+ tar.close()
+
+ tests += [
+ GzipMiscReadTest,
+ GzipUstarReadTest,
+ GzipStreamReadTest,
+ GzipWriteTest,
+ GzipStreamWriteTest,
+ ]
if bz2:
- tests.extend([
- ReadTestBzip2, ReadStreamTestBzip2,
- WriteTestBzip2, WriteStreamTestBzip2,
- ReadDetectTestBzip2, ReadDetectFileobjTestBzip2,
- ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2,
- ReadFileobjTestBzip2
- ])
+ # Create testtar.tar.bz2 and add bz2-specific tests.
+ tar = bz2.BZ2File(bz2name, "wb")
+ tar.write(data)
+ tar.close()
+
+ tests += [
+ Bz2MiscReadTest,
+ Bz2UstarReadTest,
+ Bz2StreamReadTest,
+ Bz2WriteTest,
+ Bz2StreamWriteTest,
+ Bz2PartialReadTest,
+ ]
+
+ # TEMPDIR is removed even if the test run raises.
try:
test_support.run_unittest(*tests)
finally:
- if gzip:
- os.remove(tarname("gz"))
- if bz2:
- os.remove(tarname("bz2"))
- if os.path.exists(dirname()):
- shutil.rmtree(dirname())
- if os.path.exists(tmpname()):
- os.remove(tmpname())
+ if os.path.exists(TEMPDIR):
+ shutil.rmtree(TEMPDIR)
if __name__ == "__main__":
test_main()
--
Repository URL: http://hg.python.org/jython
More information about the Jython-checkins mailing list