[Python-Dev] Improved tmpfile module
Zack Weinberg
zack@codesourcery.com
Mon, 24 Jun 2002 20:06:09 -0700
Attached please find a rewritten and improved tempfile.py. The major
change is to make the temporary file names significantly harder to
predict. This foils denial-of-service attacks, in which a hostile
program floods /tmp with files named @12345.NNNN to prevent process
12345 from creating any temp files. It also makes the race condition
inherent in tempfile.mktemp() somewhat harder to exploit.
I also implemented three new interfaces:
(fd, name) = mkstemp(suffix="", binary=1): Creates a temporary file,
returning both an OS-level file descriptor open on it and its name.
This is useful in situations where you need to know the name of the
temporary file, but can't risk the race in mktemp.
name = mkdtemp(suffix=""): Creates a temporary directory, without
race.
file = NamedTemporaryFile(mode='w+b', bufsize=-1, suffix=""): This is
just the non-POSIX version of tempfile.TemporaryFile(), made available
on all platforms and with the .path attribute documented. It
provides a convenient way to get a named temporary file that
is automatically deleted on close and has a high-level file
object associated with it.
Finally, I tore out a lot of the posix/not-posix conditionals, relying
on the os module to provide open() and O_EXCL -- this should make all
the recommended interfaces race-safe on non-posix systems, which they
were not before.
Comments? I would very much like to see something along these lines
in 2.3; I have an application that needs to be reliable in the face of
the aforementioned denial of service.
Please note that I wound up removing all the top-level 'del foo'
statements (cleaning up the namespace) as I could not figure out how
to do them properly. I'm not a python guru.
zw
"""Temporary files and filenames."""
import os
from errno import EEXIST
from random import Random
# Names exported by "from <module> import *".  Every entry must be bound
# at module level, otherwise the star-import fails with AttributeError.
# The filename pattern is kept in the private name _template, so there
# is no public "template" to export.
__all__ = [
    "TemporaryFile", "NamedTemporaryFile",  # recommended (high level)
    "mkstemp", "mkdtemp",                   # recommended (low level)
    "mktemp", "gettempprefix",              # deprecated
    "tempdir"                               # control
    ]
### Parameters that the caller may set to override the defaults.

# Directory in which temporary names are generated.  None means "not
# chosen yet"; it is filled in the first time a name is needed.
tempdir = None

# _template is the pattern for the name of each temporary file; its
# single %s is replaced by the varying part of the name.  The pattern
# is selected by platform, with 'pyt%s' for anything not listed.
_template = {
    'nt': '~%s~',
    'mac': 'Python-Tmp-%s',
    'riscos': 'Python-Tmp-%s',
    }.get(os.name, 'pyt%s')  # better ideas?
### Recommended, user-visible interfaces.

# Flags handed to os.open() when creating a temporary file.  O_CREAT
# combined with O_EXCL makes the open fail if the name already exists,
# which is what allows the callers to retry with a different name.
_text_openflags = os.O_RDWR | os.O_CREAT | os.O_EXCL

