[Python-checkins] bpo-40275: Avoid importing socket in test.support (GH-19603)

Serhiy Storchaka webhook-mailer at python.org
Sat Apr 25 03:06:34 EDT 2020


https://github.com/python/cpython/commit/16994912c93e8e5db7365d48b75d67d3f70dd7b2
commit: 16994912c93e8e5db7365d48b75d67d3f70dd7b2
branch: master
author: Serhiy Storchaka <storchaka at gmail.com>
committer: GitHub <noreply at github.com>
date: 2020-04-25T10:06:29+03:00
summary:

bpo-40275: Avoid importing socket in test.support (GH-19603)

* Move socket related functions from test.support to socket_helper.
* Import socket, nntplib and urllib.error lazily in transient_internet().
* Remove importing multiprocess.

files:
A Lib/test/support/socket_helper.py
M Doc/library/test.rst
M Lib/test/_test_multiprocessing.py
M Lib/test/eintrdata/eintr_tester.py
M Lib/test/ssl_servers.py
M Lib/test/support/__init__.py
M Lib/test/test_asynchat.py
M Lib/test/test_asyncio/test_base_events.py
M Lib/test/test_asyncio/test_events.py
M Lib/test/test_asyncio/test_proactor_events.py
M Lib/test/test_asyncio/test_sendfile.py
M Lib/test/test_asyncio/test_server.py
M Lib/test/test_asyncio/test_sock_lowlevel.py
M Lib/test/test_asyncio/test_streams.py
M Lib/test/test_asyncio/test_unix_events.py
M Lib/test/test_asyncore.py
M Lib/test/test_ftplib.py
M Lib/test/test_httplib.py
M Lib/test/test_imaplib.py
M Lib/test/test_largefile.py
M Lib/test/test_logging.py
M Lib/test/test_nntplib.py
M Lib/test/test_os.py
M Lib/test/test_poplib.py
M Lib/test/test_robotparser.py
M Lib/test/test_selectors.py
M Lib/test/test_smtpd.py
M Lib/test/test_smtplib.py
M Lib/test/test_socket.py
M Lib/test/test_socketserver.py
M Lib/test/test_ssl.py
M Lib/test/test_stat.py
M Lib/test/test_support.py
M Lib/test/test_telnetlib.py
M Lib/test/test_timeout.py
M Lib/test/test_wsgiref.py
M Lib/test/test_xmlrpc.py

diff --git a/Doc/library/test.rst b/Doc/library/test.rst
index 0573c275981c7..1e6b1116212ef 100644
--- a/Doc/library/test.rst
+++ b/Doc/library/test.rst
@@ -348,11 +348,6 @@ The :mod:`test.support` module defines the following constants:
    :data:`SHORT_TIMEOUT`.
 
 
-.. data:: IPV6_ENABLED
-
-    Set to ``True`` if IPV6 is enabled on this host, ``False`` otherwise.
-
-
 .. data:: SAVEDCWD
 
    Set to :func:`os.getcwd`.
@@ -901,12 +896,6 @@ The :mod:`test.support` module defines the following functions:
    A decorator for running tests that require support for xattr.
 
 
-.. decorator:: skip_unless_bind_unix_socket
-
-   A decorator for running tests that require a functional bind() for Unix
-   sockets.
-
-
 .. decorator:: anticipate_failure(condition)
 
    A decorator to conditionally mark tests with
@@ -1157,31 +1146,6 @@ The :mod:`test.support` module defines the following functions:
    is raised.
 
 
-.. function:: bind_port(sock, host=HOST)
-
-   Bind the socket to a free port and return the port number.  Relies on
-   ephemeral ports in order to ensure we are using an unbound port.  This is
-   important as many tests may be running simultaneously, especially in a
-   buildbot environment.  This method raises an exception if the
-   ``sock.family`` is :const:`~socket.AF_INET` and ``sock.type`` is
-   :const:`~socket.SOCK_STREAM`, and the socket has
-   :const:`~socket.SO_REUSEADDR` or :const:`~socket.SO_REUSEPORT` set on it.
-   Tests should never set these socket options for TCP/IP sockets.
-   The only case for setting these options is testing multicasting via
-   multiple UDP sockets.
-
-   Additionally, if the :const:`~socket.SO_EXCLUSIVEADDRUSE` socket option is
-   available (i.e. on Windows), it will be set on the socket.  This will
-   prevent anyone else from binding to our host/port for the duration of the
-   test.
-
-
-.. function:: bind_unix_socket(sock, addr)
-
-   Bind a unix socket, raising :exc:`unittest.SkipTest` if
-   :exc:`PermissionError` is raised.
-
-
 .. function:: catch_threading_exception()
 
    Context manager catching :class:`threading.Thread` exception using
@@ -1243,29 +1207,6 @@ The :mod:`test.support` module defines the following functions:
    .. versionadded:: 3.8
 
 
