[Python-Dev] Improved tmpfile module

Zack Weinberg zack@codesourcery.com
Mon, 24 Jun 2002 20:06:09 -0700


Attached please find a rewritten and improved tempfile.py.  The major
change is to make the temporary file names significantly harder to
predict.  This foils denial-of-service attacks, where a hostile
program floods /tmp with files named @12345.NNNN to prevent process
12345 from creating any temp files.  It also makes the race condition
inherent in tempfile.mktemp() somewhat harder to exploit.

I also implemented three new interfaces:

(fd, name) = mkstemp(suffix="", binary=1): Creates a temporary file,
returning both an OS-level file descriptor open on it and its name.
This is useful in situations where you need to know the name of the
temporary file, but can't risk the race in mktemp.

name = mkdtemp(suffix=""): Creates a temporary directory without a
race condition.

file = NamedTemporaryFile(mode='w+b', bufsize=-1, suffix=""): This is
just the non-POSIX version of tempfile.TemporaryFile() made available
on all platforms, and with the .path attribute documented.  It
provides a convenient way to get a temporary file with a name, that
will be automatically deleted on close, and with a high-level file
object associated with it.

Finally, I tore out a lot of the posix/not-posix conditionals, relying
on the os module to provide open() and O_EXCL -- this should make all
the recommended interfaces race-safe on non-posix systems, which they
were not before.

Comments?  I would very much like to see something along these lines
in 2.3; I have an application that needs to be reliable in the face of
the aforementioned denial of service.

Please note that I wound up removing all the top-level 'del foo'
statements (cleaning up the namespace) as I could not figure out how
to do them properly.  I'm not a python guru.

zw

"""Temporary files and filenames."""

import os
from errno import EEXIST
from random import Random

# Names exported by "from <module> import *".  Every entry must name
# something this module really defines: "template" used to be listed
# here, but only the private _template exists, so the star-import
# failed with AttributeError.
__all__ = [
     "TemporaryFile", "NamedTemporaryFile",  # recommended (high level)
     "mkstemp", "mkdtemp",                   # recommended (low level)
     "mktemp", "gettempprefix",              # deprecated
     "tempdir"                               # control
     ]

### Settings the caller may assign to override the defaults.

# Directory in which temporary names are generated.  While it is None,
# _candidate_name() picks a directory on first use.
tempdir = None

# _template is the pattern for the name of each temporary file; its
# single %s is replaced by the variable part of the name.  Platforms
# not listed below get the generic pattern.
_template = {
    'nt': '~%s~',
    'mac': 'Python-Tmp-%s',
    'riscos': 'Python-Tmp-%s',
}.get(os.name, 'pyt%s') # better ideas?

### Recommended, user-visible interfaces.

# Flags passed to os.open() when creating a temporary file.  O_CREAT
# combined with O_EXCL makes the open fail instead of reusing a name
# that already exists, which is what makes creation race-free.
_text_openflags = os.O_RDWR | os.O_CREAT | os.O_EXCL

# Binary mode adds O_BINARY where the platform has such a flag.  The os
# module does not define O_BINARY everywhere, so look it up defensively
# instead of assuming every non-posix platform provides it (which raised
# AttributeError at import time on platforms that do not).
_bin_openflags = _text_openflags | getattr(os, 'O_BINARY', 0)

def mkstemp(suffix="", binary=1):
    """Function to create a named temporary file, with 'suffix' for
    its suffix.  Returns an OS-level handle to the file and the name,
    as a tuple.  If 'binary' is 1, the file is opened in binary mode,
    otherwise text mode (if this is a meaningful concept for the
    operating system in use).  In any case, the file is readable and
    writable only by the creating user, and executable by no one."""

    if binary: flags = _bin_openflags
    else: flags = _text_openflags

    while 1:
        name = _candidate_name(suffix)
        try:
            fd = os.open(name, flags, 0600)
            return (fd, name)
        except OSError, e:
            if e.errno == EEXIST:
                continue # try again
            raise

def mkdtemp(suffix=""):
    """Function to create a named temporary directory, with 'suffix'
    for its suffix.  Returns the name of the directory.  The directory
    is readable, writable, and searchable only by the creating user."""

    while 1:
        name = _candidate_name(suffix)
        try:
            os.mkdir(name, 0700)
            return name
        except OSError, e:
            if e.errno == EEXIST:
                continue # try again
            raise