# Binary mode is the same set of flags plus O_BINARY.  Not every os
# module defines O_BINARY, so it is added only where it exists rather
# than assuming that every non-posix platform provides it.
_bin_openflags = _text_openflags | getattr(os, 'O_BINARY', 0)
def mkstemp(suffix="", binary=1):
"""Function to create a named temporary file, with 'suffix' for
its suffix. Returns an OS-level handle to the file and the name,
as a tuple. If 'binary' is 1, the file is opened in binary mode,
otherwise text mode (if this is a meaningful concept for the
operating system in use). In any case, the file is readable and
writable only by the creating user, and executable by no one."""
if binary: flags = _bin_openflags
else: flags = _text_openflags
while 1:
name = _candidate_name(suffix)
try:
fd = os.open(name, flags, 0600)
return (fd, name)
except OSError, e:
if e.errno == EEXIST:
continue # try again
raise
def mkdtemp(suffix=""):
"""Function to create a named temporary directory, with 'suffix'
for its suffix. Returns the name of the directory. The directory
is readable, writable, and searchable only by the creating user."""
while 1:
name = _candidate_name(suffix)
try:
os.mkdir(name, 0700)
return name
except OSError, e:
if e.errno == EEXIST:
continue # try again
raise
class _TemporaryFileWrapper:
    """Wrapper around a file object opened on a named temporary file.

    Attributes that the wrapper does not define itself are fetched from
    the wrapped file object.  Closing the wrapper, either explicitly or
    from __del__, closes the file and removes it from the filesystem.

    Instance attributes:
      file          the wrapped file object
      path          name of the temporary file on disk
      close_called  true once close() has been entered
    """

    # Cache the unlinker so we don't get spurious errors at shutdown
    # when the module-level "os" is None'd out.  Note that this must
    # be referenced as self.unlink, because the name
    # _TemporaryFileWrapper may also get None'd out before __del__
    # is called.
    unlink = os.unlink

    def __init__(self, file, path):
        self.file = file
        self.path = path
        self.close_called = 0

    def close(self):
        """Close the wrapped file and delete it; repeat calls do nothing."""
        if self.close_called:
            return
        self.close_called = 1
        try:
            self.file.close()
        finally:
            # Remove the file even when closing it raised: close_called
            # is already set, so there will be no second chance.
            self.unlink(self.path)

    def __del__(self):
        self.close()

    def __getattr__(self, name):
        # Only called for names not found on the wrapper itself.  Go
        # through __dict__ so that a missing 'file' cannot recurse
        # back into __getattr__.
        file = self.__dict__['file']
        a = getattr(file, name)
        # Everything except integer-valued attributes is cached on the
        # wrapper, so later lookups bypass __getattr__; integers are
        # re-read from the file each time (presumably because they can
        # change over the file's lifetime).
        if type(a) != type(0):
            setattr(self, name, a)
        return a
def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix=""):
    """Create a named temporary file, with 'suffix' for its suffix.

    The file is created by mkstemp() (binary flags when 'mode' contains
    'b', text flags otherwise) and is deleted automatically when the
    returned object is closed.  'mode' and 'bufsize' are passed to
    os.fdopen().  Returns a file-like wrapper object; the name of the
    file is accessible as its .path attribute.
    """
    if 'b' in mode:
        binary = 1
    else:
        binary = 0
    (fd, name) = mkstemp(suffix, binary)
    try:
        file = os.fdopen(fd, mode, bufsize)
    except:
        # No wrapper exists yet to clean up after us, so release the
        # descriptor and remove the file here before re-raising.
        os.close(fd)
        os.unlink(name)
        raise
    return _TemporaryFileWrapper(file, name)
if os.name != 'posix':
    # A file cannot be unlinked while open, so TemporaryFile
    # degenerates to NamedTemporaryFile.
    TemporaryFile = NamedTemporaryFile
else:
    def TemporaryFile(mode='w+b', bufsize=-1, suffix=""):
        """Create a temporary file that has no name.

        The file is created by mkstemp() (binary flags when 'mode'
        contains 'b', text flags otherwise) and its directory entry is
        removed straight away, so it will not survive being closed.
        The 'suffix' argument is ignored.  'mode' and 'bufsize' are
        passed to os.fdopen().  Returns a file object.
        """
        if 'b' in mode:
            binary = 1
        else:
            binary = 0
        (fd, name) = mkstemp(binary=binary)
        try:
            # Unlink before fdopen so that a failure in fdopen cannot
            # leave the file behind on disk.
            os.unlink(name)
            return os.fdopen(fd, mode, bufsize)
        except:
            # Don't leak the descriptor if either step failed.
            os.close(fd)
            raise
### Deprecated, user-visible interfaces.
def mktemp(suffix=""):
    """Return a temporary file name that did not exist when checked.

    Candidate names ending in 'suffix' are generated until one is found
    for which os.path.exists() is false.  Nothing is created, so the
    name can be taken by someone else before the caller uses it.
    """
    candidate = _candidate_name(suffix)
    while os.path.exists(candidate):
        candidate = _candidate_name(suffix)
    return candidate
def gettempprefix():
    """Return the per-process prefix for temporary file names.

    The prefix is the module's filename pattern with the current
    process id substituted in, followed by a '.', so that concurrent
    processes don't generate the same prefix.
    """
    # os.getpid has to be *called*: formatting the function object
    # itself would put its repr, not the process id, into the prefix.
    return (_template % repr(os.getpid())) + '.'
### Threading gook.
try:
    from thread import allocate_lock
except ImportError:
    # No thread module: provide a lock look-alike whose operations do
    # nothing, so the rest of the module can lock unconditionally.
    class _DummyMutex:
        def acquire(self): pass
        release = acquire

    def allocate_lock():
        """Return a do-nothing stand-in for a thread lock."""
        return _DummyMutex()

    # _DummyMutex must stay bound: allocate_lock() looks the name up
    # every time it is called, so deleting it here would make the
    # call below fail with NameError.

