[Python-checkins] bpo-26660, bpo-35144: Fix permission errors in TemporaryDirectory cleanup. (GH-10320)

Serhiy Storchaka webhook-mailer at python.org
Fri May 31 04:30:41 EDT 2019


https://github.com/python/cpython/commit/e9b51c0ad81da1da11ae65840ac8b50a8521373c
commit: e9b51c0ad81da1da11ae65840ac8b50a8521373c
branch: master
author: Serhiy Storchaka <storchaka at gmail.com>
committer: GitHub <noreply at github.com>
date: 2019-05-31T11:30:37+03:00
summary:

bpo-26660, bpo-35144: Fix permission errors in TemporaryDirectory cleanup. (GH-10320)

TemporaryDirectory.cleanup() failed when non-writeable or non-searchable
files or directories were created inside a temporary directory.

files:
A Misc/NEWS.d/next/Library/2018-11-04-16-39-46.bpo-26660.RdXz8a.rst
M Lib/shutil.py
M Lib/tempfile.py
M Lib/test/test_tempfile.py

diff --git a/Lib/shutil.py b/Lib/shutil.py
index dae916b41605..6486cd6e5d28 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -584,11 +584,16 @@ def _rmtree_safe_fd(topfd, path, onerror):
         fullname = os.path.join(path, entry.name)
         try:
             is_dir = entry.is_dir(follow_symlinks=False)
-            if is_dir:
-                orig_st = entry.stat(follow_symlinks=False)
-                is_dir = stat.S_ISDIR(orig_st.st_mode)
         except OSError:
             is_dir = False
+        else:
+            if is_dir:
+                try:
+                    orig_st = entry.stat(follow_symlinks=False)
+                    is_dir = stat.S_ISDIR(orig_st.st_mode)
+                except OSError:
+                    onerror(os.lstat, fullname, sys.exc_info())
+                    continue
         if is_dir:
             try:
                 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
diff --git a/Lib/tempfile.py b/Lib/tempfile.py
index a66d6f3750cb..e8b111eae223 100644
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -777,9 +777,39 @@ def __init__(self, suffix=None, prefix=None, dir=None):
             self, self._cleanup, self.name,
             warn_message="Implicitly cleaning up {!r}".format(self))
 
+    @classmethod
+    def _rmtree(cls, name):
+        def onerror(func, path, exc_info):
+            if issubclass(exc_info[0], PermissionError):
+                def resetperms(path):
+                    try:
+                        _os.chflags(path, 0)
+                    except AttributeError:
+                        pass
+                    _os.chmod(path, 0o700)
+
+                try:
+                    if path != name:
+                        resetperms(_os.path.dirname(path))
+                    resetperms(path)
+
+                    try:
+                        _os.unlink(path)
+                    # PermissionError is raised on FreeBSD for directories
+                    except (IsADirectoryError, PermissionError):
+                        cls._rmtree(path)
+                except FileNotFoundError:
+                    pass
+            elif issubclass(exc_info[0], FileNotFoundError):
+                pass
+            else:
+                raise
+
+        _shutil.rmtree(name, onerror=onerror)
+
     @classmethod
     def _cleanup(cls, name, warn_message):
-        _shutil.rmtree(name)
+        cls._rmtree(name)
         _warnings.warn(warn_message, ResourceWarning)
 
     def __repr__(self):
@@ -793,4 +823,4 @@ def __exit__(self, exc, value, tb):
 
     def cleanup(self):
         if self._finalizer.detach():
-            _shutil.rmtree(self.name)
+            self._rmtree(self.name)
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index 489141d6ad72..bd4db839331b 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -1297,19 +1297,25 @@ def __exit__(self, *exc_info):
 class TestTemporaryDirectory(BaseTestCase):
     """Test TemporaryDirectory()."""
 
-    def do_create(self, dir=None, pre="", suf="", recurse=1):
+    def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1):
         if dir is None:
             dir = tempfile.gettempdir()
         tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf)
         self.nameCheck(tmp.name, dir, pre, suf)
-        # Create a subdirectory and some files
-        if recurse:
-            d1 = self.do_create(tmp.name, pre, suf, recurse-1)
-            d1.name = None
-        with open(os.path.join(tmp.name, "test.txt"), "wb") as f:
-            f.write(b"Hello world!")
+        self.do_create2(tmp.name, recurse, dirs, files)
         return tmp
 
+    def do_create2(self, path, recurse=1, dirs=1, files=1):
+        # Create subdirectories and some files
+        if recurse:
+            for i in range(dirs):
+                name = os.path.join(path, "dir%d" % i)
+                os.mkdir(name)
+                self.do_create2(name, recurse-1, dirs, files)
+        for i in range(files):
+            with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
+                f.write(b"Hello world!")
+
     def test_mkdtemp_failure(self):
         # Check no additional exception if mkdtemp fails
         # Previously would raise AttributeError instead
@@ -1349,7 +1355,7 @@ def test_cleanup_with_symlink_to_a_directory(self):
                          "TemporaryDirectory %s exists after cleanup" % d1.name)
         self.assertTrue(os.path.exists(d2.name),
                         "Directory pointed to by a symlink was deleted")
-        self.assertEqual(os.listdir(d2.name), ['test.txt'],
+        self.assertEqual(os.listdir(d2.name), ['test0.txt'],
                          "Contents of the directory pointed to by a symlink "
                          "were deleted")
         d2.cleanup()
@@ -1384,7 +1390,7 @@ def test_del_on_shutdown(self):
 
                     tmp2 = os.path.join(tmp.name, 'test_dir')
                     os.mkdir(tmp2)
-                    with open(os.path.join(tmp2, "test.txt"), "w") as f:
+                    with open(os.path.join(tmp2, "test0.txt"), "w") as f:
                         f.write("Hello world!")
 
                     {mod}.tmp = tmp
@@ -1452,6 +1458,33 @@ def test_context_manager(self):
             self.assertEqual(name, d.name)
         self.assertFalse(os.path.exists(name))
 
+    def test_modes(self):
+        for mode in range(8):
+            mode <<= 6
+            with self.subTest(mode=format(mode, '03o')):
+                d = self.do_create(recurse=3, dirs=2, files=2)
+                with d:
+                    # Change files and directories mode recursively.
+                    for root, dirs, files in os.walk(d.name, topdown=False):
+                        for name in files:
+                            os.chmod(os.path.join(root, name), mode)
+                        os.chmod(root, mode)
+                    d.cleanup()
+                self.assertFalse(os.path.exists(d.name))
+
+    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
+    def test_flags(self):
+        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
+        d = self.do_create(recurse=3, dirs=2, files=2)
+        with d:
+            # Change files and directories flags recursively.
+            for root, dirs, files in os.walk(d.name, topdown=False):
+                for name in files:
+                    os.chflags(os.path.join(root, name), flags)
+                os.chflags(root, flags)
+            d.cleanup()
+        self.assertFalse(os.path.exists(d.name))
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2018-11-04-16-39-46.bpo-26660.RdXz8a.rst b/Misc/NEWS.d/next/Library/2018-11-04-16-39-46.bpo-26660.RdXz8a.rst
new file mode 100644
index 000000000000..4448bf6b0164
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-11-04-16-39-46.bpo-26660.RdXz8a.rst
@@ -0,0 +1,4 @@
+Fixed permission errors in :class:`~tempfile.TemporaryDirectory` cleanup.
+Previously ``TemporaryDirectory.cleanup()`` failed when non-writeable or
+non-searchable files or directories were created inside a temporary
+directory.



More information about the Python-checkins mailing list