-.. function:: find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM)
-
-   Returns an unused port that should be suitable for binding.  This is
-   achieved by creating a temporary socket with the same family and type as
-   the ``sock`` parameter (default is :const:`~socket.AF_INET`,
-   :const:`~socket.SOCK_STREAM`),
-   and binding it to the specified host address (defaults to ``0.0.0.0``)
-   with the port set to 0, eliciting an unused ephemeral port from the OS.
-   The temporary socket is then closed and deleted, and the ephemeral port is
-   returned.
-
-   Either this method or :func:`bind_port` should be used for any tests
-   where a server socket needs to be bound to a particular port for the
-   duration of the test.
-   Which one to use depends on whether the calling code is creating a Python
-   socket, or if an unused port needs to be provided in a constructor
-   or passed to an external program (i.e. the ``-accept`` argument to
-   openssl's s_server mode).  Always prefer :func:`bind_port` over
-   :func:`find_unused_port` where possible.  Using a hard coded port is
-   discouraged since it can make multiple instances of the test impossible to
-   run simultaneously, which is a problem for buildbots.
-
-
 .. function:: load_package_tests(pkg_dir, loader, standard_tests, pattern)
 
    Generic implementation of the :mod:`unittest` ``load_tests`` protocol for
@@ -1481,6 +1422,77 @@ The :mod:`test.support` module defines the following classes:
    it will be raised in :meth:`!__fspath__`.
 
 
+:mod:`test.support.socket_helper` --- Utilities for socket tests
+================================================================
+
+.. module:: test.support.socket_helper
+   :synopsis: Support for socket tests.
+
+
+The :mod:`test.support.socket_helper` module provides support for socket tests.
+
+.. versionadded:: 3.9
+
+
+.. data:: IPV6_ENABLED
+
+    Set to ``True`` if IPv6 is enabled on this host, ``False`` otherwise.
+
+
+.. function:: find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM)
+
+   Returns an unused port that should be suitable for binding.  This is
+   achieved by creating a temporary socket with the same family and type as
+   the ``sock`` parameter (default is :const:`~socket.AF_INET`,
+   :const:`~socket.SOCK_STREAM`),
+   and binding it to the specified host address (defaults to ``0.0.0.0``)
+   with the port set to 0, eliciting an unused ephemeral port from the OS.
+   The temporary socket is then closed and deleted, and the ephemeral port is
+   returned.
+
+   Either this method or :func:`bind_port` should be used for any tests
+   where a server socket needs to be bound to a particular port for the
+   duration of the test.
+   Which one to use depends on whether the calling code is creating a Python
+   socket, or if an unused port needs to be provided in a constructor
+   or passed to an external program (i.e. the ``-accept`` argument to
+   openssl's s_server mode).  Always prefer :func:`bind_port` over
+   :func:`find_unused_port` where possible.  Using a hard coded port is
+   discouraged since it can make multiple instances of the test impossible to
+   run simultaneously, which is a problem for buildbots.
+
+
+.. function:: bind_port(sock, host=HOST)
+
+   Bind the socket to a free port and return the port number.  Relies on
+   ephemeral ports in order to ensure we are using an unbound port.  This is
+   important as many tests may be running simultaneously, especially in a
+   buildbot environment.  This method raises an exception if the
+   ``sock.family`` is :const:`~socket.AF_INET` and ``sock.type`` is
+   :const:`~socket.SOCK_STREAM`, and the socket has
+   :const:`~socket.SO_REUSEADDR` or :const:`~socket.SO_REUSEPORT` set on it.
+   Tests should never set these socket options for TCP/IP sockets.
+   The only case for setting these options is testing multicasting via
+   multiple UDP sockets.
+
+   Additionally, if the :const:`~socket.SO_EXCLUSIVEADDRUSE` socket option is
+   available (i.e. on Windows), it will be set on the socket.  This will
+   prevent anyone else from binding to our host/port for the duration of the
+   test.
+
+
+.. function:: bind_unix_socket(sock, addr)
+
+   Bind a unix socket, raising :exc:`unittest.SkipTest` if
+   :exc:`PermissionError` is raised.
+
+
+.. decorator:: skip_unless_bind_unix_socket
+
+   A decorator for running tests that require a functional ``bind()`` for Unix
+   sockets.
+
+
 :mod:`test.support.script_helper` --- Utilities for the Python execution tests
 ==============================================================================
 
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 376f5e33466a4..083ad536a051c 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -26,6 +26,7 @@
 import test.support
 import test.support.script_helper
 from test import support
+from test.support import socket_helper
 
 
 # Skip tests if _multiprocessing wasn't built.
@@ -2928,7 +2929,7 @@ def test_remote(self):
         authkey = os.urandom(32)
 
         manager = QueueManager(
-            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
+            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
             )
         manager.start()
         self.addCleanup(manager.shutdown)
@@ -2965,7 +2966,7 @@ def _putter(cls, address, authkey):
     def test_rapid_restart(self):
         authkey = os.urandom(32)
         manager = QueueManager(
-            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
+            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
         try:
             srvr = manager.get_server()
             addr = srvr.address
@@ -3455,7 +3456,7 @@ def _listener(cls, conn, families):
             new_conn.close()
             l.close()
 
-        l = socket.create_server((test.support.HOST, 0))
+        l = socket.create_server((socket_helper.HOST, 0))
         conn.send(l.getsockname())
         new_conn, addr = l.accept()
         conn.send(new_conn)
@@ -4593,7 +4594,7 @@ def _child_test_wait_socket(cls, address, slow):
 
     def test_wait_socket(self, slow=False):
         from multiprocessing.connection import wait
-        l = socket.create_server((test.support.HOST, 0))
+        l = socket.create_server((socket_helper.HOST, 0))
         addr = l.getsockname()
         readers = []
         procs = []
diff --git a/Lib/test/eintrdata/eintr_tester.py b/Lib/test/eintrdata/eintr_tester.py
index 4e0e30596f5a0..606f31b091096 100644
--- a/Lib/test/eintrdata/eintr_tester.py
+++ b/Lib/test/eintrdata/eintr_tester.py
@@ -22,6 +22,7 @@
 import unittest
 
 from test import support
+from test.support import socket_helper
 
 @contextlib.contextmanager
 def kill_on_error(proc):
@@ -283,14 +284,14 @@ def test_sendmsg(self):
         self._test_send(lambda sock, data: sock.sendmsg([data]))
 
     def test_accept(self):
-        sock = socket.create_server((support.HOST, 0))
+        sock = socket.create_server((socket_helper.HOST, 0))
         self.addCleanup(sock.close)
         port = sock.getsockname()[1]
 
         code = '\n'.join((
             'import socket, time',
             '',
-            'host = %r' % support.HOST,
+            'host = %r' % socket_helper.HOST,
             'port = %s' % port,
             'sleep_time = %r' % self.sleep_time,
             '',
diff --git a/Lib/test/ssl_servers.py b/Lib/test/ssl_servers.py
index 2e7e2359f565b..a4bd7455d47e7 100644
--- a/Lib/test/ssl_servers.py
+++ b/Lib/test/ssl_servers.py
@@ -9,10 +9,11 @@
     SimpleHTTPRequestHandler, BaseHTTPRequestHandler)
 
 from test import support
+from test.support import socket_helper
 
 here = os.path.dirname(__file__)
 
-HOST = support.HOST
+HOST = socket_helper.HOST
 CERTFILE = os.path.join(here, 'keycert.pem')
 
 # This one's based on HTTPServer, which is based on socketserver
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 2b3a4147246ee..15cf45da18e2c 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -16,12 +16,10 @@
 import importlib.util
 import locale
 import logging.handlers
-import nntplib
 import os
 import platform
 import re
 import shutil
-import socket
 import stat
 import struct
 import subprocess
@@ -33,16 +31,10 @@
 import time
 import types
 import unittest
-import urllib.error
 import warnings
 
 from .testresult import get_test_runner
 
-try:
-    import multiprocessing.process
-except ImportError:
-    multiprocessing = None
-
 try:
     import zlib
 except ImportError:
@@ -98,14 +90,13 @@
     "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
     "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
     "anticipate_failure", "load_package_tests", "detect_api_mismatch",
-    "check__all__", "skip_unless_bind_unix_socket", "skip_if_buggy_ucrt_strfptime",
+    "check__all__", "skip_if_buggy_ucrt_strfptime",
     "ignore_warnings",
     # sys
     "is_jython", "is_android", "check_impl_detail", "unix_shell",
     "setswitchinterval",
     # network
-    "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
-    "bind_unix_socket",
+    "open_urlresource",
     # processes
     'temp_umask', "reap_children",
     # logging
@@ -727,135 +718,6 @@ def wrapper(*args, **kwargs):
     return decorator
 
 
-HOST = "localhost"
-HOSTv4 = "127.0.0.1"
-HOSTv6 = "::1"
-
-
-def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
-    """Returns an unused port that should be suitable for binding.  This is
-    achieved by creating a temporary socket with the same family and type as
-    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
-    the specified host address (defaults to 0.0.0.0) with the port set to 0,
-    eliciting an unused ephemeral port from the OS.  The temporary socket is
-    then closed and deleted, and the ephemeral port is returned.
-
-    Either this method or bind_port() should be used for any tests where a
-    server socket needs to be bound to a particular port for the duration of
-    the test.  Which one to use depends on whether the calling code is creating
-    a python socket, or if an unused port needs to be provided in a constructor
-    or passed to an external program (i.e. the -accept argument to openssl's
-    s_server mode).  Always prefer bind_port() over find_unused_port() where
-    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
-    socket is bound to a hard coded port, the ability to run multiple instances
-    of the test simultaneously on the same host is compromised, which makes the
-    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
-    may simply manifest as a failed test, which can be recovered from without
-    intervention in most cases, but on Windows, the entire python process can
-    completely and utterly wedge, requiring someone to log in to the buildbot
-    and manually kill the affected process.
-
-    (This is easy to reproduce on Windows, unfortunately, and can be traced to
-    the SO_REUSEADDR socket option having different semantics on Windows versus
-    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
-    listen and then accept connections on identical host/ports.  An EADDRINUSE
-    OSError will be raised at some point (depending on the platform and
-    the order bind and listen were called on each socket).
-
-    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
-    will ever be raised when attempting to bind two identical host/ports. When
-    accept() is called on each socket, the second caller's process will steal
-    the port from the first caller, leaving them both in an awkwardly wedged
-    state where they'll no longer respond to any signals or graceful kills, and
-    must be forcibly killed via OpenProcess()/TerminateProcess().
-
-    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
-    instead of SO_REUSEADDR, which effectively affords the same semantics as
-    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
-    Source world compared to Windows ones, this is a common mistake.  A quick
-    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
-    openssl.exe is called with the 's_server' option, for example. See
-    http://bugs.python.org/issue2550 for more info.  The following site also
-    has a very thorough description about the implications of both REUSEADDR
-    and EXCLUSIVEADDRUSE on Windows:
-    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
-
-    XXX: although this approach is a vast improvement on previous attempts to
-    elicit unused ports, it rests heavily on the assumption that the ephemeral
-    port returned to us by the OS won't immediately be dished back out to some
-    other process when we close and delete our temporary socket but before our
-    calling code has a chance to bind the returned port.  We can deal with this
-    issue if/when we come across it.
-    """
-
-    with socket.socket(family, socktype) as tempsock:
-        port = bind_port(tempsock)
-    del tempsock
-    return port
-
-def bind_port(sock, host=HOST):
-    """Bind the socket to a free port and return the port number.  Relies on
-    ephemeral ports in order to ensure we are using an unbound port.  This is
-    important as many tests may be running simultaneously, especially in a
-    buildbot environment.  This method raises an exception if the sock.family
-    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
-    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
-    for TCP/IP sockets.  The only case for setting these options is testing
-    multicasting via multiple UDP sockets.
-
-    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
-    on Windows), it will be set on the socket.  This will prevent anyone else
-    from bind()'ing to our host/port for the duration of the test.
-    """
-
-    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
-        if hasattr(socket, 'SO_REUSEADDR'):
-            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
-                raise TestFailed("tests should never set the SO_REUSEADDR "   \
-                                 "socket option on TCP/IP sockets!")
-        if hasattr(socket, 'SO_REUSEPORT'):
-            try:
-                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
-                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
-                                     "socket option on TCP/IP sockets!")
-            except OSError:
-                # Python's socket module was compiled using modern headers
-                # thus defining SO_REUSEPORT but this process is running
-                # under an older kernel that does not support SO_REUSEPORT.
-                pass
-        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
-            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
-
-    sock.bind((host, 0))
-    port = sock.getsockname()[1]
-    return port
-
-def bind_unix_socket(sock, addr):
-    """Bind a unix socket, raising SkipTest if PermissionError is raised."""
-    assert sock.family == socket.AF_UNIX
-    try:
-        sock.bind(addr)
-    except PermissionError:
-        sock.close()
-        raise unittest.SkipTest('cannot bind AF_UNIX sockets')
-
-def _is_ipv6_enabled():
-    """Check whether IPv6 is enabled on this host."""
-    if socket.has_ipv6:
-        sock = None
-        try:
-            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
-            sock.bind((HOSTv6, 0))
-            return True
-        except OSError:
-            pass
-        finally:
-            if sock:
-                sock.close()
-    return False
-
-IPV6_ENABLED = _is_ipv6_enabled()
-
 def system_must_validate_cert(f):
     """Skip the test on TLS certificate validation failures."""
     @functools.wraps(f)
@@ -1563,31 +1425,13 @@ def __exit__(self, type_=None, value=None, traceback=None):
 ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
 
 
-def get_socket_conn_refused_errs():
-    """
-    Get the different socket error numbers ('errno') which can be received
-    when a connection is refused.
-    """
-    errors = [errno.ECONNREFUSED]
-    if hasattr(errno, 'ENETUNREACH'):
-        # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
-        errors.append(errno.ENETUNREACH)
-    if hasattr(errno, 'EADDRNOTAVAIL'):
-        # bpo-31910: socket.create_connection() fails randomly
-        # with EADDRNOTAVAIL on Travis CI
-        errors.append(errno.EADDRNOTAVAIL)
-    if hasattr(errno, 'EHOSTUNREACH'):
-        # bpo-37583: The destination host cannot be reached
-        errors.append(errno.EHOSTUNREACH)
-    if not IPV6_ENABLED:
-        errors.append(errno.EAFNOSUPPORT)
-    return errors
-
-
 @contextlib.contextmanager
 def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
     """Return a context manager that raises ResourceDenied when various issues
     with the Internet connection manifest themselves as exceptions."""
+    import socket
+    import nntplib
+    import urllib.error
     if timeout is _NOT_SET:
         timeout = INTERNET_TIMEOUT
 
@@ -2754,28 +2598,6 @@ def skip_if_pgo_task(test):
     msg = "Not run for (non-extended) PGO task"
     return test if ok else unittest.skip(msg)(test)
 
-_bind_nix_socket_error = None
-def skip_unless_bind_unix_socket(test):
-    """Decorator for tests requiring a functional bind() for unix sockets."""
-    if not hasattr(socket, 'AF_UNIX'):
-        return unittest.skip('No UNIX Sockets')(test)
-    global _bind_nix_socket_error
-    if _bind_nix_socket_error is None:
-        path = TESTFN + "can_bind_unix_socket"
-        with socket.socket(socket.AF_UNIX) as sock:
-            try:
-                sock.bind(path)
-                _bind_nix_socket_error = False
-            except OSError as e:
-                _bind_nix_socket_error = e
-            finally:
-                unlink(path)
-    if _bind_nix_socket_error:
-        msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
-        return unittest.skip(msg)(test)
-    else:
-        return test
-
 
 def fs_is_case_insensitive(directory):
     """Detects if the file system for the specified directory is case-insensitive."""
diff --git a/Lib/test/support/socket_helper.py b/Lib/test/support/socket_helper.py
new file mode 100644
index 0000000000000..5f4a7f19a3223
--- /dev/null
+++ b/Lib/test/support/socket_helper.py
@@ -0,0 +1,177 @@
+import errno
+import socket
+import unittest
+
+HOST = "localhost"
+HOSTv4 = "127.0.0.1"
+HOSTv6 = "::1"
+
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+    """Returns an unused port that should be suitable for binding.  This is
+    achieved by creating a temporary socket with the same family and type as
+    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+    the specified host address (defaults to 0.0.0.0) with the port set to 0,
+    eliciting an unused ephemeral port from the OS.  The temporary socket is
+    then closed and deleted, and the ephemeral port is returned.
+
+    Either this method or bind_port() should be used for any tests where a
+    server socket needs to be bound to a particular port for the duration of
+    the test.  Which one to use depends on whether the calling code is creating
+    a python socket, or if an unused port needs to be provided in a constructor
+    or passed to an external program (i.e. the -accept argument to openssl's
+    s_server mode).  Always prefer bind_port() over find_unused_port() where
+    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
+    socket is bound to a hard coded port, the ability to run multiple instances
+    of the test simultaneously on the same host is compromised, which makes the
+    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+    may simply manifest as a failed test, which can be recovered from without
+    intervention in most cases, but on Windows, the entire python process can
+    completely and utterly wedge, requiring someone to log in to the buildbot
+    and manually kill the affected process.
+
+    (This is easy to reproduce on Windows, unfortunately, and can be traced to
+    the SO_REUSEADDR socket option having different semantics on Windows versus
+    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+    listen and then accept connections on identical host/ports.  An EADDRINUSE
+    OSError will be raised at some point (depending on the platform and
+    the order bind and listen were called on each socket).
+
+    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+    will ever be raised when attempting to bind two identical host/ports. When
+    accept() is called on each socket, the second caller's process will steal
+    the port from the first caller, leaving them both in an awkwardly wedged
+    state where they'll no longer respond to any signals or graceful kills, and
+    must be forcibly killed via OpenProcess()/TerminateProcess().
+
+    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+    instead of SO_REUSEADDR, which effectively affords the same semantics as
+    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
+    Source world compared to Windows ones, this is a common mistake.  A quick
+    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+    openssl.exe is called with the 's_server' option, for example. See
+    http://bugs.python.org/issue2550 for more info.  The following site also
+    has a very thorough description about the implications of both REUSEADDR
+    and EXCLUSIVEADDRUSE on Windows:
+    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+    XXX: although this approach is a vast improvement on previous attempts to
+    elicit unused ports, it rests heavily on the assumption that the ephemeral
+    port returned to us by the OS won't immediately be dished back out to some
+    other process when we close and delete our temporary socket but before our
+    calling code has a chance to bind the returned port.  We can deal with this
+    issue if/when we come across it.
+    """
+
+    with socket.socket(family, socktype) as tempsock:
+        port = bind_port(tempsock)
+    del tempsock
+    return port
+
+def bind_port(sock, host=HOST):
+    """Bind the socket to a free port and return the port number.  Relies on
+    ephemeral ports in order to ensure we are using an unbound port.  This is
+    important as many tests may be running simultaneously, especially in a
+    buildbot environment.  This method raises an exception if the sock.family
+    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
+    for TCP/IP sockets.  The only case for setting these options is testing
+    multicasting via multiple UDP sockets.
+
+    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+    on Windows), it will be set on the socket.  This will prevent anyone else
+    from bind()'ing to our host/port for the duration of the test.
+    """
+
+    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+        if hasattr(socket, 'SO_REUSEADDR'):
+            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+                raise TestFailed("tests should never set the SO_REUSEADDR "   \
+                                 "socket option on TCP/IP sockets!")
+        if hasattr(socket, 'SO_REUSEPORT'):
+            try:
+                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
+                                     "socket option on TCP/IP sockets!")
+            except OSError:
+                # Python's socket module was compiled using modern headers
+                # thus defining SO_REUSEPORT but this process is running
+                # under an older kernel that does not support SO_REUSEPORT.
+                pass
+        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+    sock.bind((host, 0))
+    port = sock.getsockname()[1]
+    return port
+
+def bind_unix_socket(sock, addr):
+    """Bind a unix socket, raising SkipTest if PermissionError is raised."""
+    assert sock.family == socket.AF_UNIX
+    try:
+        sock.bind(addr)
+    except PermissionError:
+        sock.close()
+        raise unittest.SkipTest('cannot bind AF_UNIX sockets')
+
+def _is_ipv6_enabled():
+    """Check whether IPv6 is enabled on this host."""
+    if socket.has_ipv6:
+        sock = None
+        try:
+            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+            sock.bind((HOSTv6, 0))
+            return True
+        except OSError:
+            pass
+        finally:
+            if sock:
+                sock.close()
+    return False
+
+IPV6_ENABLED = _is_ipv6_enabled()
+
+
+_bind_nix_socket_error = None
+def skip_unless_bind_unix_socket(test):
+    """Decorator for tests requiring a functional bind() for unix sockets."""
+    if not hasattr(socket, 'AF_UNIX'):
+        return unittest.skip('No UNIX Sockets')(test)
+    global _bind_nix_socket_error
+    if _bind_nix_socket_error is None:
+        from test.support import TESTFN, unlink
+        path = TESTFN + "can_bind_unix_socket"
+        with socket.socket(socket.AF_UNIX) as sock:
+            try:
+                sock.bind(path)
+                _bind_nix_socket_error = False
+            except OSError as e:
+                _bind_nix_socket_error = e
+            finally:
+                unlink(path)
+    if _bind_nix_socket_error:
+        msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
+        return unittest.skip(msg)(test)
+    else:
+        return test
+
+
+def get_socket_conn_refused_errs():
+    """
+    Get the different socket error numbers ('errno') which can be received
+    when a connection is refused.
+    """
+    errors = [errno.ECONNREFUSED]
+    if hasattr(errno, 'ENETUNREACH'):
+        # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
+        errors.append(errno.ENETUNREACH)
+    if hasattr(errno, 'EADDRNOTAVAIL'):
+        # bpo-31910: socket.create_connection() fails randomly
+        # with EADDRNOTAVAIL on Travis CI
+        errors.append(errno.EADDRNOTAVAIL)
+    if hasattr(errno, 'EHOSTUNREACH'):
+        # bpo-37583: The destination host cannot be reached
+        errors.append(errno.EHOSTUNREACH)
+    if not IPV6_ENABLED:
+        errors.append(errno.EAFNOSUPPORT)
+    return errors
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py
index ce8505777a2ef..004d368d76312 100644
--- a/Lib/test/test_asynchat.py
+++ b/Lib/test/test_asynchat.py
@@ -1,6 +1,7 @@
 # test asynchat
 
 from test import support
+from test.support import socket_helper
 
 import asynchat
 import asyncore
@@ -12,7 +13,7 @@
 import unittest
 import unittest.mock
 
-HOST = support.HOST
+HOST = socket_helper.HOST
 SERVER_QUIT = b'QUIT\n'
 
 
@@ -25,7 +26,7 @@ def __init__(self, event):
         threading.Thread.__init__(self)
         self.event = event
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.port = support.bind_port(self.sock)
+        self.port = socket_helper.bind_port(self.sock)
         # This will be set if the client wants us to wait before echoing
         # data back.
         self.start_resend_event = None
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index fc832d3467131..4adcbf46cc9eb 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -17,6 +17,7 @@
 from test.test_asyncio import utils as test_utils
 from test import support
 from test.support.script_helper import assert_python_ok
+from test.support import socket_helper
 
 
 MOCK_ANY = mock.ANY
@@ -91,7 +92,7 @@ def test_ipaddr_info(self):
         self.assertIsNone(
             base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
 
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             # IPv4 address with family IPv6.
             self.assertIsNone(
                 base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
@@ -1156,7 +1157,7 @@ def test_create_server_stream_bittype(self):
             srv.close()
             self.loop.run_until_complete(srv.wait_closed())
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
     def test_create_server_ipv6(self):
         async def main():
             with self.assertWarns(DeprecationWarning):
@@ -1288,7 +1289,7 @@ def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
             t.close()
             test_utils.run_briefly(self.loop)  # allow transport to close
 
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             sock.family = socket.AF_INET6
             coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
             t, p = self.loop.run_until_complete(coro)
@@ -1307,7 +1308,7 @@ def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
                 t.close()
                 test_utils.run_briefly(self.loop)  # allow transport to close
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
     @unittest.skipIf(sys.platform.startswith('aix'),
                     "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
     @patch_socket
@@ -1639,7 +1640,7 @@ def test_create_datagram_endpoint_socket_err(self, m_socket):
         self.assertRaises(
             OSError, self.loop.run_until_complete, coro)
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
     def test_create_datagram_endpoint_no_matching_family(self):
         coro = self.loop.create_datagram_endpoint(
             asyncio.DatagramProtocol,
@@ -1700,7 +1701,7 @@ def test_create_datagram_endpoint_sock_unix(self):
         self.loop.run_until_complete(protocol.done)
         self.assertEqual('CLOSED', protocol.state)
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_create_datagram_endpoint_existing_sock_unix(self):
         with test_utils.unix_socket_path() as path:
             sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
@@ -2015,7 +2016,7 @@ def prepare(self):
         sock = self.make_socket()
         proto = self.MyProto(self.loop)
         server = self.run_loop(self.loop.create_server(
-            lambda: proto, support.HOST, 0, family=socket.AF_INET))
+            lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
         addr = server.sockets[0].getsockname()
 
         for _ in range(10):
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 1f71c1f0979e0..aa4daca0e4e03 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -32,6 +32,7 @@
 from asyncio import selector_events
 from test.test_asyncio import utils as test_utils
 from test import support
+from test.support import socket_helper
 from test.support import ALWAYS_EQ, LARGEST, SMALLEST
 
 
@@ -517,7 +518,7 @@ def test_create_connection(self):
                 lambda: MyProto(loop=self.loop), *httpd.address)
             self._basetest_create_connection(conn_fut)
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_create_unix_connection(self):
         # Issue #20682: On Mac OS X Tiger, getsockname() returns a
         # zero-length address for UNIX socket.
@@ -619,7 +620,7 @@ def test_create_ssl_connection(self):
             self._test_create_ssl_connection(httpd, create_connection,
                                              peername=httpd.address)
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     @unittest.skipIf(ssl is None, 'No ssl module')
     def test_create_ssl_unix_connection(self):
         # Issue #20682: On Mac OS X Tiger, getsockname() returns a
@@ -638,7 +639,7 @@ def test_create_ssl_unix_connection(self):
 
     def test_create_connection_local_addr(self):
         with test_utils.run_test_server() as httpd:
-            port = support.find_unused_port()
+            port = socket_helper.find_unused_port()
             f = self.loop.create_connection(
                 lambda: MyProto(loop=self.loop),
                 *httpd.address, local_addr=(httpd.address[0], port))
@@ -843,7 +844,7 @@ def _make_unix_server(self, factory, **kwargs):
 
         return server, path
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_create_unix_server(self):
         proto = MyProto(loop=self.loop)
         server, path = self._make_unix_server(lambda: proto)
@@ -935,7 +936,7 @@ def test_create_server_ssl(self):
         # stop serving
         server.close()
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     @unittest.skipIf(ssl is None, 'No ssl module')
     def test_create_unix_server_ssl(self):
         proto = MyProto(loop=self.loop)
@@ -995,7 +996,7 @@ def test_create_server_ssl_verify_failed(self):
         self.assertIsNone(proto.transport)
         server.close()
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     @unittest.skipIf(ssl is None, 'No ssl module')
     def test_create_unix_server_ssl_verify_failed(self):
         proto = MyProto(loop=self.loop)
@@ -1055,7 +1056,7 @@ def test_create_server_ssl_match_failed(self):
         self.assertIsNone(proto.transport)
         server.close()
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     @unittest.skipIf(ssl is None, 'No ssl module')
     def test_create_unix_server_ssl_verified(self):
         proto = MyProto(loop=self.loop)
@@ -1148,7 +1149,7 @@ def test_create_server_addr_in_use(self):
 
         server.close()
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
     def test_create_server_dual_stack(self):
         f_proto = self.loop.create_future()
 
@@ -1160,7 +1161,7 @@ def connection_made(self, transport):
         try_count = 0
         while True:
             try:
-                port = support.find_unused_port()
+                port = socket_helper.find_unused_port()
                 f = self.loop.create_server(TestMyProto, host=None, port=port)
                 server = self.loop.run_until_complete(f)
             except OSError as ex:
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
index 007039a7cdf5d..b5d1df93efd65 100644
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -13,6 +13,7 @@
 from asyncio.proactor_events import _ProactorDuplexPipeTransport
 from asyncio.proactor_events import _ProactorDatagramTransport
 from test import support
+from test.support import socket_helper
 from test.test_asyncio import utils as test_utils
 
 
@@ -950,7 +951,7 @@ def run_loop(self, coro):
     def prepare(self):
         sock = self.make_socket()
         proto = self.MyProto(self.loop)
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         srv_sock = self.make_socket(cleanup=False)
         srv_sock.bind(('127.0.0.1', port))
         server = self.run_loop(self.loop.create_server(
diff --git a/Lib/test/test_asyncio/test_sendfile.py b/Lib/test/test_asyncio/test_sendfile.py
index 3b7f784c5ee3a..dbce199a9b8e1 100644
--- a/Lib/test/test_asyncio/test_sendfile.py
+++ b/Lib/test/test_asyncio/test_sendfile.py
@@ -10,6 +10,7 @@
 from asyncio import constants
 from unittest import mock
 from test import support
+from test.support import socket_helper
 from test.test_asyncio import utils as test_utils
 
 try:
@@ -163,9 +164,9 @@ def reduce_send_buffer_size(self, sock, transport=None):
 
     def prepare_socksendfile(self):
         proto = MyProto(self.loop)
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         srv_sock = self.make_socket(cleanup=False)
-        srv_sock.bind((support.HOST, port))
+        srv_sock.bind((socket_helper.HOST, port))
         server = self.run_loop(self.loop.create_server(
             lambda: proto, sock=srv_sock))
         self.reduce_receive_buffer_size(srv_sock)
@@ -240,7 +241,7 @@ class SendfileMixin(SendfileBase):
     # Note: sendfile via SSL transport is equal to sendfile fallback
 
     def prepare_sendfile(self, *, is_ssl=False, close_after=0):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         srv_proto = MySendfileProto(loop=self.loop,
                                     close_after=close_after)
         if is_ssl:
@@ -252,17 +253,17 @@ def prepare_sendfile(self, *, is_ssl=False, close_after=0):
             srv_ctx = None
             cli_ctx = None
         srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        srv_sock.bind((support.HOST, port))
+        srv_sock.bind((socket_helper.HOST, port))
         server = self.run_loop(self.loop.create_server(
             lambda: srv_proto, sock=srv_sock, ssl=srv_ctx))
         self.reduce_receive_buffer_size(srv_sock)
 
         if is_ssl:
-            server_hostname = support.HOST
+            server_hostname = socket_helper.HOST
         else:
             server_hostname = None
         cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        cli_sock.connect((support.HOST, port))
+        cli_sock.connect((socket_helper.HOST, port))
 
         cli_proto = MySendfileProto(loop=self.loop)
         tr, pr = self.run_loop(self.loop.create_connection(
diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py
index d47ccc027677b..006ead29562cf 100644
--- a/Lib/test/test_asyncio/test_server.py
+++ b/Lib/test/test_asyncio/test_server.py
@@ -4,6 +4,7 @@
 import unittest
 
 from test import support
+from test.support import socket_helper
 from test.test_asyncio import utils as test_utils
 from test.test_asyncio import functional as func_tests
 
@@ -47,7 +48,7 @@ async def main(srv):
 
         with self.assertWarns(DeprecationWarning):
             srv = self.loop.run_until_complete(asyncio.start_server(
-                serve, support.HOSTv4, 0, loop=self.loop, start_serving=False))
+                serve, socket_helper.HOSTv4, 0, loop=self.loop, start_serving=False))
 
         self.assertFalse(srv.is_serving())
 
@@ -73,7 +74,7 @@ class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
     def new_loop(self):
         return asyncio.SelectorEventLoop()
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_start_unix_server_1(self):
         HELLO_MSG = b'1' * 1024 * 5 + b'\n'
         started = threading.Event()
diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py
index 89c2af9f5ec08..2f2d5a454973b 100644
--- a/Lib/test/test_asyncio/test_sock_lowlevel.py
+++ b/Lib/test/test_asyncio/test_sock_lowlevel.py
@@ -5,6 +5,7 @@
 from itertools import cycle, islice
 from test.test_asyncio import utils as test_utils
 from test import support
+from test.support import socket_helper
 
 
 class MyProto(asyncio.Protocol):
@@ -225,7 +226,7 @@ def test_huge_content_recvinto(self):
             self.loop.run_until_complete(
                 self._basetest_huge_content_recvinto(httpd.address))
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_unix_sock_client_ops(self):
         with test_utils.run_test_unix_server() as httpd:
             sock = socket.socket(socket.AF_UNIX)
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 12bd536911dbb..1e9d115661d08 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -9,7 +9,7 @@
 import threading
 import unittest
 from unittest import mock
-from test import support
+from test.support import socket_helper
 try:
     import ssl
 except ImportError:
@@ -66,7 +66,7 @@ def test_open_connection(self):
                                                loop=self.loop)
             self._basetest_open_connection(conn_fut)
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_open_unix_connection(self):
         with test_utils.run_test_unix_server() as httpd:
             conn_fut = asyncio.open_unix_connection(httpd.address,
@@ -99,7 +99,7 @@ def test_open_connection_no_loop_ssl(self):
 
             self._basetest_open_connection_no_loop_ssl(conn_fut)
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     @unittest.skipIf(ssl is None, 'No ssl module')
     def test_open_unix_connection_no_loop_ssl(self):
         with test_utils.run_test_unix_server(use_ssl=True) as httpd:
@@ -130,7 +130,7 @@ def test_open_connection_error(self):
                                                loop=self.loop)
             self._basetest_open_connection_error(conn_fut)
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_open_unix_connection_error(self):
         with test_utils.run_test_unix_server() as httpd:
             conn_fut = asyncio.open_unix_connection(httpd.address,
@@ -653,7 +653,7 @@ async def client(addr):
 
         self.assertEqual(messages, [])
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_start_unix_server(self):
 
         class MyServer:
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 5487b7afef832..10bd46dea1991 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -15,6 +15,7 @@
 import unittest
 from unittest import mock
 from test import support
+from test.support import socket_helper
 
 if sys.platform == 'win32':
     raise unittest.SkipTest('UNIX only')
@@ -273,7 +274,7 @@ def setUp(self):
         self.loop = asyncio.SelectorEventLoop()
         self.set_event_loop(self.loop)
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_create_unix_server_existing_path_sock(self):
         with test_utils.unix_socket_path() as path:
             sock = socket.socket(socket.AF_UNIX)
@@ -286,7 +287,7 @@ def test_create_unix_server_existing_path_sock(self):
             srv.close()
             self.loop.run_until_complete(srv.wait_closed())
 
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_create_unix_server_pathlib(self):
         with test_utils.unix_socket_path() as path:
             path = pathlib.Path(path)
@@ -344,7 +345,7 @@ def test_create_unix_server_path_dgram(self):
 
     @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                          'no socket.SOCK_NONBLOCK (linux only)')
-    @support.skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_create_unix_server_path_stream_bittype(self):
         sock = socket.socket(
             socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
@@ -497,12 +498,12 @@ def run_loop(self, coro):
     def prepare(self):
         sock = self.make_socket()
         proto = self.MyProto(self.loop)
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         srv_sock = self.make_socket(cleanup=False)
-        srv_sock.bind((support.HOST, port))
+        srv_sock.bind((socket_helper.HOST, port))
         server = self.run_loop(self.loop.create_server(
             lambda: proto, sock=srv_sock))
-        self.run_loop(self.loop.sock_connect(sock, (support.HOST, port)))
+        self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port)))
         self.run_loop(proto._ready)
 
         def cleanup():
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 6c84ac47a65dc..3c3abe4191788 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -10,6 +10,7 @@
 import threading
 
 from test import support
+from test.support import socket_helper
 from io import BytesIO
 
 if support.PGO:
@@ -91,7 +92,7 @@ def bind_af_aware(sock, addr):
     if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
         # Make sure the path doesn't exist.
         support.unlink(addr)
-        support.bind_unix_socket(sock, addr)
+        socket_helper.bind_unix_socket(sock, addr)
     else:
         sock.bind(addr)
 
@@ -327,7 +328,7 @@ def test_send(self):
         evt = threading.Event()
         sock = socket.socket()
         sock.settimeout(3)
-        port = support.bind_port(sock)
+        port = socket_helper.bind_port(sock)
 
         cap = BytesIO()
         args = (evt, cap, sock)
@@ -341,7 +342,7 @@ def test_send(self):
             data = b"Suppose there isn't a 16-ton weight?"
             d = dispatcherwithsend_noread()
             d.create_socket()
-            d.connect((support.HOST, port))
+            d.connect((socket_helper.HOST, port))
 
             # give time for socket to connect
             time.sleep(0.1)
@@ -791,12 +792,12 @@ def test_quick_connect(self):
 
 class TestAPI_UseIPv4Sockets(BaseTestAPI):
     family = socket.AF_INET
-    addr = (support.HOST, 0)
+    addr = (socket_helper.HOST, 0)
 
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
 class TestAPI_UseIPv6Sockets(BaseTestAPI):
     family = socket.AF_INET6
-    addr = (support.HOSTv6, 0)
+    addr = (socket_helper.HOSTv6, 0)
 
 @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
 class TestAPI_UseUnixSockets(BaseTestAPI):
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index cf30a3df35f12..e424076d7d317 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -19,7 +19,8 @@
 
 from unittest import TestCase, skipUnless
 from test import support
-from test.support import HOST, HOSTv6
+from test.support import socket_helper
+from test.support.socket_helper import HOST, HOSTv6
 
 TIMEOUT = support.LOOPBACK_TIMEOUT
 DEFAULT_ENCODING = 'utf-8'
@@ -751,7 +752,7 @@ def is_client_connected():
 
     def test_source_address(self):
         self.client.quit()
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         try:
             self.client.connect(self.server.host, self.server.port,
                                 source_address=(HOST, port))
@@ -763,7 +764,7 @@ def test_source_address(self):
             raise
 
     def test_source_address_passive_connection(self):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         self.client.source_address = (HOST, port)
         try:
             with self.client.transfercmd('list') as sock:
@@ -816,7 +817,7 @@ def test_encoding_param(self):
         self.assertEqual(DEFAULT_ENCODING, client.encoding)
 
 
- at skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
+ at skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
 class TestIPv6Environment(TestCase):
 
     def setUp(self):
@@ -1007,7 +1008,7 @@ def setUp(self):
         self.evt = threading.Event()
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.settimeout(20)
-        self.port = support.bind_port(self.sock)
+        self.port = socket_helper.bind_port(self.sock)
         self.server_thread = threading.Thread(target=self.server)
         self.server_thread.daemon = True
         self.server_thread.start()
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index 77d43359f3026..6b7a9dedf1a2a 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -13,6 +13,7 @@
 TestCase = unittest.TestCase
 
 from test import support
+from test.support import socket_helper
 
 here = os.path.dirname(__file__)
 # Self-signed cert file for 'localhost'
@@ -42,7 +43,7 @@
 trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
 chunked_end = "\r\n"
 
-HOST = support.HOST
+HOST = socket_helper.HOST
 
 class FakeSocket:
     def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
@@ -1463,8 +1464,8 @@ def test_client_constants(self):
 class SourceAddressTest(TestCase):
     def setUp(self):
         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.port = support.bind_port(self.serv)
-        self.source_port = support.find_unused_port()
+        self.port = socket_helper.bind_port(self.serv)
+        self.source_port = socket_helper.find_unused_port()
         self.serv.listen()
         self.conn = None
 
@@ -1496,7 +1497,7 @@ class TimeoutTest(TestCase):
 
     def setUp(self):
         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        TimeoutTest.PORT = support.bind_port(self.serv)
+        TimeoutTest.PORT = socket_helper.bind_port(self.serv)
         self.serv.listen()
 
     def tearDown(self):
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index 91aa77126a28c..764566695170a 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -1,4 +1,5 @@
 from test import support
+from test.support import socket_helper
 
 from contextlib import contextmanager
 import imaplib
@@ -82,7 +83,7 @@ def test_imap4_host_default_value(self):
                 pass
 
         # This is the exception that should be raised.
-        expected_errnos = support.get_socket_conn_refused_errs()
+        expected_errnos = socket_helper.get_socket_conn_refused_errs()
         with self.assertRaises(OSError) as cm:
             imaplib.IMAP4()
         self.assertIn(cm.exception.errno, expected_errnos)
@@ -210,7 +211,7 @@ def handle_error(self, request, client_address):
                 raise
 
         self.addCleanup(self._cleanup)
-        self.server = self.server_class((support.HOST, 0), imap_handler)
+        self.server = self.server_class((socket_helper.HOST, 0), imap_handler)
         self.thread = threading.Thread(
             name=self._testMethodName+'-server',
             target=self.server.serve_forever,
@@ -600,7 +601,7 @@ def reap_server(self, server, thread):
 
     @contextmanager
     def reaped_server(self, hdlr):
-        server, thread = self.make_server((support.HOST, 0), hdlr)
+        server, thread = self.make_server((socket_helper.HOST, 0), hdlr)
         try:
             yield server
         finally:
diff --git a/Lib/test/test_largefile.py b/Lib/test/test_largefile.py
index c254b047e1ec5..a99b4ba89acbf 100644
--- a/Lib/test/test_largefile.py
+++ b/Lib/test/test_largefile.py
@@ -8,8 +8,9 @@
 import socket
 import shutil
 import threading
-from test.support import TESTFN, requires, unlink, bigmemtest, find_unused_port
+from test.support import TESTFN, requires, unlink, bigmemtest
 from test.support import SHORT_TIMEOUT
+from test.support import socket_helper
 import io  # C implementation of io
 import _pyio as pyio # Python implementation of io
 
@@ -219,7 +220,7 @@ def run(sock):
     # bit more tolerance.
     @skip_no_disk_space(TESTFN, size * 2.5)
     def test_it(self):
-        port = find_unused_port()
+        port = socket_helper.find_unused_port()
         with socket.create_server(("", port)) as sock:
             self.tcp_server(sock)
             with socket.create_connection(("127.0.0.1", port)) as client:
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 99e74ebff6087..241ed2c91de13 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -43,6 +43,7 @@
 import tempfile
 from test.support.script_helper import assert_python_ok, assert_python_failure
 from test import support
+from test.support import socket_helper
 import textwrap
 import threading
 import time
@@ -1053,10 +1054,10 @@ class SMTPHandlerTest(BaseTest):
 
     def test_basic(self):
         sockmap = {}
-        server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
+        server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001,
                                 sockmap)
         server.start()
-        addr = (support.HOST, server.port)
+        addr = (socket_helper.HOST, server.port)
         h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
                                          timeout=self.TIMEOUT)
         self.assertEqual(h.toaddrs, ['you'])
@@ -1921,7 +1922,7 @@ def tearDown(self):
         SysLogHandlerTest.tearDown(self)
         support.unlink(self.address)
 
- at unittest.skipUnless(support.IPV6_ENABLED,
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED,
                      'IPv6 support required for this test.')
 class IPv6SysLogHandlerTest(SysLogHandlerTest):
 
diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py
index fdd76f9e9b355..2a5a0b97eea63 100644
--- a/Lib/test/test_nntplib.py
+++ b/Lib/test/test_nntplib.py
@@ -10,6 +10,7 @@
 import threading
 
 from test import support
+from test.support import socket_helper
 from nntplib import NNTP, GroupInfo
 import nntplib
 from unittest.mock import patch
@@ -1555,14 +1556,14 @@ def nntp_class(*pos, **kw):
 class LocalServerTests(unittest.TestCase):
     def setUp(self):
         sock = socket.socket()
-        port = support.bind_port(sock)
+        port = socket_helper.bind_port(sock)
         sock.listen()
         self.background = threading.Thread(
             target=self.run_server, args=(sock,))
         self.background.start()
         self.addCleanup(self.background.join)
 
-        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
+        self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__()
         self.addCleanup(self.nntp.__exit__, None, None, None)
 
     def run_server(self, sock):
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 74aef47243428..362ba9e1042cb 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -30,6 +30,7 @@
 import uuid
 import warnings
 from test import support
+from test.support import socket_helper
 from platform import win32_is_iot
 
 try:
@@ -3171,7 +3172,7 @@ def tearDownClass(cls):
         support.unlink(support.TESTFN)
 
     def setUp(self):
-        self.server = SendfileTestServer((support.HOST, 0))
+        self.server = SendfileTestServer((socket_helper.HOST, 0))
         self.server.start()
         self.client = socket.socket()
         self.client.connect((self.server.host, self.server.port))
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 7f06d1950e1e1..d4877b1fbbc6b 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -13,8 +13,9 @@
 
 from unittest import TestCase, skipUnless
 from test import support as test_support
+from test.support import socket_helper
 
-HOST = test_support.HOST
+HOST = socket_helper.HOST
 PORT = 0
 
 SUPPORTS_SSL = False
@@ -480,7 +481,7 @@ def setUp(self):
         self.evt = threading.Event()
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.settimeout(60)  # Safety net. Look issue 11812
-        self.port = test_support.bind_port(self.sock)
+        self.port = socket_helper.bind_port(self.sock)
         self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock))
         self.thread.daemon = True
         self.thread.start()