class _TemporaryFileWrapper:
    """Temporary file wrapper

    This class provides a wrapper around files opened for temporary use.
    In particular, it seeks to automatically remove the file when it is
    no longer needed: the file is unlinked when close() is called or
    when the wrapper is garbage-collected.

    Attributes:
    file -- the wrapped file object; attributes not defined on the
            wrapper are looked up on it
    path -- the name of the file, removed on close
    close_called -- true once close() has run
    """

    # Cache the unlinker so we don't get spurious errors at shutdown
    # when the module-level "os" is None'd out.  Note that this must
    # be referenced as self.unlink, because the name
    # _TemporaryFileWrapper may also get None'd out before __del__ is
    # called.
    unlink = os.unlink

    def __init__(self, file, path):
        self.file = file
        self.path = path
        self.close_called = 0

    def close(self):
        """Close the wrapped file and remove it from the filesystem.

        Only the first call does anything.  The file is unlinked even if
        closing the file object raises; that exception still propagates.
        """
        if not self.close_called:
            self.close_called = 1
            try:
                self.file.close()
            finally:
                self.unlink(self.path)

    def __del__(self):
        self.close()

    def __getattr__(self, name):
        """Forward lookups of unknown attributes to the wrapped file."""
        file = self.__dict__['file']
        a = getattr(file, name)
        # Cache methods and other objects on the wrapper so the next
        # lookup is direct.  Integer-valued attributes (including
        # booleans such as 'closed') describe state that changes over
        # the file's life, so they must be fetched afresh every time.
        if not isinstance(a, int):
            setattr(self, name, a)
        return a

def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix=""):
    """Create a named temporary file, with 'suffix' for its suffix.
    It will automatically be deleted when it is closed.  Pass 'mode'
    and 'bufsize' to fdopen.  Returns a file object; the name of the
    file is accessible as file.path.

    The file is created in binary mode when 'mode' contains 'b', and in
    text mode otherwise.  If wrapping the descriptor in a file object
    fails, the newly created file is removed and the descriptor closed
    before the error is re-raised."""

    if 'b' in mode: binary = 1
    else: binary = 0

    (fd, name) = mkstemp(suffix, binary)
    try:
        file = os.fdopen(fd, mode, bufsize)
    except:
        # fdopen rejected 'mode'/'bufsize' (or failed otherwise): don't
        # leave a stray file and an open descriptor behind.
        os.unlink(name)
        os.close(fd)
        raise
    return _TemporaryFileWrapper(file, name)

if os.name != 'posix':
    # A file cannot be unlinked while open, so TemporaryFile
    # degenerates to NamedTemporaryFile.
    TemporaryFile = NamedTemporaryFile
else:
    def TemporaryFile(mode='w+b', bufsize=-1, suffix=""):
        """Create a temporary file.  It has no name and will not
        survive being closed; the 'suffix' argument is ignored. Pass
        'mode' and 'bufsize' to fdopen.  Returns a file object.

        The file is created in binary mode when 'mode' contains 'b'.
        Its directory entry is removed before the descriptor is wrapped,
        so nothing is left on disk even if fdopen fails; in that case
        the descriptor is closed and the error re-raised."""

        if 'b' in mode: binary = 1
        else: binary = 0

        (fd, name) = mkstemp(binary=binary)
        try:
            # Unlink first: the open descriptor keeps the file alive.
            os.unlink(name)
            return os.fdopen(fd, mode, bufsize)
        except:
            os.close(fd)
            raise

### Deprecated, user-visible interfaces.

def mktemp(suffix=""):
    """User-callable function to return a unique temporary file name.

    Candidate names ending in 'suffix' are generated until one is found
    that does not currently exist.  Nothing is created, so another
    process may claim the name before the caller uses it; this is why
    the function is listed as deprecated.
    """
    candidate = _candidate_name(suffix)
    while os.path.exists(candidate):
        candidate = _candidate_name(suffix)
    return candidate

def gettempprefix():
    """Function to calculate a prefix of the filename to use.

    This incorporates the current process id on systems that support such a
    notion, so that concurrent processes don't generate the same prefix.

    Returns _template with the process id substituted in, followed by
    a '.'.
    """

    # os.getpid must be *called*: taking the repr of the function object
    # itself put "<built-in function getpid>" into every prefix, which is
    # identical in all processes and contains characters that are not
    # valid in file names on some systems.
    return (_template % repr(os.getpid())) + '.'

### Threading gook.

try:
    from thread import allocate_lock
except ImportError:
    # No thread support in this interpreter: supply a stand-in with the
    # same acquire()/release() interface that does nothing.
    #
    # _DummyMutex must remain defined at module level.  allocate_lock()
    # looks the name up each time it is called, so deleting the class
    # after defining the function made every call raise NameError.
    class _DummyMutex:
        """Lock substitute whose acquire() and release() are no-ops."""
        def acquire(self): pass
        release = acquire
    def allocate_lock():
        """Return a new no-op lock object."""
        return _DummyMutex()

# Serializes every constructor call made through _init_once().
_init_once_lock = allocate_lock()

