[Python-checkins] gh-74696: Do not change the current working directory in shutil.make_archive() if possible (GH-93160)

ambv webhook-mailer at python.org
Wed Jun 22 04:47:37 EDT 2022


https://github.com/python/cpython/commit/fda4b2f06364ae5ef91ecd9c09e2af380c8b0b4c
commit: fda4b2f06364ae5ef91ecd9c09e2af380c8b0b4c
branch: main
author: Serhiy Storchaka <storchaka at gmail.com>
committer: ambv <lukasz at langa.pl>
date: 2022-06-22T10:47:25+02:00
summary:

gh-74696: Do not change the current working directory in shutil.make_archive() if possible (GH-93160)

It is no longer changed when creating a zip or tar archive.

It is still changed for custom archivers registered with shutil.register_archive_format()
if root_dir is not None.

Co-authored-by: Éric <merwok at netwok.org>
Co-authored-by: Łukasz Langa <lukasz at langa.pl>

files:
A Misc/NEWS.d/next/Library/2022-05-24-11-19-04.gh-issue-74696.-cnf-A.rst
M Doc/library/shutil.rst
M Lib/shutil.py
M Lib/test/test_shutil.py

diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst
index 9a25b0d008bf5..e79caecf310f2 100644
--- a/Doc/library/shutil.rst
+++ b/Doc/library/shutil.rst
@@ -574,12 +574,18 @@ provided.  They rely on the :mod:`zipfile` and :mod:`tarfile` modules.
 
    .. note::
 
-      This function is not thread-safe.
+      This function is not thread-safe when custom archivers registered
+      with :func:`register_archive_format` are used.  In this case it
+      temporarily changes the current working directory of the process
+      to perform archiving.
 
    .. versionchanged:: 3.8
       The modern pax (POSIX.1-2001) format is now used instead of
       the legacy GNU format for archives created with ``format="tar"``.
 
+   .. versionchanged:: 3.10.6
+      This function is now made thread-safe during creation of standard
+      ``.zip`` and tar archives.
 
 .. function:: get_archive_formats()
 
diff --git a/Lib/shutil.py b/Lib/shutil.py
index 2cbd808abf2ff..f4128647861b8 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -897,7 +897,7 @@ def _get_uid(name):
     return None
 
 def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
-                  owner=None, group=None, logger=None):
+                  owner=None, group=None, logger=None, root_dir=None):
     """Create a (possibly compressed) tar file from all the files under
     'base_dir'.
 
@@ -954,14 +954,20 @@ def _set_uid_gid(tarinfo):
 
     if not dry_run:
         tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
+        arcname = base_dir
+        if root_dir is not None:
+            base_dir = os.path.join(root_dir, base_dir)
         try:
-            tar.add(base_dir, filter=_set_uid_gid)
+            tar.add(base_dir, arcname, filter=_set_uid_gid)
         finally:
             tar.close()
 
+    if root_dir is not None:
+        archive_name = os.path.abspath(archive_name)
     return archive_name
 
-def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
+def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
+                  logger=None, owner=None, group=None, root_dir=None):
     """Create a zip file from all the files under 'base_dir'.
 
     The output zip file will be named 'base_name' + ".zip".  Returns the
@@ -985,42 +991,60 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
     if not dry_run:
         with zipfile.ZipFile(zip_filename, "w",
                              compression=zipfile.ZIP_DEFLATED) as zf:
-            path = os.path.normpath(base_dir)
-            if path != os.curdir:
-                zf.write(path, path)
+            arcname = os.path.normpath(base_dir)
+            if root_dir is not None:
+                base_dir = os.path.join(root_dir, base_dir)
+            base_dir = os.path.normpath(base_dir)
+            if arcname != os.curdir:
+                zf.write(base_dir, arcname)
                 if logger is not None:
-                    logger.info("adding '%s'", path)
+                    logger.info("adding '%s'", base_dir)
             for dirpath, dirnames, filenames in os.walk(base_dir):
+                arcdirpath = dirpath
+                if root_dir is not None:
+                    arcdirpath = os.path.relpath(arcdirpath, root_dir)
+                arcdirpath = os.path.normpath(arcdirpath)
                 for name in sorted(dirnames):
-                    path = os.path.normpath(os.path.join(dirpath, name))
-                    zf.write(path, path)
+                    path = os.path.join(dirpath, name)
+                    arcname = os.path.join(arcdirpath, name)
+                    zf.write(path, arcname)
                     if logger is not None:
                         logger.info("adding '%s'", path)
                 for name in filenames:
-                    path = os.path.normpath(os.path.join(dirpath, name))
+                    path = os.path.join(dirpath, name)
+                    path = os.path.normpath(path)
                     if os.path.isfile(path):
-                        zf.write(path, path)
+                        arcname = os.path.join(arcdirpath, name)
+                        zf.write(path, arcname)
                         if logger is not None:
                             logger.info("adding '%s'", path)
 
+    if root_dir is not None:
+        zip_filename = os.path.abspath(zip_filename)
     return zip_filename
 
+# Maps the name of the archive format to a tuple containing:
+# * the archiving function
+# * extra keyword arguments
+# * description
+# * does it support the root_dir argument?
 _ARCHIVE_FORMATS = {
-    'tar':   (_make_tarball, [('compress', None)], "uncompressed tar file"),
+    'tar':   (_make_tarball, [('compress', None)],
+              "uncompressed tar file", True),
 }
 
 if _ZLIB_SUPPORTED:
     _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
-                                "gzip'ed tar-file")
-    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")
+                                "gzip'ed tar-file", True)
+    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file", True)
 
 if _BZ2_SUPPORTED:
     _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
-                                "bzip2'ed tar-file")
+                                "bzip2'ed tar-file", True)
 
 if _LZMA_SUPPORTED:
     _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
-                                "xz'ed tar-file")
+                                "xz'ed tar-file", True)
 
 def get_archive_formats():
     """Returns a list of supported formats for archiving and unarchiving.
@@ -1051,7 +1075,7 @@ def register_archive_format(name, function, extra_args=None, description=''):
         if not isinstance(element, (tuple, list)) or len(element) !=2:
             raise TypeError('extra_args elements are : (arg_name, value)')
 
-    _ARCHIVE_FORMATS[name] = (function, extra_args, description)
+    _ARCHIVE_FORMATS[name] = (function, extra_args, description, False)
 
 def unregister_archive_format(name):
     del _ARCHIVE_FORMATS[name]
@@ -1075,36 +1099,38 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
     uses the current owner and group.
     """
     sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
-    save_cwd = os.getcwd()
-    if root_dir is not None:
-        if logger is not None:
-            logger.debug("changing into '%s'", root_dir)
-        base_name = os.path.abspath(base_name)
-        if not dry_run:
-            os.chdir(root_dir)
-
-    if base_dir is None:
-        base_dir = os.curdir
-
-    kwargs = {'dry_run': dry_run, 'logger': logger}
-
     try:
         format_info = _ARCHIVE_FORMATS[format]
     except KeyError:
         raise ValueError("unknown archive format '%s'" % format) from None
 
+    kwargs = {'dry_run': dry_run, 'logger': logger,
+              'owner': owner, 'group': group}
+
     func = format_info[0]
     for arg, val in format_info[1]:
         kwargs[arg] = val
 
-    if format != 'zip':
-        kwargs['owner'] = owner
-        kwargs['group'] = group
+    if base_dir is None:
+        base_dir = os.curdir
+
+    support_root_dir = format_info[3]
+    save_cwd = None
+    if root_dir is not None:
+        if support_root_dir:
+            kwargs['root_dir'] = root_dir
+        else:
+            save_cwd = os.getcwd()
+            if logger is not None:
+                logger.debug("changing into '%s'", root_dir)
+            base_name = os.path.abspath(base_name)
+            if not dry_run:
+                os.chdir(root_dir)
 
     try:
         filename = func(base_name, base_dir, **kwargs)
     finally:
-        if root_dir is not None:
+        if save_cwd is not None:
             if logger is not None:
                 logger.debug("changing back to '%s'", save_cwd)
             os.chdir(save_cwd)
@@ -1217,6 +1243,11 @@ def _unpack_tarfile(filename, extract_dir):
     finally:
         tarobj.close()
 
+# Maps the name of the unpack format to a tuple containing:
+# * extensions
+# * the unpacking function
+# * extra keyword arguments
+# * description
 _UNPACK_FORMATS = {
     'tar':   (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
     'zip':   (['.zip'], _unpack_zipfile, [], "ZIP file"),
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 18421fca9c21b..6fa07399007a3 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -51,6 +51,9 @@
 except ImportError:
     _winapi = None
 
+no_chdir = unittest.mock.patch('os.chdir',
+        side_effect=AssertionError("shouldn't call os.chdir()"))
+
 def _fake_rename(*args, **kwargs):
     # Pretend the destination path is on a different filesystem.
     raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
@@ -1342,7 +1345,7 @@ def test_make_tarball(self):
         work_dir = os.path.dirname(tmpdir2)
         rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
 
-        with os_helper.change_cwd(work_dir):
+        with os_helper.change_cwd(work_dir), no_chdir:
             base_name = os.path.abspath(rel_base_name)
             tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
 
@@ -1356,7 +1359,7 @@ def test_make_tarball(self):
                                    './file1', './file2', './sub/file3'])
 
         # trying an uncompressed one
-        with os_helper.change_cwd(work_dir):
+        with os_helper.change_cwd(work_dir), no_chdir:
             tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
         self.assertEqual(tarball, base_name + '.tar')
         self.assertTrue(os.path.isfile(tarball))
@@ -1392,7 +1395,8 @@ def _create_files(self, base_dir='dist'):
     def test_tarfile_vs_tar(self):
         root_dir, base_dir = self._create_files()
         base_name = os.path.join(self.mkdtemp(), 'archive')
-        tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
+        with no_chdir:
+            tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
 
         # check if the compressed tarball was created
         self.assertEqual(tarball, base_name + '.tar.gz')
@@ -1409,13 +1413,15 @@ def test_tarfile_vs_tar(self):
         self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
 
         # trying an uncompressed one
-        tarball = make_archive(base_name, 'tar', root_dir, base_dir)
+        with no_chdir:
+            tarball = make_archive(base_name, 'tar', root_dir, base_dir)
         self.assertEqual(tarball, base_name + '.tar')
         self.assertTrue(os.path.isfile(tarball))
 
         # now for a dry_run
-        tarball = make_archive(base_name, 'tar', root_dir, base_dir,
-                               dry_run=True)
+        with no_chdir:
+            tarball = make_archive(base_name, 'tar', root_dir, base_dir,
+                                   dry_run=True)
         self.assertEqual(tarball, base_name + '.tar')
         self.assertTrue(os.path.isfile(tarball))
 
@@ -1431,7 +1437,7 @@ def test_make_zipfile(self):
         work_dir = os.path.dirname(tmpdir2)
         rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
 
-        with os_helper.change_cwd(work_dir):
+        with os_helper.change_cwd(work_dir), no_chdir:
             base_name = os.path.abspath(rel_base_name)
             res = make_archive(rel_base_name, 'zip', root_dir)
 
@@ -1444,7 +1450,7 @@ def test_make_zipfile(self):
                      'dist/file1', 'dist/file2', 'dist/sub/file3',
                      'outer'])
 