diff --git a/Lib/test/test_robotparser.py b/Lib/test/test_robotparser.py
index f28d8be5f3c4e..9d4764ece2fd2 100644
--- a/Lib/test/test_robotparser.py
+++ b/Lib/test/test_robotparser.py
@@ -4,6 +4,7 @@
 import unittest
 import urllib.robotparser
 from test import support
+from test.support import socket_helper
 from http.server import BaseHTTPRequestHandler, HTTPServer
 
 
@@ -312,7 +313,7 @@ def setUp(self):
         # clear _opener global variable
         self.addCleanup(urllib.request.urlcleanup)
 
-        self.server = HTTPServer((support.HOST, 0), RobotHandler)
+        self.server = HTTPServer((socket_helper.HOST, 0), RobotHandler)
 
         self.t = threading.Thread(
             name='HTTPServer serving',
@@ -332,7 +333,7 @@ def tearDown(self):
     @support.reap_threads
     def testPasswordProtectedSite(self):
         addr = self.server.server_address
-        url = 'http://' + support.HOST + ':' + str(addr[1])
+        url = 'http://' + socket_helper.HOST + ':' + str(addr[1])
         robots_url = url + "/robots.txt"
         parser = urllib.robotparser.RobotFileParser()
         parser.set_url(url)
diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py
index c449155c4b49f..2274c39a79a53 100644
--- a/Lib/test/test_selectors.py
+++ b/Lib/test/test_selectors.py
@@ -6,6 +6,7 @@
 import socket
 import sys
 from test import support
+from test.support import socket_helper
 from time import sleep
 import unittest
 import unittest.mock
@@ -22,7 +23,7 @@
 else:
     def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
         with socket.socket(family, type, proto) as l:
-            l.bind((support.HOST, 0))
+            l.bind((socket_helper.HOST, 0))
             l.listen()
             c = socket.socket(family, type, proto)
             try:
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py
index a9f7d5a3b8bc1..3be7739743940 100644
--- a/Lib/test/test_smtpd.py
+++ b/Lib/test/test_smtpd.py
@@ -1,6 +1,7 @@
 import unittest
 import textwrap
 from test import support, mock_socket
+from test.support import socket_helper
 import socket
 import io
 import smtpd
@@ -38,7 +39,7 @@ def setUp(self):
         smtpd.socket = asyncore.socket = mock_socket
 
     def test_process_message_unimplemented(self):
-        server = smtpd.SMTPServer((support.HOST, 0), ('b', 0),
+        server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),
                                   decode_data=True)
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
@@ -57,7 +58,7 @@ def test_decode_data_and_enable_SMTPUTF8_raises(self):
         self.assertRaises(
             ValueError,
             smtpd.SMTPServer,
-            (support.HOST, 0),
+            (socket_helper.HOST, 0),
             ('b', 0),
             enable_SMTPUTF8=True,
             decode_data=True)
