[Python-checkins] bpo-40692: Run more test_concurrent_futures tests (GH-20239)

pablogsal webhook-mailer at python.org
Sun Feb 7 22:16:04 EST 2021


https://github.com/python/cpython/commit/bf2e7e55d7306b1e2fce7dce767e8df5ff42cf1c
commit: bf2e7e55d7306b1e2fce7dce767e8df5ff42cf1c
branch: master
author: Asheesh Laroia <github at asheesh.org>
committer: pablogsal <Pablogsal at gmail.com>
date: 2021-02-08T03:15:51Z
summary:

bpo-40692: Run more test_concurrent_futures tests (GH-20239)

When the multiprocessing.synchronize module is missing, the
test_concurrent_futures test suite now skips only the tests that
require it.

Validate that multiprocessing.synchronize exists as part of
_check_system_limits(), allowing ProcessPoolExecutor to raise
NotImplementedError during __init__, rather than crashing with
ImportError during __init__ when creating a lock imported from
multiprocessing.synchronize.

Use _check_system_limits() to disable tests of
ProcessPoolExecutor on systems without multiprocessing.synchronize.

Running the test suite without multiprocessing.synchronize reveals
that Lib/compileall.py crashes when it uses a ProcessPoolExecutor.
Therefore, change Lib/compileall.py to call _check_system_limits()
before creating the ProcessPoolExecutor.

Note that both Lib/compileall.py and Lib/test/test_compileall.py
were attempting to sanity-check ProcessPoolExecutor by expecting
ImportError. Both now call _check_system_limits() and handle
NotImplementedError instead.

In multiprocessing.resource_tracker, sem_unlink() is also absent
on platforms where POSIX semaphores aren't available. Avoid using
sem_unlink() if it, too, does not exist.

Co-authored-by: Pablo Galindo <Pablogsal at gmail.com>

files:
A Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst
M Lib/compileall.py
M Lib/concurrent/futures/process.py
M Lib/multiprocessing/resource_tracker.py
M Lib/test/test_compileall.py
M Lib/test/test_concurrent_futures.py

diff --git a/Lib/compileall.py b/Lib/compileall.py
index fe7f450c55e1c..672cb43971869 100644
--- a/Lib/compileall.py
+++ b/Lib/compileall.py
@@ -84,12 +84,14 @@ def compile_dir(dir, maxlevels=None, ddir=None, force=False,
     if workers < 0:
         raise ValueError('workers must be greater or equal to 0')
     if workers != 1:
+        # Check if this is a system where ProcessPoolExecutor can function.
+        from concurrent.futures.process import _check_system_limits
         try:
-            # Only import when needed, as low resource platforms may
-            # fail to import it
-            from concurrent.futures import ProcessPoolExecutor
-        except ImportError:
+            _check_system_limits()
+        except NotImplementedError:
             workers = 1
+        else:
+            from concurrent.futures import ProcessPoolExecutor
     if maxlevels is None:
         maxlevels = sys.getrecursionlimit()
     files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 90bc98bf2ecd1..764719859f7ce 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -532,6 +532,14 @@ def _check_system_limits():
         if _system_limited:
             raise NotImplementedError(_system_limited)
     _system_limits_checked = True
+    try:
+        import multiprocessing.synchronize
+    except ImportError:
+        _system_limited = (
+            "This Python build lacks multiprocessing.synchronize, usually due "
+            "to named semaphores being unavailable on this platform."
+        )
+        raise NotImplementedError(_system_limited)
     try:
         nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
     except (AttributeError, ValueError):
diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py
index c9bfa9b82b6e6..cc42dbdda05b9 100644
--- a/Lib/multiprocessing/resource_tracker.py
+++ b/Lib/multiprocessing/resource_tracker.py
@@ -37,8 +37,16 @@
     import _multiprocessing
     import _posixshmem
 
+    # Use sem_unlink() to clean up named semaphores.
+    #
+    # sem_unlink() may be missing if the Python build process detected the
+    # absence of POSIX named semaphores. In that case, no named semaphores were
+    # ever opened, so no cleanup would be necessary.
+    if hasattr(_multiprocessing, 'sem_unlink'):
+        _CLEANUP_FUNCS.update({
+            'semaphore': _multiprocessing.sem_unlink,
+        })
     _CLEANUP_FUNCS.update({
-        'semaphore': _multiprocessing.sem_unlink,
         'shared_memory': _posixshmem.shm_unlink,
     })
 
diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py
index be1149a87faef..fa24b3c5a11dd 100644
--- a/Lib/test/test_compileall.py
+++ b/Lib/test/test_compileall.py
@@ -16,10 +16,14 @@
 import unittest
 
 from unittest import mock, skipUnless
+from concurrent.futures import ProcessPoolExecutor
 try:
-    from concurrent.futures import ProcessPoolExecutor
+    # compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists
+    # and it can function.
+    from concurrent.futures.process import _check_system_limits
+    _check_system_limits()
     _have_multiprocessing = True
-except ImportError:
+except NotImplementedError:
     _have_multiprocessing = False
 
 from test import support
@@ -188,6 +192,7 @@ def test_compile_dir_pathlike(self):
         self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)')
         self.assertTrue(os.path.isfile(self.bc_path))
 