-        with os_helper.change_cwd(work_dir):
+        with os_helper.change_cwd(work_dir), no_chdir:
             base_name = os.path.abspath(rel_base_name)
             res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
 
@@ -1462,7 +1468,8 @@ def test_make_zipfile(self):
     def test_zipfile_vs_zip(self):
         root_dir, base_dir = self._create_files()
         base_name = os.path.join(self.mkdtemp(), 'archive')
-        archive = make_archive(base_name, 'zip', root_dir, base_dir)
+        with no_chdir:
+            archive = make_archive(base_name, 'zip', root_dir, base_dir)
 
         # check if ZIP file  was created
         self.assertEqual(archive, base_name + '.zip')
@@ -1488,7 +1495,8 @@ def test_zipfile_vs_zip(self):
     def test_unzip_zipfile(self):
         root_dir, base_dir = self._create_files()
         base_name = os.path.join(self.mkdtemp(), 'archive')
-        archive = make_archive(base_name, 'zip', root_dir, base_dir)
+        with no_chdir:
+            archive = make_archive(base_name, 'zip', root_dir, base_dir)
 
         # check if ZIP file  was created
         self.assertEqual(archive, base_name + '.zip')
@@ -1546,7 +1554,7 @@ def test_tarfile_root_owner(self):
         base_name = os.path.join(self.mkdtemp(), 'archive')
         group = grp.getgrgid(0)[0]
         owner = pwd.getpwuid(0)[0]
-        with os_helper.change_cwd(root_dir):
+        with os_helper.change_cwd(root_dir), no_chdir:
             archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                         owner=owner, group=group)
 
@@ -1564,23 +1572,30 @@ def test_tarfile_root_owner(self):
 
     def test_make_archive_cwd(self):
         current_dir = os.getcwd()
+        root_dir = self.mkdtemp()
         def _breaks(*args, **kw):
             raise RuntimeError()
+        dirs = []
+        def _chdir(path):
+            dirs.append(path)
+            orig_chdir(path)
 
         register_archive_format('xxx', _breaks, [], 'xxx file')
         try:
-            try:
-                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
-            except Exception:
-                pass
+            with support.swap_attr(os, 'chdir', _chdir) as orig_chdir:
+                try:
+                    make_archive('xxx', 'xxx', root_dir=root_dir)
+                except Exception:
+                    pass
             self.assertEqual(os.getcwd(), current_dir)
+            self.assertEqual(dirs, [root_dir, current_dir])
         finally:
             unregister_archive_format('xxx')
 
     def test_make_tarfile_in_curdir(self):
         # Issue #21280
         root_dir = self.mkdtemp()
-        with os_helper.change_cwd(root_dir):
+        with os_helper.change_cwd(root_dir), no_chdir:
             self.assertEqual(make_archive('test', 'tar'), 'test.tar')
             self.assertTrue(os.path.isfile('test.tar'))
 
@@ -1588,7 +1603,7 @@ def test_make_tarfile_in_curdir(self):
     def test_make_zipfile_in_curdir(self):
         # Issue #21280
         root_dir = self.mkdtemp()
-        with os_helper.change_cwd(root_dir):
+        with os_helper.change_cwd(root_dir), no_chdir:
             self.assertEqual(make_archive('test', 'zip'), 'test.zip')
             self.assertTrue(os.path.isfile('test.zip'))
 
diff --git a/Misc/NEWS.d/next/Library/2022-05-24-11-19-04.gh-issue-74696.-cnf-A.rst b/Misc/NEWS.d/next/Library/2022-05-24-11-19-04.gh-issue-74696.-cnf-A.rst
new file mode 100644
index 0000000000000..5b2e460e9ea07
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-05-24-11-19-04.gh-issue-74696.-cnf-A.rst
@@ -0,0 +1,2 @@
+:func:`shutil.make_archive` no longer temporarily changes the current
+working directory during creation of standard ``.zip`` or tar archives.



More information about the Python-checkins mailing list