def _init_once(var, constructor):
    """If 'var' is None, return the result of calling 'constructor'
    (with _init_once_lock held); otherwise return 'var' unchanged.

    Python passes object references by value, so this routine cannot
    rebind the caller's variable.  The caller has to store the result
    itself:
        foo = _init_once(foo, make_foo)

    NOTE: the check made while holding the lock re-tests the local
    argument, not the caller's variable.  A thread that passed in None
    therefore cannot see a value stored by another thread in the
    meantime and will call 'constructor' again.  Calls to 'constructor'
    are serialized, but they are not guaranteed to happen only once.

    Any exception raised by 'constructor' propagates to the caller
    after the lock has been released."""

    # Check once outside the lock, so we can avoid acquiring it if
    # the variable has already been initialized.
    if var is not None:
        return var

    # Acquire before entering the try block: if acquire() itself fails,
    # the finally clause must not release a lock we never held.
    _init_once_lock.acquire()
    try:
        if var is None:
            var = constructor()
    finally:
        _init_once_lock.release()
    return var

### Internal routines and data.

# Shared _RandomFilenameSequence instance; created on first use by
# _candidate_name().
_seq = None

def _candidate_name(suffix):
    """Return a candidate temporary name in 'tempdir' (global) ending
    with 'suffix'.

    On first use this creates the shared name generator and, if the
    caller has not set 'tempdir', chooses a directory with
    _gettempdir().  Nothing is created on disk for the returned name
    and it is not checked for existence; that is up to the caller."""

    # We have to make sure that _seq and tempdir are initialized only
    # once, even in the presence of multiple threads of control.  The
    # re-check under the lock is made against the globals themselves,
    # so a thread that lost the race sees the winner's values instead
    # of constructing its own.
    global _seq
    global tempdir
    if _seq is None or tempdir is None:
        _init_once_lock.acquire()
        try:
            if _seq is None:
                _seq = _RandomFilenameSequence()
            if tempdir is None:
                tempdir = _gettempdir()
        finally:
            _init_once_lock.release()

    # Most of the work is done by _RandomFilenameSequence.
    return os.path.join(tempdir, _seq.get()) + suffix

class _RandomFilenameSequence:
    """Generator of random file names based on _template.

    Each call to get() returns _template with six characters, drawn at
    random from 'characters', substituted for its %s.  A per-instance
    lock serializes access to the random number generator.

    NOTE(review): names come from random.Random, which is not a
    cryptographic generator -- confirm that is unpredictable enough for
    the intended use.
    """

    characters = (  "abcdefghijklmnopqrstuvwxyz"
                  + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                  + "0123456789-_")

    def __init__(self):
        self.mutex = allocate_lock()
        self.rng = Random()

    def get(self):
        """Return a new random file name (without directory or suffix)."""
        c = self.characters
        choose = self.rng.choice

        # Only one thread can call into the RNG at a time.  Release the
        # lock even if the RNG raises, so a failure here cannot block
        # every later caller.
        self.mutex.acquire()
        try:
            letters = ''.join([choose(c) for dummy in range(6)])
        finally:
            self.mutex.release()

        return (_template % letters)

# XXX This tries to be not UNIX specific, but I don't know beans about
# how to choose a temp directory or filename on MS-DOS or other
# systems so it may have to be changed...

# _gettempdir deduces whether a candidate temp dir is usable by
# trying to create a file in it, and write to it.  If that succeeds,
# great, it closes the file and unlinks it.  There's a race, though:
# the *name* of the test file it tries is the same across all threads
# under most OSes (Linux is an exception), and letting multiple threads
# all try to open, write to, close, and unlink a single file can cause
# a variety of bogus errors (e.g., you cannot unlink a file under
# Windows if anyone has it open, and two threads cannot create the
# same file in O_EXCL mode under Unix).  The simplest cure is to serialize
# calls to _gettempdir, which is done above in _candidate_name().

def _gettempdir():
    """Function to calculate the directory to use."""
    try:
        pwd = os.getcwd()
    except (AttributeError, os.error):
        pwd = os.curdir
    attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd]
    if os.name == 'nt':
        attempdirs.insert(0, 'C:\\TEMP')
        attempdirs.insert(0, '\\TEMP')
    elif os.name == 'mac':
        import macfs, MACFS
        try:
            refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk,
                                             MACFS.kTemporaryFolderType, 1)
            dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname()
            attempdirs.insert(0, dirname)
        except macfs.error:
            pass
    elif os.name == 'riscos':
        scrapdir = os.getenv('Wimp$ScrapDir')
        if scrapdir:
            attempdirs.insert(0, scrapdir)
    for envname in 'TMPDIR', 'TEMP', 'TMP':
        if os.environ.has_key(envname):
            attempdirs.insert(0, os.environ[envname])
    testfile = gettempprefix() + 'test'
    for dir in attempdirs:
        try:
            filename = os.path.join(dir, testfile)
            fd = os.open(filename,
                         os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700)
            fp = os.fdopen(fd, 'w')
            fp.write('blat')
            fp.close()
            os.unlink(filename)
            del fp, fd
            return dir
        except IOError:
            pass

    msg = "Can't find a usable temporary directory amongst " + `attempdirs`
    raise IOError, msg