@@ -87,7 +88,7 @@ def write_line(line):
         write_line(b'.')
 
     def test_process_message_with_decode_data_true(self):
-        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                        decode_data=True)
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
@@ -104,7 +105,7 @@ def test_process_message_with_decode_data_true(self):
              """))
 
     def test_process_message_with_decode_data_false(self):
-        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0))
+        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0))
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr)
         with support.captured_stdout() as s:
@@ -120,7 +121,7 @@ def test_process_message_with_decode_data_false(self):
              """))
 
     def test_process_message_with_enable_SMTPUTF8_true(self):
-        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                        enable_SMTPUTF8=True)
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
@@ -137,7 +138,7 @@ def test_process_message_with_enable_SMTPUTF8_true(self):
              """))
 
     def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
-        server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                        enable_SMTPUTF8=True)
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
@@ -168,13 +169,13 @@ def tearDown(self):
         asyncore.close_all()
         asyncore.socket = smtpd.socket = socket
 
-    @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
     def test_socket_uses_IPv6(self):
-        server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOSTv4, 0))
+        server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0))
         self.assertEqual(server.socket.family, socket.AF_INET6)
 
     def test_socket_uses_IPv4(self):
-        server = smtpd.SMTPServer((support.HOSTv4, 0), (support.HOSTv6, 0))
+        server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0))
         self.assertEqual(server.socket.family, socket.AF_INET)
 
 
@@ -197,7 +198,7 @@ def write_line(self, channel, line):
         channel.handle_read()
 
     def test_params_rejected(self):
-        server = DummyServer((support.HOST, 0), ('b', 0))
+        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr)
         self.write_line(channel, b'EHLO example')
@@ -206,7 +207,7 @@ def test_params_rejected(self):
         self.assertEqual(channel.socket.last, self.error_response)
 
     def test_nothing_accepted(self):
-        server = DummyServer((support.HOST, 0), ('b', 0))
+        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr)
         self.write_line(channel, b'EHLO example')
@@ -234,7 +235,7 @@ def write_line(self, channel, line):
         channel.handle_read()
 
     def test_with_decode_data_true(self):
-        server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True)
+        server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True)
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
         self.write_line(channel, b'EHLO example')
@@ -250,7 +251,7 @@ def test_with_decode_data_true(self):
         self.assertEqual(channel.socket.last, b'250 OK\r\n')
 
     def test_with_decode_data_false(self):
-        server = DummyServer((support.HOST, 0), ('b', 0))
+        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr)
         self.write_line(channel, b'EHLO example')
@@ -271,7 +272,7 @@ def test_with_decode_data_false(self):
         self.assertEqual(channel.socket.last, b'250 OK\r\n')
 
     def test_with_enable_smtputf8_true(self):
-        server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
+        server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
         conn, addr = server.accept()
         channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
         self.write_line(channel, b'EHLO example')
@@ -286,7 +287,7 @@ def setUp(self):
         smtpd.socket = asyncore.socket = mock_socket
         self.old_debugstream = smtpd.DEBUGSTREAM
         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
-        self.server = DummyServer((support.HOST, 0), ('b', 0),
+        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                   decode_data=True)
         conn, addr = self.server.accept()
         self.channel = smtpd.SMTPChannel(self.server, conn, addr,
@@ -304,7 +305,7 @@ def write_line(self, line):
     def test_broken_connect(self):
         self.assertRaises(
             DummyDispatcherBroken, BrokenDummyServer,
-            (support.HOST, 0), ('b', 0), decode_data=True)
+            (socket_helper.HOST, 0), ('b', 0), decode_data=True)
 
     def test_decode_data_and_enable_SMTPUTF8_raises(self):
         self.assertRaises(
@@ -758,13 +759,13 @@ def test_attribute_deprecations(self):
         with support.check_warnings(('', DeprecationWarning)):
             self.channel._SMTPChannel__addr = 'spam'
 
- at unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
 class SMTPDChannelIPv6Test(SMTPDChannelTest):
     def setUp(self):
         smtpd.socket = asyncore.socket = mock_socket
         self.old_debugstream = smtpd.DEBUGSTREAM
         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
-        self.server = DummyServer((support.HOSTv6, 0), ('b', 0),
+        self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0),
                                   decode_data=True)
         conn, addr = self.server.accept()
         self.channel = smtpd.SMTPChannel(self.server, conn, addr,
@@ -776,7 +777,7 @@ def setUp(self):
         smtpd.socket = asyncore.socket = mock_socket
         self.old_debugstream = smtpd.DEBUGSTREAM
         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
-        self.server = DummyServer((support.HOST, 0), ('b', 0),
+        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                   decode_data=True)
         conn, addr = self.server.accept()
         # Set DATA size limit to 32 bytes for easy testing
@@ -831,7 +832,7 @@ def setUp(self):
         smtpd.socket = asyncore.socket = mock_socket
         self.old_debugstream = smtpd.DEBUGSTREAM
         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
-        self.server = DummyServer((support.HOST, 0), ('b', 0))
+        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0))
         conn, addr = self.server.accept()
         self.channel = smtpd.SMTPChannel(self.server, conn, addr)
 
@@ -873,7 +874,7 @@ def setUp(self):
         smtpd.socket = asyncore.socket = mock_socket
         self.old_debugstream = smtpd.DEBUGSTREAM
         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
-        self.server = DummyServer((support.HOST, 0), ('b', 0),
+        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                   decode_data=True)
         conn, addr = self.server.accept()
         # Set decode_data to True
@@ -916,7 +917,7 @@ def setUp(self):
         smtpd.socket = asyncore.socket = mock_socket
         self.old_debugstream = smtpd.DEBUGSTREAM
         self.debug = smtpd.DEBUGSTREAM = io.StringIO()
-        self.server = DummyServer((support.HOST, 0), ('b', 0),
+        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                   enable_SMTPUTF8=True)
         conn, addr = self.server.accept()
         self.channel = smtpd.SMTPChannel(self.server, conn, addr,
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 067c01c10c1b3..d1ffb368a4f6f 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -20,11 +20,12 @@
 
 import unittest
 from test import support, mock_socket
-from test.support import HOST
+from test.support import socket_helper
 from test.support import threading_setup, threading_cleanup, join_thread
 from test.support import requires_hashdigest
 from unittest.mock import Mock
 
+HOST = socket_helper.HOST
 
 if sys.platform == 'darwin':
     # select.poll returns a select.POLLHUP at the end of the tests
@@ -271,7 +272,7 @@ def testBasic(self):
 
     def testSourceAddress(self):
         # connect
-        src_port = support.find_unused_port()
+        src_port = socket_helper.find_unused_port()
         try:
             smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
                                 timeout=support.LOOPBACK_TIMEOUT,
@@ -711,7 +712,7 @@ def setUp(self):
         self.evt = threading.Event()
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.settimeout(15)
-        self.port = support.bind_port(self.sock)
+        self.port = socket_helper.bind_port(self.sock)
         servargs = (self.evt, self.respdata, self.sock)
         self.thread = threading.Thread(target=server, args=servargs)
         self.thread.start()
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 6e4e4fe4c3532..a70e28219ed23 100755
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -1,5 +1,6 @@
 import unittest
 from test import support
+from test.support import socket_helper
 
 import errno
 import io
@@ -34,7 +35,7 @@
 except ImportError:
     fcntl = None
 
-HOST = support.HOST
+HOST = socket_helper.HOST
 # test unicode string and carriage return
 MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')
 
@@ -161,7 +162,7 @@ class SocketTCPTest(unittest.TestCase):
 
     def setUp(self):
         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.port = support.bind_port(self.serv)
+        self.port = socket_helper.bind_port(self.serv)
         self.serv.listen()
 
     def tearDown(self):
@@ -172,7 +173,7 @@ class SocketUDPTest(unittest.TestCase):
 
     def setUp(self):
         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        self.port = support.bind_port(self.serv)
+        self.port = socket_helper.bind_port(self.serv)
 
     def tearDown(self):
         self.serv.close()
@@ -182,7 +183,7 @@ class SocketUDPLITETest(SocketUDPTest):
 
     def setUp(self):
         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
-        self.port = support.bind_port(self.serv)
+        self.port = socket_helper.bind_port(self.serv)
 
 class ThreadSafeCleanupTestCase(unittest.TestCase):
     """Subclass of unittest.TestCase with thread-safe cleanup methods.