+    @skipUnless(_have_multiprocessing, "requires multiprocessing")
     @mock.patch('concurrent.futures.ProcessPoolExecutor')
     def test_compile_pool_called(self, pool_mock):
         compileall.compile_dir(self.directory, quiet=True, workers=5)
@@ -198,11 +203,13 @@ def test_compile_workers_non_positive(self):
                                     "workers must be greater or equal to 0"):
             compileall.compile_dir(self.directory, workers=-1)
 
+    @skipUnless(_have_multiprocessing, "requires multiprocessing")
     @mock.patch('concurrent.futures.ProcessPoolExecutor')
     def test_compile_workers_cpu_count(self, pool_mock):
         compileall.compile_dir(self.directory, quiet=True, workers=0)
         self.assertEqual(pool_mock.call_args[1]['max_workers'], None)
 
+    @skipUnless(_have_multiprocessing, "requires multiprocessing")
     @mock.patch('concurrent.futures.ProcessPoolExecutor')
     @mock.patch('compileall.compile_file')
     def test_compile_one_worker(self, compile_file_mock, pool_mock):
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index a182b14fb9bc0..99651f5f4ed4d 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -4,8 +4,6 @@
 
 # Skip tests if _multiprocessing wasn't built.
 import_helper.import_module('_multiprocessing')
-# Skip tests if sem_open implementation is broken.
-support.skip_if_broken_multiprocessing_synchronize()
 
 from test.support import hashlib_helper
 from test.support.script_helper import assert_python_ok
@@ -27,7 +25,7 @@
 from concurrent.futures._base import (
     PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
     BrokenExecutor)
-from concurrent.futures.process import BrokenProcessPool
+from concurrent.futures.process import BrokenProcessPool, _check_system_limits
 from multiprocessing import get_context
 
 import multiprocessing.process
@@ -161,6 +159,10 @@ class ProcessPoolForkMixin(ExecutorMixin):
     ctx = "fork"
 
     def get_context(self):
+        try:
+            _check_system_limits()
+        except NotImplementedError:
+            self.skipTest("ProcessPoolExecutor unavailable on this system")
         if sys.platform == "win32":
             self.skipTest("require unix system")
         return super().get_context()
@@ -170,12 +172,23 @@ class ProcessPoolSpawnMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
     ctx = "spawn"
 
+    def get_context(self):
+        try:
+            _check_system_limits()
+        except NotImplementedError:
+            self.skipTest("ProcessPoolExecutor unavailable on this system")
+        return super().get_context()
+
 
 class ProcessPoolForkserverMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
     ctx = "forkserver"
 
     def get_context(self):
+        try:
+            _check_system_limits()
+        except NotImplementedError:
+            self.skipTest("ProcessPoolExecutor unavailable on this system")
         if sys.platform == "win32":
             self.skipTest("require unix system")
         return super().get_context()
diff --git a/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst b/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst
new file mode 100644
index 0000000000000..b92dcdd00affc
--- /dev/null
+++ b/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst	
@@ -0,0 +1 @@
+In the :class:`concurrent.futures.ProcessPoolExecutor`, validate that :mod:`multiprocessing.synchronize` is available on a given platform and rely on that check in the :mod:`concurrent.futures` test suite so we can run tests that are unrelated to :class:`ProcessPoolExecutor` on those platforms.



More information about the Python-checkins mailing list