
Attached please find a rewritten and improved tempfile.py. The major change is to make the temporary file names significantly harder to predict. This foils denial-of-service attacks, where a hostile program floods /tmp with files named @12345.NNNN to prevent process 12345 from creating any temp files. It also makes the race condition inherent in tempfile.mktemp() somewhat harder to exploit. I also implemented three new interfaces: (fd, name) = mkstemp(suffix="", binary=1): Creates a temporary file, returning both an OS-level file descriptor open on it and its name. This is useful in situations where you need to know the name of the temporary file, but can't risk the race in mktemp. name = mkdtemp(suffix=""): Creates a temporary directory, without race. file = NamedTemporaryFile(mode='w+b', bufsize=-1, suffix=""): This is just the non-POSIX version of tempfile.TemporaryFile() made available on all platforms, and with the .path attribute documented. It provides a convenient way to get a temporary file with a name, that will be automatically deleted on close, and with a high-level file object associated with it. Finally, I tore out a lot of the posix/not-posix conditionals, relying on the os module to provide open() and O_EXCL -- this should make all the recommended interfaces race-safe on non-posix systems, which they were not before. Comments? I would very much like to see something along these lines in 2.3; I have an application that needs to be reliable in the face of the aforementioned denial of service. Please note that I wound up removing all the top-level 'del foo' statements (cleaning up the namespace) as I could not figure out how to do them properly. I'm not a Python guru. 
zw """Temporary files and filenames.""" import os from errno import EEXIST from random import Random __all__ = [ "TemporaryFile", "NamedTemporaryFile", # recommended (high level) "mkstemp", "mkdtemp", # recommended (low level) "mktemp", "gettempprefix", # deprecated "tempdir", "template" # control ] ### Parameters that the caller may set to override the defaults. tempdir = None # _template contains an appropriate pattern for the name of each # temporary file. if os.name == 'nt': _template = '~%s~' elif os.name in ('mac', 'riscos'): _template = 'Python-Tmp-%s' else: _template = 'pyt%s' # better ideas? ### Recommended, user-visible interfaces. _text_openflags = os.O_RDWR | os.O_CREAT | os.O_EXCL if os.name == 'posix': _bin_openflags = os.O_RDWR | os.O_CREAT | os.O_EXCL else: _bin_openflags = os.O_RDWR | os.O_CREAT | os.O_EXCL | os.O_BINARY def mkstemp(suffix="", binary=1): """Function to create a named temporary file, with 'suffix' for its suffix. Returns an OS-level handle to the file and the name, as a tuple. If 'binary' is 1, the file is opened in binary mode, otherwise text mode (if this is a meaningful concept for the operating system in use). In any case, the file is readable and writable only by the creating user, and executable by no one.""" if binary: flags = _bin_openflags else: flags = _text_openflags while 1: name = _candidate_name(suffix) try: fd = os.open(name, flags, 0600) return (fd, name) except OSError, e: if e.errno == EEXIST: continue # try again raise def mkdtemp(suffix=""): """Function to create a named temporary directory, with 'suffix' for its suffix. Returns the name of the directory. The directory is readable, writable, and searchable only by the creating user.""" while 1: name = _candidate_name(suffix) try: os.mkdir(name, 0700) return name except OSError, e: if e.errno == EEXIST: continue # try again raise class _TemporaryFileWrapper: """Temporary file wrapper This class provides a wrapper around files opened for temporary use. 
In particular, it seeks to automatically remove the file when it is no longer needed. """ # Cache the unlinker so we don't get spurious errors at shutdown # when the module-level "os" is None'd out. Note that this must # be referenced as self.unlink, because the name TemporaryFileWrapper # may also get None'd out before __del__ is called. unlink = os.unlink def __init__(self, file, path): self.file = file self.path = path self.close_called = 0 def close(self): if not self.close_called: self.close_called = 1 self.file.close() self.unlink(self.path) def __del__(self): self.close() def __getattr__(self, name): file = self.__dict__['file'] a = getattr(file, name) if type(a) != type(0): setattr(self, name, a) return a def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix=""): """Create a named temporary file, with 'suffix' for its suffix. It will automatically be deleted when it is closed. Pass 'mode' and 'bufsize' to fdopen. Returns a file object; the name of the file is accessible as file.path.""" if 'b' in mode: binary = 1 else: binary = 0 (fd, name) = mkstemp(suffix, binary) file = os.fdopen(fd, mode, bufsize) return _TemporaryFileWrapper(file, name) if os.name != 'posix': # A file cannot be unlinked while open, so TemporaryFile # degenerates to NamedTemporaryFile. TemporaryFile = NamedTemporaryFile else: def TemporaryFile(mode='w+b', bufsize=-1, suffix=""): """Create a temporary file. It has no name and will not survive being closed; the 'suffix' argument is ignored. Pass 'mode' and 'bufsize' to fdopen. Returns a file object.""" if 'b' in mode: binary = 1 else: binary = 0 (fd, name) = mkstemp(binary=binary) file = os.fdopen(fd, mode, bufsize) os.unlink(name) return file ### Deprecated, user-visible interfaces. def mktemp(suffix=""): """User-callable function to return a unique temporary file name.""" while 1: name = _candidate_name(suffix) if not os.path.exists(name): return name def gettempprefix(): """Function to calculate a prefix of the filename to use. 
This incorporates the current process id on systems that support such a notion, so that concurrent processes don't generate the same prefix. """ global _template return (_template % `os.getpid`) + '.' ### Threading gook. try: from thread import allocate_lock except ImportError: class _DummyMutex: def acquire(self): pass release = acquire def allocate_lock(): return _DummyMutex() del _DummyMutex _init_once_lock = allocate_lock() def _init_once(var, constructor): """If 'var' is not None, initialize it to the return value from 'constructor'. Do this exactly once, no matter how many threads call this routine. FIXME: How would I cause 'var' to be passed by reference to this routine, so that the caller can write simply _init_once(foo, make_foo) instead of foo = _init_once(foo, make_foo) ?""" # Check once outside the lock, so we can avoid acquiring it if # the variable has already been initialized. if var is not None: return var try: _init_once_lock.acquire() # Check again inside the lock, in case someone else got # here first. if var is None: var = constructor() finally: _init_once_lock.release() return var ### Internal routines and data. _seq = None def _candidate_name(suffix): """Return a candidate temporary name in 'tempdir' (global) ending with 'suffix'.""" # We have to make sure that _seq and tempdir are initialized only # once, even in the presence of multiple threads of control. global _seq global tempdir _seq = _init_once(_seq, _RandomFilenameSequence) tempdir = _init_once(tempdir, _gettempdir) # Most of the work is done by _RandomFilenameSequence. return os.path.join(tempdir, _seq.get()) + suffix class _RandomFilenameSequence: characters = ( "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "0123456789-_") def __init__(self): self.mutex = allocate_lock() self.rng = Random() def get(self): global _template # Only one thread can call into the RNG at a time. 
self.mutex.acquire() c = self.characters r = self.rng letters = ''.join([r.choice(c), r.choice(c), r.choice(c), r.choice(c), r.choice(c), r.choice(c)]) self.mutex.release() return (_template % letters) # XXX This tries to be not UNIX specific, but I don't know beans about # how to choose a temp directory or filename on MS-DOS or other # systems so it may have to be changed... # _gettempdir deduces whether a candidate temp dir is usable by # trying to create a file in it, and write to it. If that succeeds, # great, it closes the file and unlinks it. There's a race, though: # the *name* of the test file it tries is the same across all threads # under most OSes (Linux is an exception), and letting multiple threads # all try to open, write to, close, and unlink a single file can cause # a variety of bogus errors (e.g., you cannot unlink a file under # Windows if anyone has it open, and two threads cannot create the # same file in O_EXCL mode under Unix). The simplest cure is to serialize # calls to _gettempdir, which is done above in _candidate_name(). 
def _gettempdir(): """Function to calculate the directory to use.""" try: pwd = os.getcwd() except (AttributeError, os.error): pwd = os.curdir attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd] if os.name == 'nt': attempdirs.insert(0, 'C:\\TEMP') attempdirs.insert(0, '\\TEMP') elif os.name == 'mac': import macfs, MACFS try: refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk, MACFS.kTemporaryFolderType, 1) dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname() attempdirs.insert(0, dirname) except macfs.error: pass elif os.name == 'riscos': scrapdir = os.getenv('Wimp$ScrapDir') if scrapdir: attempdirs.insert(0, scrapdir) for envname in 'TMPDIR', 'TEMP', 'TMP': if os.environ.has_key(envname): attempdirs.insert(0, os.environ[envname]) testfile = gettempprefix() + 'test' for dir in attempdirs: try: filename = os.path.join(dir, testfile) fd = os.open(filename, os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700) fp = os.fdopen(fd, 'w') fp.write('blat') fp.close() os.unlink(filename) del fp, fd return dir except IOError: pass msg = "Can't find a usable temporary directory amongst " + `attempdirs` raise IOError, msg