@@ -265,7 +266,7 @@ def setUp(self):
         self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
         self.addCleanup(self.serv.close)
         try:
-            self.port = support.bind_port(self.serv)
+            self.port = socket_helper.bind_port(self.serv)
         except OSError:
             self.skipTest('unable to bind RDS socket')
 
@@ -682,7 +683,7 @@ def setUp(self):
 
     def bindSock(self, sock):
         path = tempfile.mktemp(dir=self.dir_path)
-        support.bind_unix_socket(sock, path)
+        socket_helper.bind_unix_socket(sock, path)
         self.addCleanup(support.unlink, path)
 
 class UnixStreamBase(UnixSocketTestBase):
@@ -702,7 +703,7 @@ def setUp(self):
         self.port = self.serv_addr[1]
 
     def bindSock(self, sock):
-        support.bind_port(sock, host=self.host)
+        socket_helper.bind_port(sock, host=self.host)
 
 class TCPTestBase(InetTestBase):
     """Base class for TCP-over-IPv4 tests."""
@@ -733,7 +734,7 @@ def newSocket(self):
 class Inet6TestBase(InetTestBase):
     """Base class for IPv6 socket tests."""
 
-    host = support.HOSTv6
+    host = socket_helper.HOSTv6
 
 class UDP6TestBase(Inet6TestBase):
     """Base class for UDP-over-IPv6 tests."""