# Lock held by _init_once() while a constructor runs.
_init_once_lock = allocate_lock()
def _init_once(var, constructor):
"""If 'var' is not None, initialize it to the return value from
'constructor'. Do this exactly once, no matter how many threads
call this routine.
FIXME: How would I cause 'var' to be passed by reference to this
routine, so that the caller can write simply
_init_once(foo, make_foo)
instead of
foo = _init_once(foo, make_foo)
?"""
# Check once outside the lock, so we can avoid acquiring it if
# the variable has already been initialized.
if var is not None:
return var
try:
_init_once_lock.acquire()
# Check again inside the lock, in case someone else got
# here first.
if var is None:
var = constructor()
finally:
_init_once_lock.release()
return var
### Internal routines and data.
# Name generator shared by all callers; created on first use.
_seq = None

def _candidate_name(suffix):
    """Build one candidate temporary path.

    The result is a freshly generated name inside the global 'tempdir',
    with 'suffix' appended.  Whether the path is free is for the caller
    to find out.
    """
    global _seq, tempdir
    # Both globals are set up lazily, and both go through _init_once()
    # because several threads may arrive here at the same time.
    _seq = _init_once(_seq, _RandomFilenameSequence)
    tempdir = _init_once(tempdir, _gettempdir)
    # The name itself comes from _RandomFilenameSequence.
    basename = _seq.get()
    return os.path.join(tempdir, basename) + suffix
class _RandomFilenameSequence:
    """Generator of random file names based on the module's _template.

    Each name is _template with its %s replaced by six characters drawn
    at random from 'characters'.

    Instance attributes:
      mutex  lock that serializes use of the random number generator
      rng    the random.Random instance the characters are drawn from
    """

    characters = ("abcdefghijklmnopqrstuvwxyz"
                  + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                  + "0123456789-_")

    def __init__(self):
        self.mutex = allocate_lock()
        self.rng = Random()

    def get(self):
        """Return a new random name (no directory part, no suffix)."""
        c = self.characters
        # Only one thread can call into the RNG at a time.
        self.mutex.acquire()
        try:
            # Inside the try block so the mutex is released even if
            # the generator is unusable or raises.
            choose = self.rng.choice
            letters = ''.join([choose(c) for dummy in range(6)])
        finally:
            self.mutex.release()
        return _template % letters
# XXX This tries to be not UNIX specific, but I don't know beans about
# how to choose a temp directory or filename on MS-DOS or other
# systems so it may have to be changed...
# _gettempdir deduces whether a candidate temp dir is usable by
# trying to create a file in it, and write to it. If that succeeds,
# great, it closes the file and unlinks it. There's a race, though:
# the *name* of the test file it tries is the same across all threads
# under most OSes (Linux is an exception), and letting multiple threads
# all try to open, write to, close, and unlink a single file can cause
# a variety of bogus errors (e.g., you cannot unlink a file under
# Windows if anyone has it open, and two threads cannot create the
# same file in O_EXCL mode under Unix). The simplest cure is to serialize
# calls to _gettempdir, which is done above in _candidate_name().
def _gettempdir():
"""Function to calculate the directory to use."""
try:
pwd = os.getcwd()
except (AttributeError, os.error):
pwd = os.curdir
attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd]
if os.name == 'nt':
attempdirs.insert(0, 'C:\\TEMP')
attempdirs.insert(0, '\\TEMP')
elif os.name == 'mac':
import macfs, MACFS
try:
refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk,
MACFS.kTemporaryFolderType, 1)
dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname()
attempdirs.insert(0, dirname)
except macfs.error:
pass
elif os.name == 'riscos':
scrapdir = os.getenv('Wimp$ScrapDir')
if scrapdir:
attempdirs.insert(0, scrapdir)
for envname in 'TMPDIR', 'TEMP', 'TMP':
if os.environ.has_key(envname):
attempdirs.insert(0, os.environ[envname])
testfile = gettempprefix() + 'test'
for dir in attempdirs:
try:
filename = os.path.join(dir, testfile)
fd = os.open(filename,
os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700)
fp = os.fdopen(fd, 'w')
fp.write('blat')
fp.close()
os.unlink(filename)
del fp, fd
return dir
except IOError:
pass
msg = "Can't find a usable temporary directory amongst " + `attempdirs`
raise IOError, msg