@@ -966,12 +967,12 @@ def testHostnameRes(self):
             self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
 
     def test_host_resolution(self):
-        for addr in [support.HOSTv4, '10.0.0.1', '255.255.255.255']:
+        for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
             self.assertEqual(socket.gethostbyname(addr), addr)
 
-        # we don't test support.HOSTv6 because there's a chance it doesn't have
+        # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
         # a matching name entry (e.g. 'ip6-localhost')
-        for host in [support.HOSTv4]:
+        for host in [socket_helper.HOSTv4]:
             self.assertIn(host, socket.gethostbyaddr(host)[2])
 
     def test_host_resolution_bad_address(self):
@@ -1334,7 +1335,7 @@ def testStringToIPv6(self):
 
     def testSockName(self):
         # Testing getsockname()
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.addCleanup(sock.close)
         sock.bind(("0.0.0.0", port))
@@ -1400,7 +1401,7 @@ def testNewAttributes(self):
     def test_getsockaddrarg(self):
         sock = socket.socket()
         self.addCleanup(sock.close)
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         big_port = port + 65536
         neg_port = port - 65536
         self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
@@ -1408,7 +1409,7 @@ def test_getsockaddrarg(self):
         # Since find_unused_port() is inherently subject to race conditions, we
         # call it a couple times if necessary.
         for i in itertools.count():
-            port = support.find_unused_port()
+            port = socket_helper.find_unused_port()
             try:
                 sock.bind((HOST, port))
             except OSError as e:
@@ -1461,7 +1462,7 @@ def testGetaddrinfo(self):
         socket.getaddrinfo('localhost', 80)
         socket.getaddrinfo('127.0.0.1', 80)
         socket.getaddrinfo(None, 80)
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             socket.getaddrinfo('::1', 80)
         # port can be a string service name such as "http", a numeric
         # port number or None
@@ -1674,14 +1675,14 @@ def test_listen_backlog_overflow(self):
             srv.bind((HOST, 0))
             self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
     def test_flowinfo(self):
         self.assertRaises(OverflowError, socket.getnameinfo,
-                          (support.HOSTv6, 0, 0xffffffff), 0)
+                          (socket_helper.HOSTv6, 0, 0xffffffff), 0)
         with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
-            self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10))
+            self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
     def test_getaddrinfo_ipv6_basic(self):
         ((*_, sockaddr),) = socket.getaddrinfo(
             'ff02::1de:c0:face:8D',  # Note capital letter `D`.
@@ -1691,7 +1692,7 @@ def test_getaddrinfo_ipv6_basic(self):
         )
         self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
     @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
     @unittest.skipIf(AIX, 'Symbolic scope id does not work')
     def test_getaddrinfo_ipv6_scopeid_symbolic(self):
@@ -1706,7 +1707,7 @@ def test_getaddrinfo_ipv6_scopeid_symbolic(self):
         # Note missing interface name part in IPv6 address
         self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
     @unittest.skipUnless(
         sys.platform == 'win32',
         'Numeric scope id does not work or undocumented')
@@ -1723,7 +1724,7 @@ def test_getaddrinfo_ipv6_scopeid_numeric(self):
         # Note missing interface name part in IPv6 address
         self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
     @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
     @unittest.skipIf(AIX, 'Symbolic scope id does not work')
     def test_getnameinfo_ipv6_scopeid_symbolic(self):
@@ -1733,7 +1734,7 @@ def test_getnameinfo_ipv6_scopeid_symbolic(self):
         nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
         self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
     @unittest.skipUnless( sys.platform == 'win32',
         'Numeric scope id does not work or undocumented')
     def test_getnameinfo_ipv6_scopeid_numeric(self):
@@ -1826,19 +1827,19 @@ def _test_socket_fileno(self, s, family, stype):
     def test_socket_fileno(self):
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.addCleanup(s.close)
-        s.bind((support.HOST, 0))
+        s.bind((socket_helper.HOST, 0))
         self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
 
         if hasattr(socket, "SOCK_DGRAM"):
             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
             self.addCleanup(s.close)
-            s.bind((support.HOST, 0))
+            s.bind((socket_helper.HOST, 0))
             self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
 
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
             self.addCleanup(s.close)
-            s.bind((support.HOSTv6, 0, 0, 0))
+            s.bind((socket_helper.HOSTv6, 0, 0, 0))
             self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
 
         if hasattr(socket, "AF_UNIX"):
@@ -2214,12 +2215,12 @@ def testUnbound(self):
 
     def testBindSock(self):
         with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
-            support.bind_port(s, host=s.getsockname()[0])
+            socket_helper.bind_port(s, host=s.getsockname()[0])
             self.assertNotEqual(s.getsockname()[1], 0)
 
     def testInvalidBindSock(self):
         with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
-            self.assertRaises(OSError, support.bind_port, s, host=-2)
+            self.assertRaises(OSError, socket_helper.bind_port, s, host=-2)
 
     def testAutoBindSock(self):
         with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
@@ -4094,25 +4095,25 @@ def checkRecvmsgAddress(self, addr1, addr2):
         self.assertEqual(addr1[:-1], addr2[:-1])
 
 @requireAttrs(socket.socket, "sendmsg")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @requireSocket("AF_INET6", "SOCK_DGRAM")
 class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
     pass
 
 @requireAttrs(socket.socket, "recvmsg")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @requireSocket("AF_INET6", "SOCK_DGRAM")
 class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
     pass
 
 @requireAttrs(socket.socket, "recvmsg_into")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @requireSocket("AF_INET6", "SOCK_DGRAM")
 class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
     pass
 
 @requireAttrs(socket.socket, "recvmsg")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @requireAttrs(socket, "IPPROTO_IPV6")
 @requireSocket("AF_INET6", "SOCK_DGRAM")
 class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
@@ -4120,7 +4121,7 @@ class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
     pass
 
 @requireAttrs(socket.socket, "recvmsg_into")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @requireAttrs(socket, "IPPROTO_IPV6")
 @requireSocket("AF_INET6", "SOCK_DGRAM")
 class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
@@ -4167,7 +4168,7 @@ def checkRecvmsgAddress(self, addr1, addr2):
         self.assertEqual(addr1[:-1], addr2[:-1])
 
 @requireAttrs(socket.socket, "sendmsg")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
           'UDPLITE sockets required for this test.')
 @requireSocket("AF_INET6", "SOCK_DGRAM")
@@ -4175,7 +4176,7 @@ class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBas
     pass
 
 @requireAttrs(socket.socket, "recvmsg")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
           'UDPLITE sockets required for this test.')
 @requireSocket("AF_INET6", "SOCK_DGRAM")
@@ -4183,7 +4184,7 @@ class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase):
     pass
 
 @requireAttrs(socket.socket, "recvmsg_into")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
           'UDPLITE sockets required for this test.')
 @requireSocket("AF_INET6", "SOCK_DGRAM")
@@ -4191,7 +4192,7 @@ class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase):
     pass
 
 @requireAttrs(socket.socket, "recvmsg")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
           'UDPLITE sockets required for this test.')
 @requireAttrs(socket, "IPPROTO_IPV6")
@@ -4201,7 +4202,7 @@ class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest,
     pass
 
 @requireAttrs(socket.socket, "recvmsg_into")
- at unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
+ at unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
 @unittest.skipUnless(HAVE_SOCKET_UDPLITE,
           'UDPLITE sockets required for this test.')
 @requireAttrs(socket, "IPPROTO_IPV6")
@@ -4999,7 +5000,7 @@ def mocked_socket_module(self):
             socket.socket = old_socket
 
     def test_connect(self):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.addCleanup(cli.close)
         with self.assertRaises(OSError) as cm:
@@ -5009,7 +5010,7 @@ def test_connect(self):
     def test_create_connection(self):
         # Issue #9792: errors raised by create_connection() should have
         # a proper errno attribute.
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         with self.assertRaises(OSError) as cm:
             socket.create_connection((HOST, port))
 
@@ -5027,7 +5028,7 @@ def test_create_connection(self):
         # On Solaris, ENETUNREACH is returned in this circumstance instead
         # of ECONNREFUSED.  So, if that errno exists, add it to our list of
         # expected errnos.
-        expected_errnos = support.get_socket_conn_refused_errs()
+        expected_errnos = socket_helper.get_socket_conn_refused_errs()
         self.assertIn(cm.exception.errno, expected_errnos)
 
     def test_create_connection_timeout(self):
@@ -5039,7 +5040,7 @@ def test_create_connection_timeout(self):
             except socket.timeout:
                 pass
             except OSError as exc:
-                if support.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT:
+                if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT:
                     raise
             else:
                 self.fail('socket.timeout not raised')
@@ -5052,7 +5053,7 @@ def __init__(self, methodName='runTest'):
         ThreadableTest.__init__(self)
 
     def clientSetUp(self):
-        self.source_port = support.find_unused_port()
+        self.source_port = socket_helper.find_unused_port()
 
     def clientTearDown(self):
         self.cli.close()
@@ -5338,7 +5339,7 @@ def encoded(self, path):
     def bind(self, sock, path):
         # Bind the socket
         try:
-            support.bind_unix_socket(sock, path)
+            socket_helper.bind_unix_socket(sock, path)
         except OSError as e:
             if str(e) == "AF_UNIX path too long":
                 self.skipTest(
@@ -6328,11 +6329,11 @@ def test_new_tcp_flags(self):
 class CreateServerTest(unittest.TestCase):
 
     def test_address(self):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         with socket.create_server(("127.0.0.1", port)) as sock:
             self.assertEqual(sock.getsockname()[0], "127.0.0.1")
             self.assertEqual(sock.getsockname()[1], port)
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             with socket.create_server(("::1", port),
                                       family=socket.AF_INET6) as sock:
                 self.assertEqual(sock.getsockname()[0], "::1")
@@ -6342,7 +6343,7 @@ def test_family_and_type(self):
         with socket.create_server(("127.0.0.1", 0)) as sock:
             self.assertEqual(sock.family, socket.AF_INET)
             self.assertEqual(sock.type, socket.SOCK_STREAM)
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
                 self.assertEqual(s.family, socket.AF_INET6)
                 self.assertEqual(sock.type, socket.SOCK_STREAM)
@@ -6362,14 +6363,14 @@ def test_reuse_port(self):
     @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
                      not hasattr(_socket, 'IPV6_V6ONLY'),
                      "IPV6_V6ONLY option not supported")
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
     def test_ipv6_only_default(self):
         with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
             assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
 
     @unittest.skipIf(not socket.has_dualstack_ipv6(),
                      "dualstack_ipv6 not supported")
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
     def test_dualstack_ipv6_family(self):
         with socket.create_server(("::1", 0), family=socket.AF_INET6,
                                   dualstack_ipv6=True) as sock:
@@ -6411,14 +6412,14 @@ def echo_client(self, addr, family):
             self.assertEqual(sock.recv(1024), b'foo')
 
     def test_tcp4(self):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         with socket.create_server(("", port)) as sock:
             self.echo_server(sock)
             self.echo_client(("127.0.0.1", port), socket.AF_INET)
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
     def test_tcp6(self):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         with socket.create_server(("", port),
                                   family=socket.AF_INET6) as sock:
             self.echo_server(sock)
@@ -6428,9 +6429,9 @@ def test_tcp6(self):
 
     @unittest.skipIf(not socket.has_dualstack_ipv6(),
                      "dualstack_ipv6 not supported")
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
     def test_dual_stack_client_v4(self):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         with socket.create_server(("", port), family=socket.AF_INET6,
                                   dualstack_ipv6=True) as sock:
             self.echo_server(sock)
@@ -6438,9 +6439,9 @@ def test_dual_stack_client_v4(self):
 
     @unittest.skipIf(not socket.has_dualstack_ipv6(),
                      "dualstack_ipv6 not supported")
-    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
     def test_dual_stack_client_v6(self):
-        port = support.find_unused_port()
+        port = socket_helper.find_unused_port()
         with socket.create_server(("", port), family=socket.AF_INET6,
                                   dualstack_ipv6=True) as sock:
             self.echo_server(sock)
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index f818df0b22f6a..c663cc95889c9 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -15,12 +15,13 @@
 
 import test.support
 from test.support import reap_children, reap_threads, verbose
+from test.support import socket_helper
 
 
 test.support.requires("network")
 
 TEST_STR = b"hello world\n"
-HOST = test.support.HOST
+HOST = socket_helper.HOST
 
 HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
 requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS,
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 4184665b2b158..dafdb6c08092b 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -4,6 +4,7 @@
 import unittest
 import unittest.mock
 from test import support
+from test.support import socket_helper
 import socket
 import select
 import time
@@ -33,7 +34,7 @@
 Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
 
 PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
-HOST = support.HOST
+HOST = socket_helper.HOST
 IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
 IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
 IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
@@ -762,7 +763,7 @@ def fail(cert, hostname):
         fail(cert, 'example.net')
 
         # -- IPv6 matching --
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             cert = {'subject': ((('commonName', 'example.com'),),),
                     'subjectAltName': (
                         ('DNS', 'example.com'),
@@ -845,7 +846,7 @@ def fail(cert, hostname):
                 ssl._inet_paton(invalid)
         for ipaddr in ['127.0.0.1', '192.168.0.1']:
             self.assertTrue(ssl._inet_paton(ipaddr))
-        if support.IPV6_ENABLED:
+        if socket_helper.IPV6_ENABLED:
             for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
                 self.assertTrue(ssl._inet_paton(ipaddr))
 
@@ -1073,7 +1074,7 @@ def local_february_name():
     def test_connect_ex_error(self):
         server = socket.socket(socket.AF_INET)
         self.addCleanup(server.close)
-        port = support.bind_port(server)  # Reserve port but don't listen
+        port = socket_helper.bind_port(server)  # Reserve port but don't listen
         s = test_wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED)
         self.addCleanup(s.close)
@@ -2256,7 +2257,7 @@ def test_timeout_connect_ex(self):
                 self.skipTest("REMOTE_HOST responded too quickly")
             self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
 
-    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
+    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'Needs IPv6')
     def test_get_server_certificate_ipv6(self):
         with support.transient_internet('ipv6.google.com'):
             _test_get_server_certificate(self, 'ipv6.google.com', 443)
@@ -2511,7 +2512,7 @@ def __init__(self, certificate=None, ssl_version=None,
         self.connectionchatty = connectionchatty
         self.starttls_server = starttls_server
         self.sock = socket.socket()
-        self.port = support.bind_port(self.sock)
+        self.port = socket_helper.bind_port(self.sock)
         self.flag = None
         self.active = False
         self.selected_npn_protocols = []
@@ -2624,7 +2625,7 @@ def handle_error(self):
         def __init__(self, certfile):
             self.certfile = certfile
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self.port = support.bind_port(sock, '')
+            self.port = socket_helper.bind_port(sock, '')
             asyncore.dispatcher.__init__(self, sock)
             self.listen(5)
 
@@ -3144,7 +3145,7 @@ def test_rude_shutdown(self):
         listener_gone = threading.Event()
 
         s = socket.socket()
-        port = support.bind_port(s, HOST)
+        port = socket_helper.bind_port(s, HOST)
 
         # `listener` runs in a thread.  It sits in an accept() until
         # the main thread connects.  Then it rudely closes the socket,
@@ -3640,7 +3641,7 @@ def test_handshake_timeout(self):
         # Issue #5103: SSL handshake must respect the socket timeout
         server = socket.socket(socket.AF_INET)
         host = "127.0.0.1"
-        port = support.bind_port(server)
+        port = socket_helper.bind_port(server)
         started = threading.Event()
         finish = False
 
@@ -3694,7 +3695,7 @@ def test_server_accept(self):
         context.load_cert_chain(SIGNED_CERTFILE)
         server = socket.socket(socket.AF_INET)
         host = "127.0.0.1"
-        port = support.bind_port(server)
+        port = socket_helper.bind_port(server)
         server = context.wrap_socket(server, server_side=True)
         self.assertTrue(server.server_side)
 
diff --git a/Lib/test/test_stat.py b/Lib/test/test_stat.py
index be01db2ed5574..d9e4ffde658db 100644
--- a/Lib/test/test_stat.py
+++ b/Lib/test/test_stat.py
@@ -2,8 +2,8 @@
 import os
 import socket
 import sys
-from test.support import (TESTFN, import_fresh_module,
-                          skip_unless_bind_unix_socket)
+from test.support import socket_helper
+from test.support import TESTFN, import_fresh_module
 
 c_stat = import_fresh_module('stat', fresh=['_stat'])
 py_stat = import_fresh_module('stat', blocked=['_stat'])
@@ -193,7 +193,7 @@ def test_devices(self):
                 self.assertS_IS("BLK", st_mode)
                 break
 
-    @skip_unless_bind_unix_socket
+    @socket_helper.skip_unless_bind_unix_socket
     def test_socket(self):
         with socket.socket(socket.AF_UNIX) as s:
             s.bind(TESTFN)
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index dee1db7d6d7c8..606e57003ed71 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -14,6 +14,7 @@
 import unittest
 from test import support
 from test.support import script_helper
+from test.support import socket_helper
 
 TESTFN = support.TESTFN
 
@@ -91,17 +92,17 @@ def test_forget(self):
             support.rmtree('__pycache__')
 
     def test_HOST(self):
-        s = socket.create_server((support.HOST, 0))
+        s = socket.create_server((socket_helper.HOST, 0))
         s.close()
 
     def test_find_unused_port(self):
-        port = support.find_unused_port()
-        s = socket.create_server((support.HOST, port))
+        port = socket_helper.find_unused_port()
+        s = socket.create_server((socket_helper.HOST, port))
         s.close()
 
     def test_bind_port(self):
         s = socket.socket()
-        support.bind_port(s)
+        socket_helper.bind_port(s)
         s.listen()
         s.close()
 
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py
index 414c328b79f95..7633901c96c84 100644
--- a/Lib/test/test_telnetlib.py
+++ b/Lib/test/test_telnetlib.py
@@ -5,9 +5,10 @@
 import contextlib
 
 from test import support
+from test.support import socket_helper
 import unittest
 
-HOST = support.HOST
+HOST = socket_helper.HOST
 
 def server(evt, serv):
     serv.listen()
@@ -26,7 +27,7 @@ def setUp(self):
         self.evt = threading.Event()
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.settimeout(60)  # Safety net. Look issue 11812
-        self.port = support.bind_port(self.sock)
+        self.port = socket_helper.bind_port(self.sock)
         self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
         self.thread.setDaemon(True)
         self.thread.start()
diff --git a/Lib/test/test_timeout.py b/Lib/test/test_timeout.py
index ff41b13a6c839..c0952c75e9913 100644
--- a/Lib/test/test_timeout.py
+++ b/Lib/test/test_timeout.py
@@ -3,6 +3,7 @@
 import functools
 import unittest
 from test import support
+from test.support import socket_helper
 
 # This requires the 'network' resource as given on the regrtest command line.
 skip_expected = not support.is_resource_enabled('network')
@@ -110,7 +111,7 @@ class TimeoutTestCase(unittest.TestCase):
     # solution.
     fuzz = 2.0
 
-    localhost = support.HOST
+    localhost = socket_helper.HOST
 
     def setUp(self):
         raise NotImplementedError()
@@ -240,14 +241,14 @@ def testRecvTimeout(self):
 
     def testAcceptTimeout(self):
         # Test accept() timeout
-        support.bind_port(self.sock, self.localhost)
+        socket_helper.bind_port(self.sock, self.localhost)
         self.sock.listen()
         self._sock_operation(1, 1.5, 'accept')
 
     def testSend(self):
         # Test send() timeout
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
-            support.bind_port(serv, self.localhost)
+            socket_helper.bind_port(serv, self.localhost)
             serv.listen()
             self.sock.connect(serv.getsockname())
             # Send a lot of data in order to bypass buffering in the TCP stack.
@@ -256,7 +257,7 @@ def testSend(self):
     def testSendto(self):
         # Test sendto() timeout
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
-            support.bind_port(serv, self.localhost)
+            socket_helper.bind_port(serv, self.localhost)
             serv.listen()
             self.sock.connect(serv.getsockname())
             # The address argument is ignored since we already connected.
@@ -266,7 +267,7 @@ def testSendto(self):
     def testSendall(self):
         # Test sendall() timeout
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
-            support.bind_port(serv, self.localhost)
+            socket_helper.bind_port(serv, self.localhost)
             serv.listen()
             self.sock.connect(serv.getsockname())
             # Send a lot of data in order to bypass buffering in the TCP stack.
@@ -285,7 +286,7 @@ def tearDown(self):
     def testRecvfromTimeout(self):
         # Test recvfrom() timeout
         # Prevent "Address already in use" socket exceptions
-        support.bind_port(self.sock, self.localhost)
+        socket_helper.bind_port(self.sock, self.localhost)
         self._sock_operation(1, 1.5, 'recvfrom', 1024)
 
 
diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py
index 6af45145a7927..4bf5d39e619f6 100644
--- a/Lib/test/test_wsgiref.py
+++ b/Lib/test/test_wsgiref.py
@@ -1,5 +1,6 @@
 from unittest import mock
 from test import support
+from test.support import socket_helper
 from test.test_httpservers import NoLogRequestHandler
 from unittest import TestCase
 from wsgiref.util import setup_testing_defaults
@@ -263,7 +264,7 @@ def app(environ, start_response):
         class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
             pass
 
-        server = make_server(support.HOST, 0, app, handler_class=WsgiHandler)
+        server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler)
         self.addCleanup(server.server_close)
         interrupted = threading.Event()
 
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index e5c3496ec548e..f68af527eae85 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -15,6 +15,7 @@
 import io
 import contextlib
 from test import support
+from test.support import socket_helper
 from test.support import ALWAYS_EQ, LARGEST, SMALLEST
 
 try:
@@ -334,7 +335,7 @@ def run_server():
             server.handle_request()  # First request and attempt at second
             server.handle_request()  # Retried second request
 
-        server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
+        server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
         self.addCleanup(server.server_close)
         thread = threading.Thread(target=run_server)
         thread.start()



More information about the Python-checkins mailing list