[Python-checkins] cpython (2.7): Issue #18702: All skipped tests now reported as skipped.
serhiy.storchaka
python-checkins at python.org
Sun Nov 3 22:16:13 CET 2013
http://hg.python.org/cpython/rev/2d330f7908e7
changeset: 86897:2d330f7908e7
branch: 2.7
parent: 86884:a34889a30d52
user: Serhiy Storchaka <storchaka at gmail.com>
date: Sun Nov 03 23:15:46 2013 +0200
summary:
Issue #18702: All skipped tests now reported as skipped.
files:
Lib/test/test_array.py | 17 +-
Lib/test/test_compileall.py | 3 +-
Lib/test/test_csv.py | 135 +++---
Lib/test/test_enumerate.py | 3 +-
Lib/test/test_ftplib.py | 26 +-
Lib/test/test_mailbox.py | 36 +-
Lib/test/test_math.py | 59 +-
Lib/test/test_mmap.py | 142 +++---
Lib/test/test_os.py | 231 ++++++------
Lib/test/test_poplib.py | 28 +-
Lib/test/test_posix.py | 401 +++++++++++----------
Lib/test/test_set.py | 8 +-
Lib/test/test_shutil.py | 110 +++--
Lib/test/test_socket.py | 64 +-
Lib/test/test_socketserver.py | 85 ++--
Lib/test/test_sys.py | 17 +-
Lib/test/test_warnings.py | 3 +-
Lib/test/test_zlib.py | 155 ++++----
Misc/NEWS | 2 +
19 files changed, 783 insertions(+), 742 deletions(-)
diff --git a/Lib/test/test_array.py b/Lib/test/test_array.py
--- a/Lib/test/test_array.py
+++ b/Lib/test/test_array.py
@@ -9,6 +9,7 @@
from weakref import proxy
import array, cStringIO
from cPickle import loads, dumps, HIGHEST_PROTOCOL
+import sys
class ArraySubclass(array.array):
pass
@@ -772,15 +773,15 @@
s = None
self.assertRaises(ReferenceError, len, p)
+ @unittest.skipUnless(hasattr(sys, 'getrefcount'),
+ 'test needs sys.getrefcount()')
def test_bug_782369(self):
- import sys
- if hasattr(sys, "getrefcount"):
- for i in range(10):
- b = array.array('B', range(64))
- rc = sys.getrefcount(10)
- for i in range(10):
- b = array.array('B', range(64))
- self.assertEqual(rc, sys.getrefcount(10))
+ for i in range(10):
+ b = array.array('B', range(64))
+ rc = sys.getrefcount(10)
+ for i in range(10):
+ b = array.array('B', range(64))
+ self.assertEqual(rc, sys.getrefcount(10))
def test_subclass_with_kwargs(self):
# SF bug #1486663 -- this used to erroneously raise a TypeError
diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py
--- a/Lib/test/test_compileall.py
+++ b/Lib/test/test_compileall.py
@@ -31,11 +31,10 @@
compare = struct.pack('<4sl', imp.get_magic(), mtime)
return data, compare
+ @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def recreation_check(self, metadata):
"""Check that compileall recreates bytecode when the new metadata is
used."""
- if not hasattr(os, 'stat'):
- return
py_compile.compile(self.source_path)
self.assertEqual(*self.data())
with open(self.bc_path, 'rb') as file:
diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py
--- a/Lib/test/test_csv.py
+++ b/Lib/test/test_csv.py
@@ -1014,78 +1014,77 @@
dialect = sniffer.sniff(self.sample9)
self.assertTrue(dialect.doublequote)
-if not hasattr(sys, "gettotalrefcount"):
- if test_support.verbose: print "*** skipping leakage tests ***"
-else:
- class NUL:
- def write(s, *args):
- pass
- writelines = write
+class NUL:
+ def write(s, *args):
+ pass
+ writelines = write
- class TestLeaks(unittest.TestCase):
- def test_create_read(self):
- delta = 0
- lastrc = sys.gettotalrefcount()
- for i in xrange(20):
- gc.collect()
- self.assertEqual(gc.garbage, [])
- rc = sys.gettotalrefcount()
- csv.reader(["a,b,c\r\n"])
- csv.reader(["a,b,c\r\n"])
- csv.reader(["a,b,c\r\n"])
- delta = rc-lastrc
- lastrc = rc
- # if csv.reader() leaks, last delta should be 3 or more
- self.assertEqual(delta < 3, True)
+@unittest.skipUnless(hasattr(sys, "gettotalrefcount"),
+ 'requires sys.gettotalrefcount()')
+class TestLeaks(unittest.TestCase):
+ def test_create_read(self):
+ delta = 0
+ lastrc = sys.gettotalrefcount()
+ for i in xrange(20):
+ gc.collect()
+ self.assertEqual(gc.garbage, [])
+ rc = sys.gettotalrefcount()
+ csv.reader(["a,b,c\r\n"])
+ csv.reader(["a,b,c\r\n"])
+ csv.reader(["a,b,c\r\n"])
+ delta = rc-lastrc
+ lastrc = rc
+ # if csv.reader() leaks, last delta should be 3 or more
+ self.assertEqual(delta < 3, True)
- def test_create_write(self):
- delta = 0
- lastrc = sys.gettotalrefcount()
- s = NUL()
- for i in xrange(20):
- gc.collect()
- self.assertEqual(gc.garbage, [])
- rc = sys.gettotalrefcount()
- csv.writer(s)
- csv.writer(s)
- csv.writer(s)
- delta = rc-lastrc
- lastrc = rc
- # if csv.writer() leaks, last delta should be 3 or more
- self.assertEqual(delta < 3, True)
+ def test_create_write(self):
+ delta = 0
+ lastrc = sys.gettotalrefcount()
+ s = NUL()
+ for i in xrange(20):
+ gc.collect()
+ self.assertEqual(gc.garbage, [])
+ rc = sys.gettotalrefcount()
+ csv.writer(s)
+ csv.writer(s)
+ csv.writer(s)
+ delta = rc-lastrc
+ lastrc = rc
+ # if csv.writer() leaks, last delta should be 3 or more
+ self.assertEqual(delta < 3, True)
- def test_read(self):
- delta = 0
- rows = ["a,b,c\r\n"]*5
- lastrc = sys.gettotalrefcount()
- for i in xrange(20):
- gc.collect()
- self.assertEqual(gc.garbage, [])
- rc = sys.gettotalrefcount()
- rdr = csv.reader(rows)
- for row in rdr:
- pass
- delta = rc-lastrc
- lastrc = rc
- # if reader leaks during read, delta should be 5 or more
- self.assertEqual(delta < 5, True)
+ def test_read(self):
+ delta = 0
+ rows = ["a,b,c\r\n"]*5
+ lastrc = sys.gettotalrefcount()
+ for i in xrange(20):
+ gc.collect()
+ self.assertEqual(gc.garbage, [])
+ rc = sys.gettotalrefcount()
+ rdr = csv.reader(rows)
+ for row in rdr:
+ pass
+ delta = rc-lastrc
+ lastrc = rc
+ # if reader leaks during read, delta should be 5 or more
+ self.assertEqual(delta < 5, True)
- def test_write(self):
- delta = 0
- rows = [[1,2,3]]*5
- s = NUL()
- lastrc = sys.gettotalrefcount()
- for i in xrange(20):
- gc.collect()
- self.assertEqual(gc.garbage, [])
- rc = sys.gettotalrefcount()
- writer = csv.writer(s)
- for row in rows:
- writer.writerow(row)
- delta = rc-lastrc
- lastrc = rc
- # if writer leaks during write, last delta should be 5 or more
- self.assertEqual(delta < 5, True)
+ def test_write(self):
+ delta = 0
+ rows = [[1,2,3]]*5
+ s = NUL()
+ lastrc = sys.gettotalrefcount()
+ for i in xrange(20):
+ gc.collect()
+ self.assertEqual(gc.garbage, [])
+ rc = sys.gettotalrefcount()
+ writer = csv.writer(s)
+ for row in rows:
+ writer.writerow(row)
+ delta = rc-lastrc
+ lastrc = rc
+ # if writer leaks during write, last delta should be 5 or more
+ self.assertEqual(delta < 5, True)
# commented out for now - csv module doesn't yet support Unicode
## class TestUnicode(unittest.TestCase):
diff --git a/Lib/test/test_enumerate.py b/Lib/test/test_enumerate.py
--- a/Lib/test/test_enumerate.py
+++ b/Lib/test/test_enumerate.py
@@ -188,11 +188,10 @@
self.assertRaises(TypeError, reversed)
self.assertRaises(TypeError, reversed, [], 'extra')
+ @unittest.skipUnless(hasattr(sys, 'getrefcount'), 'test needs sys.getrefcount()')
def test_bug1229429(self):
# this bug was never in reversed, it was in
# PyObject_CallMethod, and reversed_new calls that sometimes.
- if not hasattr(sys, "getrefcount"):
- return
def f():
pass
r = f.__reversed__ = object()
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -15,7 +15,7 @@
except ImportError:
ssl = None
-from unittest import TestCase
+from unittest import TestCase, SkipTest, skipUnless
from test import test_support
from test.test_support import HOST, HOSTv6
threading = test_support.import_module('threading')
@@ -579,8 +579,16 @@
self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
+@skipUnless(socket.has_ipv6, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ try:
+ DummyFTPServer((HOST, 0), af=socket.AF_INET6)
+ except socket.error:
+ raise SkipTest("IPv6 not enabled")
+
def setUp(self):
self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
self.server.start()
@@ -615,6 +623,7 @@
retr()
+@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
"""Repeat TestFTPClass tests starting the TLS layer for both control
and data connections first.
@@ -630,6 +639,7 @@
self.client.prot_p()
+@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
"""Specific TLS_FTP class tests."""
@@ -783,17 +793,9 @@
def test_main():
- tests = [TestFTPClass, TestTimeouts]
- if socket.has_ipv6:
- try:
- DummyFTPServer((HOST, 0), af=socket.AF_INET6)
- except socket.error:
- pass
- else:
- tests.append(TestIPv6Environment)
-
- if ssl is not None:
- tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
+ tests = [TestFTPClass, TestTimeouts,
+ TestIPv6Environment,
+ TestTLS_FTPClassMixin, TestTLS_FTPClass]
thread_info = test_support.threading_setup()
try:
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -772,10 +772,10 @@
for msg in self._box:
pass
+ @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
+ @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_permissions(self):
# Verify that message files are created without execute permissions
- if not hasattr(os, "stat") or not hasattr(os, "umask"):
- return
msg = mailbox.MaildirMessage(self._template % 0)
orig_umask = os.umask(0)
try:
@@ -786,12 +786,11 @@
mode = os.stat(path).st_mode
self.assertEqual(mode & 0111, 0)
+ @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
+ @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_folder_file_perms(self):
# From bug #3228, we want to verify that the file created inside a Maildir
# subfolder isn't marked as executable.
- if not hasattr(os, "stat") or not hasattr(os, "umask"):
- return
-
orig_umask = os.umask(0)
try:
subfolder = self._box.add_folder('subfolder')
@@ -991,24 +990,25 @@
_factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
+ @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
+ @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_perms(self):
# From bug #3228, we want to verify that the mailbox file isn't executable,
# even if the umask is set to something that would leave executable bits set.
# We only run this test on platforms that support umask.
- if hasattr(os, 'umask') and hasattr(os, 'stat'):
- try:
- old_umask = os.umask(0077)
- self._box.close()
- os.unlink(self._path)
- self._box = mailbox.mbox(self._path, create=True)
- self._box.add('')
- self._box.close()
- finally:
- os.umask(old_umask)
+ try:
+ old_umask = os.umask(0077)
+ self._box.close()
+ os.unlink(self._path)
+ self._box = mailbox.mbox(self._path, create=True)
+ self._box.add('')
+ self._box.close()
+ finally:
+ os.umask(old_umask)
- st = os.stat(self._path)
- perms = st.st_mode
- self.assertFalse((perms & 0111)) # Execute bits should all be off.
+ st = os.stat(self._path)
+ perms = st.st_mode
+ self.assertFalse((perms & 0111)) # Execute bits should all be off.
def test_terminating_newline(self):
message = email.message.Message()
diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py
--- a/Lib/test/test_math.py
+++ b/Lib/test/test_math.py
@@ -906,38 +906,37 @@
# still fails this part of the test on some platforms. For now, we only
# *run* test_exceptions() in verbose mode, so that this isn't normally
# tested.
+ @unittest.skipUnless(verbose, 'requires verbose mode')
+ def test_exceptions(self):
+ try:
+ x = math.exp(-1000000000)
+ except:
+ # mathmodule.c is failing to weed out underflows from libm, or
+ # we've got an fp format with huge dynamic range
+ self.fail("underflowing exp() should not have raised "
+ "an exception")
+ if x != 0:
+ self.fail("underflowing exp() should have returned 0")
- if verbose:
- def test_exceptions(self):
- try:
- x = math.exp(-1000000000)
- except:
- # mathmodule.c is failing to weed out underflows from libm, or
- # we've got an fp format with huge dynamic range
- self.fail("underflowing exp() should not have raised "
- "an exception")
- if x != 0:
- self.fail("underflowing exp() should have returned 0")
+ # If this fails, probably using a strict IEEE-754 conforming libm, and x
+ # is +Inf afterwards. But Python wants overflows detected by default.
+ try:
+ x = math.exp(1000000000)
+ except OverflowError:
+ pass
+ else:
+ self.fail("overflowing exp() didn't trigger OverflowError")
- # If this fails, probably using a strict IEEE-754 conforming libm, and x
- # is +Inf afterwards. But Python wants overflows detected by default.
- try:
- x = math.exp(1000000000)
- except OverflowError:
- pass
- else:
- self.fail("overflowing exp() didn't trigger OverflowError")
-
- # If this fails, it could be a puzzle. One odd possibility is that
- # mathmodule.c's macros are getting confused while comparing
- # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
- # as a result (and so raising OverflowError instead).
- try:
- x = math.sqrt(-1.0)
- except ValueError:
- pass
- else:
- self.fail("sqrt(-1) didn't raise ValueError")
+ # If this fails, it could be a puzzle. One odd possibility is that
+ # mathmodule.c's macros are getting confused while comparing
+ # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
+ # as a result (and so raising OverflowError instead).
+ try:
+ x = math.sqrt(-1.0)
+ except ValueError:
+ pass
+ else:
+ self.fail("sqrt(-1) didn't raise ValueError")
@requires_IEEE_754
def test_testfile(self):
diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py
--- a/Lib/test/test_mmap.py
+++ b/Lib/test/test_mmap.py
@@ -320,26 +320,25 @@
mf.close()
f.close()
+ @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_entire_file(self):
# test mapping of entire file by passing 0 for map length
- if hasattr(os, "stat"):
- f = open(TESTFN, "w+")
+ f = open(TESTFN, "w+")
- f.write(2**16 * 'm') # Arbitrary character
- f.close()
+ f.write(2**16 * 'm') # Arbitrary character
+ f.close()
- f = open(TESTFN, "rb+")
- mf = mmap.mmap(f.fileno(), 0)
- self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
- self.assertEqual(mf.read(2**16), 2**16 * "m")
- mf.close()
- f.close()
+ f = open(TESTFN, "rb+")
+ mf = mmap.mmap(f.fileno(), 0)
+ self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
+ self.assertEqual(mf.read(2**16), 2**16 * "m")
+ mf.close()
+ f.close()
+ @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_offset(self):
# Issue #10916: test mapping of remainder of file by passing 0 for
# map length with an offset doesn't cause a segfault.
- if not hasattr(os, "stat"):
- self.skipTest("needs os.stat")
# NOTE: allocation granularity is currently 65536 under Win64,
# and therefore the minimum offset alignment.
with open(TESTFN, "wb") as f:
@@ -352,12 +351,10 @@
finally:
mf.close()
+ @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_large_offset(self):
# Issue #10959: test mapping of a file by passing 0 for
# map length with a large offset doesn't cause a segfault.
- if not hasattr(os, "stat"):
- self.skipTest("needs os.stat")
-
with open(TESTFN, "wb") as f:
f.write(115699 * b'm') # Arbitrary character
@@ -538,9 +535,8 @@
return mmap.mmap.__new__(klass, -1, *args, **kwargs)
anon_mmap(PAGESIZE)
+ @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
def test_prot_readonly(self):
- if not hasattr(mmap, 'PROT_READ'):
- return
mapsize = 10
open(TESTFN, "wb").write("a"*mapsize)
f = open(TESTFN, "rb")
@@ -584,66 +580,68 @@
m.seek(8)
self.assertRaises(ValueError, m.write, "bar")
- if os.name == 'nt':
- def test_tagname(self):
- data1 = "0123456789"
- data2 = "abcdefghij"
- assert len(data1) == len(data2)
+ @unittest.skipUnless(os.name == 'nt', 'requires Windows')
+ def test_tagname(self):
+ data1 = "0123456789"
+ data2 = "abcdefghij"
+ assert len(data1) == len(data2)
- # Test same tag
- m1 = mmap.mmap(-1, len(data1), tagname="foo")
- m1[:] = data1
- m2 = mmap.mmap(-1, len(data2), tagname="foo")
- m2[:] = data2
- self.assertEqual(m1[:], data2)
- self.assertEqual(m2[:], data2)
- m2.close()
- m1.close()
+ # Test same tag
+ m1 = mmap.mmap(-1, len(data1), tagname="foo")
+ m1[:] = data1
+ m2 = mmap.mmap(-1, len(data2), tagname="foo")
+ m2[:] = data2
+ self.assertEqual(m1[:], data2)
+ self.assertEqual(m2[:], data2)
+ m2.close()
+ m1.close()
- # Test different tag
- m1 = mmap.mmap(-1, len(data1), tagname="foo")
- m1[:] = data1
- m2 = mmap.mmap(-1, len(data2), tagname="boo")
- m2[:] = data2
- self.assertEqual(m1[:], data1)
- self.assertEqual(m2[:], data2)
- m2.close()
- m1.close()
+ # Test different tag
+ m1 = mmap.mmap(-1, len(data1), tagname="foo")
+ m1[:] = data1
+ m2 = mmap.mmap(-1, len(data2), tagname="boo")
+ m2[:] = data2
+ self.assertEqual(m1[:], data1)
+ self.assertEqual(m2[:], data2)
+ m2.close()
+ m1.close()
- def test_crasher_on_windows(self):
- # Should not crash (Issue 1733986)
- m = mmap.mmap(-1, 1000, tagname="foo")
- try:
- mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
- except:
- pass
- m.close()
+ @unittest.skipUnless(os.name == 'nt', 'requires Windows')
+ def test_crasher_on_windows(self):
+ # Should not crash (Issue 1733986)
+ m = mmap.mmap(-1, 1000, tagname="foo")
+ try:
+ mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
+ except:
+ pass
+ m.close()
- # Should not crash (Issue 5385)
- open(TESTFN, "wb").write("x"*10)
- f = open(TESTFN, "r+b")
- m = mmap.mmap(f.fileno(), 0)
- f.close()
- try:
- m.resize(0) # will raise WindowsError
- except:
- pass
- try:
- m[:]
- except:
- pass
- m.close()
+ # Should not crash (Issue 5385)
+ open(TESTFN, "wb").write("x"*10)
+ f = open(TESTFN, "r+b")
+ m = mmap.mmap(f.fileno(), 0)
+ f.close()
+ try:
+ m.resize(0) # will raise WindowsError
+ except:
+ pass
+ try:
+ m[:]
+ except:
+ pass
+ m.close()
- def test_invalid_descriptor(self):
- # socket file descriptors are valid, but out of range
- # for _get_osfhandle, causing a crash when validating the
- # parameters to _get_osfhandle.
- s = socket.socket()
- try:
- with self.assertRaises(mmap.error):
- m = mmap.mmap(s.fileno(), 10)
- finally:
- s.close()
+ @unittest.skipUnless(os.name == 'nt', 'requires Windows')
+ def test_invalid_descriptor(self):
+ # socket file descriptors are valid, but out of range
+ # for _get_osfhandle, causing a crash when validating the
+ # parameters to _get_osfhandle.
+ s = socket.socket()
+ try:
+ with self.assertRaises(mmap.error):
+ m = mmap.mmap(s.fileno(), 10)
+ finally:
+ s.close()
class LargeMmapTests(unittest.TestCase):
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -83,9 +83,8 @@
open(name, "w")
self.files.append(name)
+ @unittest.skipUnless(hasattr(os, 'tempnam'), 'test needs os.tempnam()')
def test_tempnam(self):
- if not hasattr(os, "tempnam"):
- return
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
r"test_os$")
@@ -99,9 +98,8 @@
self.assertTrue(os.path.basename(name)[:3] == "pfx")
self.check_tempfile(name)
+ @unittest.skipUnless(hasattr(os, 'tmpfile'), 'test needs os.tmpfile()')
def test_tmpfile(self):
- if not hasattr(os, "tmpfile"):
- return
# As with test_tmpnam() below, the Windows implementation of tmpfile()
# attempts to create a file in the root directory of the current drive.
# On Vista and Server 2008, this test will always fail for normal users
@@ -150,9 +148,8 @@
fp.close()
self.assertTrue(s == "foobar")
+ @unittest.skipUnless(hasattr(os, 'tmpnam'), 'test needs os.tmpnam()')
def test_tmpnam(self):
- if not hasattr(os, "tmpnam"):
- return
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
r"test_os$")
@@ -193,10 +190,8 @@
os.unlink(self.fname)
os.rmdir(test_support.TESTFN)
+ @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_stat_attributes(self):
- if not hasattr(os, "stat"):
- return
-
import stat
result = os.stat(self.fname)
@@ -256,10 +251,8 @@
pass
+ @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
def test_statvfs_attributes(self):
- if not hasattr(os, "statvfs"):
- return
-
try:
result = os.statvfs(self.fname)
except OSError, e:
@@ -311,10 +304,10 @@
st2 = os.stat(test_support.TESTFN)
self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
- # Restrict test to Win32, since there is no guarantee other
+ # Restrict tests to Win32, since there is no guarantee other
# systems support centiseconds
- if sys.platform == 'win32':
- def get_file_system(path):
+ def get_file_system(path):
+ if sys.platform == 'win32':
root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
import ctypes
kernel32 = ctypes.windll.kernel32
@@ -322,25 +315,31 @@
if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
return buf.value
- if get_file_system(test_support.TESTFN) == "NTFS":
- def test_1565150(self):
- t1 = 1159195039.25
- os.utime(self.fname, (t1, t1))
- self.assertEqual(os.stat(self.fname).st_mtime, t1)
+ @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+ @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
+ "requires NTFS")
+ def test_1565150(self):
+ t1 = 1159195039.25
+ os.utime(self.fname, (t1, t1))
+ self.assertEqual(os.stat(self.fname).st_mtime, t1)
- def test_large_time(self):
- t1 = 5000000000 # some day in 2128
- os.utime(self.fname, (t1, t1))
- self.assertEqual(os.stat(self.fname).st_mtime, t1)
+ @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+ @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
+ "requires NTFS")
+ def test_large_time(self):
+ t1 = 5000000000 # some day in 2128
+ os.utime(self.fname, (t1, t1))
+ self.assertEqual(os.stat(self.fname).st_mtime, t1)
- def test_1686475(self):
- # Verify that an open file can be stat'ed
- try:
- os.stat(r"c:\pagefile.sys")
- except WindowsError, e:
- if e.errno == 2: # file does not exist; cannot run test
- return
- self.fail("Could not stat pagefile.sys")
+ @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+ def test_1686475(self):
+ # Verify that an open file can be stat'ed
+ try:
+ os.stat(r"c:\pagefile.sys")
+ except WindowsError, e:
+ if e.errno == 2: # file does not exist; cannot run test
+ return
+ self.fail("Could not stat pagefile.sys")
from test import mapping_tests
@@ -598,6 +597,7 @@
self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
+@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
def test_rename(self):
self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")
@@ -644,121 +644,118 @@
self.fail("%r didn't raise a OSError with a bad file descriptor"
% f)
+ @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
def test_isatty(self):
- if hasattr(os, "isatty"):
- self.assertEqual(os.isatty(test_support.make_bad_fd()), False)
+ self.assertEqual(os.isatty(test_support.make_bad_fd()), False)
+ @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
def test_closerange(self):
- if hasattr(os, "closerange"):
- fd = test_support.make_bad_fd()
- # Make sure none of the descriptors we are about to close are
- # currently valid (issue 6542).
- for i in range(10):
- try: os.fstat(fd+i)
- except OSError:
- pass
- else:
- break
- if i < 2:
- raise unittest.SkipTest(
- "Unable to acquire a range of invalid file descriptors")
- self.assertEqual(os.closerange(fd, fd + i-1), None)
+ fd = test_support.make_bad_fd()
+ # Make sure none of the descriptors we are about to close are
+ # currently valid (issue 6542).
+ for i in range(10):
+ try: os.fstat(fd+i)
+ except OSError:
+ pass
+ else:
+ break
+ if i < 2:
+ raise unittest.SkipTest(
+ "Unable to acquire a range of invalid file descriptors")
+ self.assertEqual(os.closerange(fd, fd + i-1), None)
+ @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
def test_dup2(self):
- if hasattr(os, "dup2"):
- self.check(os.dup2, 20)
+ self.check(os.dup2, 20)
+ @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
def test_fchmod(self):
- if hasattr(os, "fchmod"):
- self.check(os.fchmod, 0)
+ self.check(os.fchmod, 0)
+ @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
def test_fchown(self):
- if hasattr(os, "fchown"):
- self.check(os.fchown, -1, -1)
+ self.check(os.fchown, -1, -1)
+ @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
def test_fpathconf(self):
- if hasattr(os, "fpathconf"):
- self.check(os.fpathconf, "PC_NAME_MAX")
+ self.check(os.fpathconf, "PC_NAME_MAX")
+ @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
def test_ftruncate(self):
- if hasattr(os, "ftruncate"):
- self.check(os.ftruncate, 0)
+ self.check(os.ftruncate, 0)
+ @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
def test_lseek(self):
- if hasattr(os, "lseek"):
- self.check(os.lseek, 0, 0)
+ self.check(os.lseek, 0, 0)
+ @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
def test_read(self):
- if hasattr(os, "read"):
- self.check(os.read, 1)
+ self.check(os.read, 1)
+ @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
def test_tcsetpgrpt(self):
- if hasattr(os, "tcsetpgrp"):
- self.check(os.tcsetpgrp, 0)
+ self.check(os.tcsetpgrp, 0)
+ @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
def test_write(self):
- if hasattr(os, "write"):
- self.check(os.write, " ")
+ self.check(os.write, " ")
-if sys.platform != 'win32':
- class Win32ErrorTests(unittest.TestCase):
- pass
+@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
+class PosixUidGidTests(unittest.TestCase):
+ @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
+ def test_setuid(self):
+ if os.getuid() != 0:
+ self.assertRaises(os.error, os.setuid, 0)
+ self.assertRaises(OverflowError, os.setuid, 1<<32)
- class PosixUidGidTests(unittest.TestCase):
- if hasattr(os, 'setuid'):
- def test_setuid(self):
- if os.getuid() != 0:
- self.assertRaises(os.error, os.setuid, 0)
- self.assertRaises(OverflowError, os.setuid, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
+ def test_setgid(self):
+ if os.getuid() != 0:
+ self.assertRaises(os.error, os.setgid, 0)
+ self.assertRaises(OverflowError, os.setgid, 1<<32)
- if hasattr(os, 'setgid'):
- def test_setgid(self):
- if os.getuid() != 0:
- self.assertRaises(os.error, os.setgid, 0)
- self.assertRaises(OverflowError, os.setgid, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
+ def test_seteuid(self):
+ if os.getuid() != 0:
+ self.assertRaises(os.error, os.seteuid, 0)
+ self.assertRaises(OverflowError, os.seteuid, 1<<32)
- if hasattr(os, 'seteuid'):
- def test_seteuid(self):
- if os.getuid() != 0:
- self.assertRaises(os.error, os.seteuid, 0)
- self.assertRaises(OverflowError, os.seteuid, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
+ def test_setegid(self):
+ if os.getuid() != 0:
+ self.assertRaises(os.error, os.setegid, 0)
+ self.assertRaises(OverflowError, os.setegid, 1<<32)
- if hasattr(os, 'setegid'):
- def test_setegid(self):
- if os.getuid() != 0:
- self.assertRaises(os.error, os.setegid, 0)
- self.assertRaises(OverflowError, os.setegid, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
+ def test_setreuid(self):
+ if os.getuid() != 0:
+ self.assertRaises(os.error, os.setreuid, 0, 0)
+ self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
+ self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
- if hasattr(os, 'setreuid'):
- def test_setreuid(self):
- if os.getuid() != 0:
- self.assertRaises(os.error, os.setreuid, 0, 0)
- self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
- self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
+ def test_setreuid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
- def test_setreuid_neg1(self):
- # Needs to accept -1. We run this in a subprocess to avoid
- # altering the test runner's process state (issue8045).
- subprocess.check_call([
- sys.executable, '-c',
- 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
+ @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
+ def test_setregid(self):
+ if os.getuid() != 0:
+ self.assertRaises(os.error, os.setregid, 0, 0)
+ self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
+ self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
- if hasattr(os, 'setregid'):
- def test_setregid(self):
- if os.getuid() != 0:
- self.assertRaises(os.error, os.setregid, 0, 0)
- self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
- self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
+ def test_setregid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
- def test_setregid_neg1(self):
- # Needs to accept -1. We run this in a subprocess to avoid
- # altering the test runner's process state (issue8045).
- subprocess.check_call([
- sys.executable, '-c',
- 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
-else:
- class PosixUidGidTests(unittest.TestCase):
- pass
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -11,7 +11,7 @@
import time
import errno
-from unittest import TestCase
+from unittest import TestCase, skipUnless
from test import test_support
from test.test_support import HOST
threading = test_support.import_module('threading')
@@ -263,17 +263,20 @@
else:
DummyPOP3Handler.handle_read(self)
- class TestPOP3_SSLClass(TestPOP3Class):
- # repeat previous tests by using poplib.POP3_SSL
+requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
- def setUp(self):
- self.server = DummyPOP3Server((HOST, 0))
- self.server.handler = DummyPOP3_SSLHandler
- self.server.start()
- self.client = poplib.POP3_SSL(self.server.host, self.server.port)
+@requires_ssl
+class TestPOP3_SSLClass(TestPOP3Class):
+ # repeat previous tests by using poplib.POP3_SSL
- def test__all__(self):
- self.assertIn('POP3_SSL', poplib.__all__)
+ def setUp(self):
+ self.server = DummyPOP3Server((HOST, 0))
+ self.server.handler = DummyPOP3_SSLHandler
+ self.server.start()
+ self.client = poplib.POP3_SSL(self.server.host, self.server.port)
+
+ def test__all__(self):
+ self.assertIn('POP3_SSL', poplib.__all__)
class TestTimeouts(TestCase):
@@ -331,9 +334,8 @@
def test_main():
- tests = [TestPOP3Class, TestTimeouts]
- if SUPPORTS_SSL:
- tests.append(TestPOP3_SSLClass)
+ tests = [TestPOP3Class, TestTimeouts,
+ TestPOP3_SSLClass]
thread_info = test_support.threading_setup()
try:
test_support.run_unittest(*tests)
diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py
--- a/Lib/test/test_posix.py
+++ b/Lib/test/test_posix.py
@@ -53,47 +53,55 @@
posix_func()
self.assertRaises(TypeError, posix_func, 1)
- if hasattr(posix, 'getresuid'):
- def test_getresuid(self):
- user_ids = posix.getresuid()
- self.assertEqual(len(user_ids), 3)
- for val in user_ids:
- self.assertGreaterEqual(val, 0)
+ @unittest.skipUnless(hasattr(posix, 'getresuid'),
+ 'test needs posix.getresuid()')
+ def test_getresuid(self):
+ user_ids = posix.getresuid()
+ self.assertEqual(len(user_ids), 3)
+ for val in user_ids:
+ self.assertGreaterEqual(val, 0)
- if hasattr(posix, 'getresgid'):
- def test_getresgid(self):
- group_ids = posix.getresgid()
- self.assertEqual(len(group_ids), 3)
- for val in group_ids:
- self.assertGreaterEqual(val, 0)
+ @unittest.skipUnless(hasattr(posix, 'getresgid'),
+ 'test needs posix.getresgid()')
+ def test_getresgid(self):
+ group_ids = posix.getresgid()
+ self.assertEqual(len(group_ids), 3)
+ for val in group_ids:
+ self.assertGreaterEqual(val, 0)
- if hasattr(posix, 'setresuid'):
- def test_setresuid(self):
- current_user_ids = posix.getresuid()
- self.assertIsNone(posix.setresuid(*current_user_ids))
- # -1 means don't change that value.
- self.assertIsNone(posix.setresuid(-1, -1, -1))
+ @unittest.skipUnless(hasattr(posix, 'setresuid'),
+ 'test needs posix.setresuid()')
+ def test_setresuid(self):
+ current_user_ids = posix.getresuid()
+ self.assertIsNone(posix.setresuid(*current_user_ids))
+ # -1 means don't change that value.
+ self.assertIsNone(posix.setresuid(-1, -1, -1))
- def test_setresuid_exception(self):
- # Don't do this test if someone is silly enough to run us as root.
- current_user_ids = posix.getresuid()
- if 0 not in current_user_ids:
- new_user_ids = (current_user_ids[0]+1, -1, -1)
- self.assertRaises(OSError, posix.setresuid, *new_user_ids)
+ @unittest.skipUnless(hasattr(posix, 'setresuid'),
+ 'test needs posix.setresuid()')
+ def test_setresuid_exception(self):
+ # Don't do this test if someone is silly enough to run us as root.
+ current_user_ids = posix.getresuid()
+ if 0 not in current_user_ids:
+ new_user_ids = (current_user_ids[0]+1, -1, -1)
+ self.assertRaises(OSError, posix.setresuid, *new_user_ids)
- if hasattr(posix, 'setresgid'):
- def test_setresgid(self):
- current_group_ids = posix.getresgid()
- self.assertIsNone(posix.setresgid(*current_group_ids))
- # -1 means don't change that value.
- self.assertIsNone(posix.setresgid(-1, -1, -1))
+ @unittest.skipUnless(hasattr(posix, 'setresgid'),
+ 'test needs posix.setresgid()')
+ def test_setresgid(self):
+ current_group_ids = posix.getresgid()
+ self.assertIsNone(posix.setresgid(*current_group_ids))
+ # -1 means don't change that value.
+ self.assertIsNone(posix.setresgid(-1, -1, -1))
- def test_setresgid_exception(self):
- # Don't do this test if someone is silly enough to run us as root.
- current_group_ids = posix.getresgid()
- if 0 not in current_group_ids:
- new_group_ids = (current_group_ids[0]+1, -1, -1)
- self.assertRaises(OSError, posix.setresgid, *new_group_ids)
+ @unittest.skipUnless(hasattr(posix, 'setresgid'),
+ 'test needs posix.setresgid()')
+ def test_setresgid_exception(self):
+ # Don't do this test if someone is silly enough to run us as root.
+ current_group_ids = posix.getresgid()
+ if 0 not in current_group_ids:
+ new_group_ids = (current_group_ids[0]+1, -1, -1)
+ self.assertRaises(OSError, posix.setresgid, *new_group_ids)
@unittest.skipUnless(hasattr(posix, 'initgroups'),
"test needs os.initgroups()")
@@ -120,107 +128,118 @@
else:
self.fail("Expected OSError to be raised by initgroups")
+ @unittest.skipUnless(hasattr(posix, 'statvfs'),
+ 'test needs posix.statvfs()')
def test_statvfs(self):
- if hasattr(posix, 'statvfs'):
- self.assertTrue(posix.statvfs(os.curdir))
+ self.assertTrue(posix.statvfs(os.curdir))
+ @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
+ 'test needs posix.fstatvfs()')
def test_fstatvfs(self):
- if hasattr(posix, 'fstatvfs'):
- fp = open(test_support.TESTFN)
- try:
- self.assertTrue(posix.fstatvfs(fp.fileno()))
- finally:
- fp.close()
+ fp = open(test_support.TESTFN)
+ try:
+ self.assertTrue(posix.fstatvfs(fp.fileno()))
+ finally:
+ fp.close()
+ @unittest.skipUnless(hasattr(posix, 'ftruncate'),
+ 'test needs posix.ftruncate()')
def test_ftruncate(self):
- if hasattr(posix, 'ftruncate'):
- fp = open(test_support.TESTFN, 'w+')
- try:
- # we need to have some data to truncate
- fp.write('test')
- fp.flush()
- posix.ftruncate(fp.fileno(), 0)
- finally:
- fp.close()
+ fp = open(test_support.TESTFN, 'w+')
+ try:
+ # we need to have some data to truncate
+ fp.write('test')
+ fp.flush()
+ posix.ftruncate(fp.fileno(), 0)
+ finally:
+ fp.close()
+ @unittest.skipUnless(hasattr(posix, 'dup'),
+ 'test needs posix.dup()')
def test_dup(self):
- if hasattr(posix, 'dup'):
- fp = open(test_support.TESTFN)
- try:
- fd = posix.dup(fp.fileno())
- self.assertIsInstance(fd, int)
- os.close(fd)
- finally:
- fp.close()
+ fp = open(test_support.TESTFN)
+ try:
+ fd = posix.dup(fp.fileno())
+ self.assertIsInstance(fd, int)
+ os.close(fd)
+ finally:
+ fp.close()
+ @unittest.skipUnless(hasattr(posix, 'confstr'),
+ 'test needs posix.confstr()')
def test_confstr(self):
- if hasattr(posix, 'confstr'):
- self.assertRaises(ValueError, posix.confstr, "CS_garbage")
- self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
+ self.assertRaises(ValueError, posix.confstr, "CS_garbage")
+ self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
+ @unittest.skipUnless(hasattr(posix, 'dup2'),
+ 'test needs posix.dup2()')
def test_dup2(self):
- if hasattr(posix, 'dup2'):
- fp1 = open(test_support.TESTFN)
- fp2 = open(test_support.TESTFN)
- try:
- posix.dup2(fp1.fileno(), fp2.fileno())
- finally:
- fp1.close()
- fp2.close()
+ fp1 = open(test_support.TESTFN)
+ fp2 = open(test_support.TESTFN)
+ try:
+ posix.dup2(fp1.fileno(), fp2.fileno())
+ finally:
+ fp1.close()
+ fp2.close()
def fdopen_helper(self, *args):
fd = os.open(test_support.TESTFN, os.O_RDONLY)
fp2 = posix.fdopen(fd, *args)
fp2.close()
+ @unittest.skipUnless(hasattr(posix, 'fdopen'),
+ 'test needs posix.fdopen()')
def test_fdopen(self):
- if hasattr(posix, 'fdopen'):
- self.fdopen_helper()
- self.fdopen_helper('r')
- self.fdopen_helper('r', 100)
+ self.fdopen_helper()
+ self.fdopen_helper('r')
+ self.fdopen_helper('r', 100)
+ @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
+ 'test needs posix.O_EXLOCK')
def test_osexlock(self):
- if hasattr(posix, "O_EXLOCK"):
+ fd = os.open(test_support.TESTFN,
+ os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
+ self.assertRaises(OSError, os.open, test_support.TESTFN,
+ os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
+ os.close(fd)
+
+ if hasattr(posix, "O_SHLOCK"):
fd = os.open(test_support.TESTFN,
- os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
+ os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
self.assertRaises(OSError, os.open, test_support.TESTFN,
os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
os.close(fd)
- if hasattr(posix, "O_SHLOCK"):
- fd = os.open(test_support.TESTFN,
- os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
- self.assertRaises(OSError, os.open, test_support.TESTFN,
- os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
- os.close(fd)
+ @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
+ 'test needs posix.O_SHLOCK')
+ def test_osshlock(self):
+ fd1 = os.open(test_support.TESTFN,
+ os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
+ fd2 = os.open(test_support.TESTFN,
+ os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
+ os.close(fd2)
+ os.close(fd1)
- def test_osshlock(self):
- if hasattr(posix, "O_SHLOCK"):
- fd1 = os.open(test_support.TESTFN,
+ if hasattr(posix, "O_EXLOCK"):
+ fd = os.open(test_support.TESTFN,
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
- fd2 = os.open(test_support.TESTFN,
- os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
- os.close(fd2)
- os.close(fd1)
+ self.assertRaises(OSError, os.open, test_support.TESTFN,
+ os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
+ os.close(fd)
- if hasattr(posix, "O_EXLOCK"):
- fd = os.open(test_support.TESTFN,
- os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
- self.assertRaises(OSError, os.open, test_support.TESTFN,
- os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
- os.close(fd)
+ @unittest.skipUnless(hasattr(posix, 'fstat'),
+ 'test needs posix.fstat()')
+ def test_fstat(self):
+ fp = open(test_support.TESTFN)
+ try:
+ self.assertTrue(posix.fstat(fp.fileno()))
+ finally:
+ fp.close()
- def test_fstat(self):
- if hasattr(posix, 'fstat'):
- fp = open(test_support.TESTFN)
- try:
- self.assertTrue(posix.fstat(fp.fileno()))
- finally:
- fp.close()
-
+ @unittest.skipUnless(hasattr(posix, 'stat'),
+ 'test needs posix.stat()')
def test_stat(self):
- if hasattr(posix, 'stat'):
- self.assertTrue(posix.stat(test_support.TESTFN))
+ self.assertTrue(posix.stat(test_support.TESTFN))
def _test_all_chown_common(self, chown_func, first_param, stat_func):
"""Common code for chown, fchown and lchown tests."""
@@ -313,59 +332,62 @@
self._test_all_chown_common(posix.lchown, test_support.TESTFN,
getattr(posix, 'lstat', None))
+ @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
def test_chdir(self):
- if hasattr(posix, 'chdir'):
- posix.chdir(os.curdir)
- self.assertRaises(OSError, posix.chdir, test_support.TESTFN)
+ posix.chdir(os.curdir)
+ self.assertRaises(OSError, posix.chdir, test_support.TESTFN)
+ @unittest.skipUnless(hasattr(posix, 'lsdir'), 'test needs posix.lsdir()')
def test_lsdir(self):
- if hasattr(posix, 'lsdir'):
- self.assertIn(test_support.TESTFN, posix.lsdir(os.curdir))
+ self.assertIn(test_support.TESTFN, posix.lsdir(os.curdir))
+ @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
def test_access(self):
- if hasattr(posix, 'access'):
- self.assertTrue(posix.access(test_support.TESTFN, os.R_OK))
+ self.assertTrue(posix.access(test_support.TESTFN, os.R_OK))
+ @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
def test_umask(self):
- if hasattr(posix, 'umask'):
- old_mask = posix.umask(0)
- self.assertIsInstance(old_mask, int)
- posix.umask(old_mask)
+ old_mask = posix.umask(0)
+ self.assertIsInstance(old_mask, int)
+ posix.umask(old_mask)
+ @unittest.skipUnless(hasattr(posix, 'strerror'),
+ 'test needs posix.strerror()')
def test_strerror(self):
- if hasattr(posix, 'strerror'):
- self.assertTrue(posix.strerror(0))
+ self.assertTrue(posix.strerror(0))
+ @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
def test_pipe(self):
- if hasattr(posix, 'pipe'):
- reader, writer = posix.pipe()
- os.close(reader)
- os.close(writer)
+ reader, writer = posix.pipe()
+ os.close(reader)
+ os.close(writer)
+ @unittest.skipUnless(hasattr(posix, 'tempnam'),
+ 'test needs posix.tempnam()')
def test_tempnam(self):
- if hasattr(posix, 'tempnam'):
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
- self.assertTrue(posix.tempnam())
- self.assertTrue(posix.tempnam(os.curdir))
- self.assertTrue(posix.tempnam(os.curdir, 'blah'))
+ with warnings.catch_warnings():
+ warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
+ self.assertTrue(posix.tempnam())
+ self.assertTrue(posix.tempnam(os.curdir))
+ self.assertTrue(posix.tempnam(os.curdir, 'blah'))
+ @unittest.skipUnless(hasattr(posix, 'tmpfile'),
+ 'test needs posix.tmpfile()')
def test_tmpfile(self):
- if hasattr(posix, 'tmpfile'):
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
- fp = posix.tmpfile()
- fp.close()
+ with warnings.catch_warnings():
+ warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
+ fp = posix.tmpfile()
+ fp.close()
+ @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
def test_utime(self):
- if hasattr(posix, 'utime'):
- now = time.time()
- posix.utime(test_support.TESTFN, None)
- self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None))
- self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None))
- self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now))
- posix.utime(test_support.TESTFN, (int(now), int(now)))
- posix.utime(test_support.TESTFN, (now, now))
+ now = time.time()
+ posix.utime(test_support.TESTFN, None)
+ self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None))
+ self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None))
+ self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now))
+ posix.utime(test_support.TESTFN, (int(now), int(now)))
+ posix.utime(test_support.TESTFN, (now, now))
def _test_chflags_regular_file(self, chflags_func, target_file):
st = os.stat(target_file)
@@ -428,56 +450,57 @@
finally:
posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
+ @unittest.skipUnless(hasattr(posix, 'getcwd'),
+ 'test needs posix.getcwd()')
def test_getcwd_long_pathnames(self):
- if hasattr(posix, 'getcwd'):
- dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
- curdir = os.getcwd()
- base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
+ dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
+ curdir = os.getcwd()
+ base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
- try:
- os.mkdir(base_path)
- os.chdir(base_path)
- except:
-# Just returning nothing instead of the SkipTest exception,
-# because the test results in Error in that case.
-# Is that ok?
-# raise unittest.SkipTest, "cannot create directory for testing"
- return
+ try:
+ os.mkdir(base_path)
+ os.chdir(base_path)
+ except:
+# Just returning nothing instead of the SkipTest exception,
+# because the test results in Error in that case.
+# Is that ok?
+# raise unittest.SkipTest, "cannot create directory for testing"
+ return
- try:
- def _create_and_do_getcwd(dirname, current_path_length = 0):
- try:
- os.mkdir(dirname)
- except:
- raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test"
+ try:
+ def _create_and_do_getcwd(dirname, current_path_length = 0):
+ try:
+ os.mkdir(dirname)
+ except:
+ raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test"
- os.chdir(dirname)
- try:
- os.getcwd()
- if current_path_length < 4099:
- _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
- except OSError as e:
- expected_errno = errno.ENAMETOOLONG
- # The following platforms have quirky getcwd()
- # behaviour -- see issue 9185 and 15765 for
- # more information.
- quirky_platform = (
- 'sunos' in sys.platform or
- 'netbsd' in sys.platform or
- 'openbsd' in sys.platform
- )
- if quirky_platform:
- expected_errno = errno.ERANGE
- self.assertEqual(e.errno, expected_errno)
- finally:
- os.chdir('..')
- os.rmdir(dirname)
+ os.chdir(dirname)
+ try:
+ os.getcwd()
+ if current_path_length < 4099:
+ _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
+ except OSError as e:
+ expected_errno = errno.ENAMETOOLONG
+ # The following platforms have quirky getcwd()
+ # behaviour -- see issue 9185 and 15765 for
+ # more information.
+ quirky_platform = (
+ 'sunos' in sys.platform or
+ 'netbsd' in sys.platform or
+ 'openbsd' in sys.platform
+ )
+ if quirky_platform:
+ expected_errno = errno.ERANGE
+ self.assertEqual(e.errno, expected_errno)
+ finally:
+ os.chdir('..')
+ os.rmdir(dirname)
- _create_and_do_getcwd(dirname)
+ _create_and_do_getcwd(dirname)
- finally:
- os.chdir(curdir)
- shutil.rmtree(base_path)
+ finally:
+ os.chdir(curdir)
+ shutil.rmtree(base_path)
@unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
def test_getgroups(self):
@@ -522,7 +545,7 @@
posix.initgroups(name, self.saved_groups[0])
@unittest.skipUnless(hasattr(posix, 'initgroups'),
- "test needs posix.initgroups()")
+ 'test needs posix.initgroups()')
def test_initgroups(self):
# find missing group
@@ -532,7 +555,7 @@
self.assertIn(g, posix.getgroups())
@unittest.skipUnless(hasattr(posix, 'setgroups'),
- "test needs posix.setgroups()")
+ 'test needs posix.setgroups()')
def test_setgroups(self):
for groups in [[0], range(16)]:
posix.setgroups(groups)
diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py
--- a/Lib/test/test_set.py
+++ b/Lib/test/test_set.py
@@ -561,10 +561,10 @@
s = None
self.assertRaises(ReferenceError, str, p)
- # C API test only available in a debug build
- if hasattr(set, "test_c_api"):
- def test_c_api(self):
- self.assertEqual(set().test_c_api(), True)
+ @unittest.skipUnless(hasattr(set, "test_c_api"),
+ 'C API test only available in a debug build')
+ def test_c_api(self):
+ self.assertEqual(set().test_c_api(), True)
class SetSubclass(set):
pass
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -78,33 +78,34 @@
filename = tempfile.mktemp()
self.assertRaises(OSError, shutil.rmtree, filename)
- # See bug #1071513 for why we don't run this on cygwin
- # and bug #1076467 for why we don't run this as root.
- if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'
- and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):
- def test_on_error(self):
- self.errorState = 0
- os.mkdir(TESTFN)
- self.childpath = os.path.join(TESTFN, 'a')
- f = open(self.childpath, 'w')
- f.close()
- old_dir_mode = os.stat(TESTFN).st_mode
- old_child_mode = os.stat(self.childpath).st_mode
- # Make unwritable.
- os.chmod(self.childpath, stat.S_IREAD)
- os.chmod(TESTFN, stat.S_IREAD)
+ @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
+ @unittest.skipIf(sys.platform[:6] == 'cygwin',
+ "This test can't be run on Cygwin (issue #1071513).")
+ @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
+ "This test can't be run reliably as root (issue #1076467).")
+ def test_on_error(self):
+ self.errorState = 0
+ os.mkdir(TESTFN)
+ self.childpath = os.path.join(TESTFN, 'a')
+ f = open(self.childpath, 'w')
+ f.close()
+ old_dir_mode = os.stat(TESTFN).st_mode
+ old_child_mode = os.stat(self.childpath).st_mode
+ # Make unwritable.
+ os.chmod(self.childpath, stat.S_IREAD)
+ os.chmod(TESTFN, stat.S_IREAD)
- shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
- # Test whether onerror has actually been called.
- self.assertEqual(self.errorState, 2,
- "Expected call to onerror function did not happen.")
+ shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
+ # Test whether onerror has actually been called.
+ self.assertEqual(self.errorState, 2,
+ "Expected call to onerror function did not happen.")
- # Make writable again.
- os.chmod(TESTFN, old_dir_mode)
- os.chmod(self.childpath, old_child_mode)
+ # Make writable again.
+ os.chmod(TESTFN, old_dir_mode)
+ os.chmod(self.childpath, old_child_mode)
- # Clean up.
- shutil.rmtree(TESTFN)
+ # Clean up.
+ shutil.rmtree(TESTFN)
def check_args_to_onerror(self, func, arg, exc):
# test_rmtree_errors deliberately runs rmtree
@@ -308,37 +309,38 @@
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
- if hasattr(os, "mkfifo"):
- # Issue #3002: copyfile and copytree block indefinitely on named pipes
- def test_copyfile_named_pipe(self):
- os.mkfifo(TESTFN)
+ # Issue #3002: copyfile and copytree block indefinitely on named pipes
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ def test_copyfile_named_pipe(self):
+ os.mkfifo(TESTFN)
+ try:
+ self.assertRaises(shutil.SpecialFileError,
+ shutil.copyfile, TESTFN, TESTFN2)
+ self.assertRaises(shutil.SpecialFileError,
+ shutil.copyfile, __file__, TESTFN)
+ finally:
+ os.remove(TESTFN)
+
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ def test_copytree_named_pipe(self):
+ os.mkdir(TESTFN)
+ try:
+ subdir = os.path.join(TESTFN, "subdir")
+ os.mkdir(subdir)
+ pipe = os.path.join(subdir, "mypipe")
+ os.mkfifo(pipe)
try:
- self.assertRaises(shutil.SpecialFileError,
- shutil.copyfile, TESTFN, TESTFN2)
- self.assertRaises(shutil.SpecialFileError,
- shutil.copyfile, __file__, TESTFN)
- finally:
- os.remove(TESTFN)
-
- def test_copytree_named_pipe(self):
- os.mkdir(TESTFN)
- try:
- subdir = os.path.join(TESTFN, "subdir")
- os.mkdir(subdir)
- pipe = os.path.join(subdir, "mypipe")
- os.mkfifo(pipe)
- try:
- shutil.copytree(TESTFN, TESTFN2)
- except shutil.Error as e:
- errors = e.args[0]
- self.assertEqual(len(errors), 1)
- src, dst, error_msg = errors[0]
- self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
- else:
- self.fail("shutil.Error should have been raised")
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
- shutil.rmtree(TESTFN2, ignore_errors=True)
+ shutil.copytree(TESTFN, TESTFN2)
+ except shutil.Error as e:
+ errors = e.args[0]
+ self.assertEqual(len(errors), 1)
+ src, dst, error_msg = errors[0]
+ self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
+ else:
+ self.fail("shutil.Error should have been raised")
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
+ shutil.rmtree(TESTFN2, ignore_errors=True)
@unittest.skipUnless(hasattr(os, 'chflags') and
hasattr(errno, 'EOPNOTSUPP') and
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -343,16 +343,17 @@
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
+ @unittest.skipUnless(hasattr(sys, 'getrefcount'),
+ 'test needs sys.getrefcount()')
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
- if hasattr(sys, "getrefcount"):
- try:
- # On some versions, this loses a reference
- orig = sys.getrefcount(__name__)
- socket.getnameinfo(__name__,0)
- except TypeError:
- self.assertEqual(sys.getrefcount(__name__), orig,
- "socket.getnameinfo loses a reference")
+ try:
+ # On some versions, this loses a reference
+ orig = sys.getrefcount(__name__)
+ socket.getnameinfo(__name__,0)
+ except TypeError:
+ self.assertEqual(sys.getrefcount(__name__), orig,
+ "socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
@@ -459,17 +460,17 @@
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
+ @unittest.skipUnless(hasattr(socket, 'inet_aton'),
+ 'test needs socket.inet_aton()')
def testIPv4_inet_aton_fourbytes(self):
- if not hasattr(socket, 'inet_aton'):
- return # No inet_aton, nothing to check
# Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes.
self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
+ @unittest.skipUnless(hasattr(socket, 'inet_pton'),
+ 'test needs socket.inet_pton()')
def testIPv4toString(self):
- if not hasattr(socket, 'inet_pton'):
- return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a)
@@ -484,9 +485,9 @@
self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
+ @unittest.skipUnless(hasattr(socket, 'inet_pton'),
+ 'test needs socket.inet_pton()')
def testIPv6toString(self):
- if not hasattr(socket, 'inet_pton'):
- return # No inet_pton() on this platform
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
@@ -503,9 +504,9 @@
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
+ @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
+ 'test needs socket.inet_ntop()')
def testStringToIPv4(self):
- if not hasattr(socket, 'inet_ntop'):
- return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
@@ -518,9 +519,9 @@
self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
+ @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
+ 'test needs socket.inet_ntop()')
def testStringToIPv6(self):
- if not hasattr(socket, 'inet_ntop'):
- return # No inet_ntop() on this platform
try:
from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6:
@@ -871,6 +872,8 @@
self.cli.connect((HOST, self.port))
time.sleep(1.0)
+@unittest.skipUnless(hasattr(socket, 'socketpair'),
+ 'test needs socket.socketpair()')
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
@@ -1456,12 +1459,12 @@
if not ok:
self.fail("accept() returned success when we did not expect it")
+ @unittest.skipUnless(hasattr(signal, 'alarm'),
+ 'test needs signal.alarm()')
def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms.
- if not hasattr(signal, "alarm"):
- return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception):
pass
@@ -1521,6 +1524,7 @@
self.assertTrue(issubclass(socket.gaierror, socket.error))
self.assertTrue(issubclass(socket.timeout, socket.error))
+@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108
@@ -1635,11 +1639,11 @@
for line in f:
if line.startswith("tipc "):
return True
- if test_support.verbose:
- print "TIPC module is not loaded, please 'sudo modprobe tipc'"
return False
-class TIPCTest (unittest.TestCase):
+@unittest.skipUnless(isTipcAvailable(),
+ "TIPC module is not loaded, please 'sudo modprobe tipc'")
+class TIPCTest(unittest.TestCase):
def testRDM(self):
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
@@ -1659,7 +1663,9 @@
self.assertEqual(msg, MSG)
-class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
+@unittest.skipUnless(isTipcAvailable(),
+ "TIPC module is not loaded, please 'sudo modprobe tipc'")
+class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName = 'runTest'):
unittest.TestCase.__init__(self, methodName = methodName)
ThreadableTest.__init__(self)
@@ -1712,13 +1718,9 @@
NetworkConnectionAttributesTest,
NetworkConnectionBehaviourTest,
])
- if hasattr(socket, "socketpair"):
- tests.append(BasicSocketPairTest)
- if sys.platform == 'linux2':
- tests.append(TestLinuxAbstractNamespace)
- if isTipcAvailable():
- tests.append(TIPCTest)
- tests.append(TIPCThreadableTest)
+ tests.append(BasicSocketPairTest)
+ tests.append(TestLinuxAbstractNamespace)
+ tests.extend([TIPCTest, TIPCThreadableTest])
thread_info = test_support.threading_setup()
test_support.run_unittest(*tests)
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -27,7 +27,10 @@
HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
+requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS,
+ 'requires Unix sockets')
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
+requires_forking = unittest.skipUnless(HAVE_FORKING, 'requires forking')
def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows)."""
@@ -188,31 +191,33 @@
SocketServer.StreamRequestHandler,
self.stream_examine)
- if HAVE_FORKING:
- def test_ForkingTCPServer(self):
- with simple_subprocess(self):
- self.run_server(SocketServer.ForkingTCPServer,
- SocketServer.StreamRequestHandler,
- self.stream_examine)
-
- if HAVE_UNIX_SOCKETS:
- def test_UnixStreamServer(self):
- self.run_server(SocketServer.UnixStreamServer,
+ @requires_forking
+ def test_ForkingTCPServer(self):
+ with simple_subprocess(self):
+ self.run_server(SocketServer.ForkingTCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
- def test_ThreadingUnixStreamServer(self):
- self.run_server(SocketServer.ThreadingUnixStreamServer,
+ @requires_unix_sockets
+ def test_UnixStreamServer(self):
+ self.run_server(SocketServer.UnixStreamServer,
+ SocketServer.StreamRequestHandler,
+ self.stream_examine)
+
+ @requires_unix_sockets
+ def test_ThreadingUnixStreamServer(self):
+ self.run_server(SocketServer.ThreadingUnixStreamServer,
+ SocketServer.StreamRequestHandler,
+ self.stream_examine)
+
+ @requires_unix_sockets
+ @requires_forking
+ def test_ForkingUnixStreamServer(self):
+ with simple_subprocess(self):
+ self.run_server(ForkingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
- if HAVE_FORKING:
- def test_ForkingUnixStreamServer(self):
- with simple_subprocess(self):
- self.run_server(ForkingUnixStreamServer,
- SocketServer.StreamRequestHandler,
- self.stream_examine)
-
def test_UDPServer(self):
self.run_server(SocketServer.UDPServer,
SocketServer.DatagramRequestHandler,
@@ -223,12 +228,12 @@
SocketServer.DatagramRequestHandler,
self.dgram_examine)
- if HAVE_FORKING:
- def test_ForkingUDPServer(self):
- with simple_subprocess(self):
- self.run_server(SocketServer.ForkingUDPServer,
- SocketServer.DatagramRequestHandler,
- self.dgram_examine)
+ @requires_forking
+ def test_ForkingUDPServer(self):
+ with simple_subprocess(self):
+ self.run_server(SocketServer.ForkingUDPServer,
+ SocketServer.DatagramRequestHandler,
+ self.dgram_examine)
@contextlib.contextmanager
def mocked_select_module(self):
@@ -265,22 +270,24 @@
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
- # if HAVE_UNIX_SOCKETS:
- # def test_UnixDatagramServer(self):
- # self.run_server(SocketServer.UnixDatagramServer,
- # SocketServer.DatagramRequestHandler,
- # self.dgram_examine)
+ # @requires_unix_sockets
+ # def test_UnixDatagramServer(self):
+ # self.run_server(SocketServer.UnixDatagramServer,
+ # SocketServer.DatagramRequestHandler,
+ # self.dgram_examine)
#
- # def test_ThreadingUnixDatagramServer(self):
- # self.run_server(SocketServer.ThreadingUnixDatagramServer,
- # SocketServer.DatagramRequestHandler,
- # self.dgram_examine)
+ # @requires_unix_sockets
+ # def test_ThreadingUnixDatagramServer(self):
+ # self.run_server(SocketServer.ThreadingUnixDatagramServer,
+ # SocketServer.DatagramRequestHandler,
+ # self.dgram_examine)
#
- # if HAVE_FORKING:
- # def test_ForkingUnixDatagramServer(self):
- # self.run_server(SocketServer.ForkingUnixDatagramServer,
- # SocketServer.DatagramRequestHandler,
- # self.dgram_examine)
+ # @requires_unix_sockets
+ # @requires_forking
+ # def test_ForkingUnixDatagramServer(self):
+ # self.run_server(SocketServer.ForkingUnixDatagramServer,
+ # SocketServer.DatagramRequestHandler,
+ # self.dgram_examine)
@reap_threads
def test_shutdown(self):
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -266,15 +266,16 @@
# still has 5 elements
maj, min, buildno, plat, csd = sys.getwindowsversion()
+ @unittest.skipUnless(hasattr(sys, "setdlopenflags"),
+ 'test needs sys.setdlopenflags()')
def test_dlopenflags(self):
- if hasattr(sys, "setdlopenflags"):
- self.assertTrue(hasattr(sys, "getdlopenflags"))
- self.assertRaises(TypeError, sys.getdlopenflags, 42)
- oldflags = sys.getdlopenflags()
- self.assertRaises(TypeError, sys.setdlopenflags)
- sys.setdlopenflags(oldflags+1)
- self.assertEqual(sys.getdlopenflags(), oldflags+1)
- sys.setdlopenflags(oldflags)
+ self.assertTrue(hasattr(sys, "getdlopenflags"))
+ self.assertRaises(TypeError, sys.getdlopenflags, 42)
+ oldflags = sys.getdlopenflags()
+ self.assertRaises(TypeError, sys.setdlopenflags)
+ sys.setdlopenflags(oldflags+1)
+ self.assertEqual(sys.getdlopenflags(), oldflags+1)
+ sys.setdlopenflags(oldflags)
def test_refcount(self):
# n here must be a global in order for this test to pass while
diff --git a/Lib/test/test_warnings.py b/Lib/test/test_warnings.py
--- a/Lib/test/test_warnings.py
+++ b/Lib/test/test_warnings.py
@@ -259,11 +259,10 @@
finally:
warning_tests.__file__ = filename
+ @unittest.skipUnless(hasattr(sys, 'argv'), 'test needs sys.argv')
def test_missing_filename_main_with_argv(self):
# If __file__ is not specified and the caller is __main__ and sys.argv
# exists, then use sys.argv[0] as the file.
- if not hasattr(sys, 'argv'):
- return
filename = warning_tests.__file__
module_name = warning_tests.__name__
try:
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py
--- a/Lib/test/test_zlib.py
+++ b/Lib/test/test_zlib.py
@@ -12,6 +12,13 @@
zlib = import_module('zlib')
+requires_Compress_copy = unittest.skipUnless(
+ hasattr(zlib.compressobj(), "copy"),
+ 'requires Compress.copy()')
+requires_Decompress_copy = unittest.skipUnless(
+ hasattr(zlib.decompressobj(), "copy"),
+ 'requires Decompress.copy()')
+
class ChecksumTestCase(unittest.TestCase):
# checksum test cases
@@ -339,39 +346,39 @@
"mode=%i, level=%i") % (sync, level))
del obj
+ @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
+ 'requires zlib.Z_SYNC_FLUSH')
def test_odd_flush(self):
# Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
import random
+ # Testing on 17K of "random" data
- if hasattr(zlib, 'Z_SYNC_FLUSH'):
- # Testing on 17K of "random" data
+ # Create compressor and decompressor objects
+ co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
+ dco = zlib.decompressobj()
- # Create compressor and decompressor objects
- co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
- dco = zlib.decompressobj()
+ # Try 17K of data
+ # generate random data stream
+ try:
+ # In 2.3 and later, WichmannHill is the RNG of the bug report
+ gen = random.WichmannHill()
+ except AttributeError:
+ try:
+ # 2.2 called it Random
+ gen = random.Random()
+ except AttributeError:
+ # others might simply have a single RNG
+ gen = random
+ gen.seed(1)
+ data = genblock(1, 17 * 1024, generator=gen)
- # Try 17K of data
- # generate random data stream
- try:
- # In 2.3 and later, WichmannHill is the RNG of the bug report
- gen = random.WichmannHill()
- except AttributeError:
- try:
- # 2.2 called it Random
- gen = random.Random()
- except AttributeError:
- # others might simply have a single RNG
- gen = random
- gen.seed(1)
- data = genblock(1, 17 * 1024, generator=gen)
+ # compress, sync-flush, and decompress
+ first = co.compress(data)
+ second = co.flush(zlib.Z_SYNC_FLUSH)
+ expanded = dco.decompress(first + second)
- # compress, sync-flush, and decompress
- first = co.compress(data)
- second = co.flush(zlib.Z_SYNC_FLUSH)
- expanded = dco.decompress(first + second)
-
- # if decompressed data is different from the input data, choke.
- self.assertEqual(expanded, data, "17K random source doesn't match")
+ # if decompressed data is different from the input data, choke.
+ self.assertEqual(expanded, data, "17K random source doesn't match")
def test_empty_flush(self):
# Test that calling .flush() on unused objects works.
@@ -408,35 +415,36 @@
data = zlib.compress(input2)
self.assertEqual(dco.flush(), input1[1:])
- if hasattr(zlib.compressobj(), "copy"):
- def test_compresscopy(self):
- # Test copying a compression object
- data0 = HAMLET_SCENE
- data1 = HAMLET_SCENE.swapcase()
- c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
- bufs0 = []
- bufs0.append(c0.compress(data0))
+ @requires_Compress_copy
+ def test_compresscopy(self):
+ # Test copying a compression object
+ data0 = HAMLET_SCENE
+ data1 = HAMLET_SCENE.swapcase()
+ c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
+ bufs0 = []
+ bufs0.append(c0.compress(data0))
- c1 = c0.copy()
- bufs1 = bufs0[:]
+ c1 = c0.copy()
+ bufs1 = bufs0[:]
- bufs0.append(c0.compress(data0))
- bufs0.append(c0.flush())
- s0 = ''.join(bufs0)
+ bufs0.append(c0.compress(data0))
+ bufs0.append(c0.flush())
+ s0 = ''.join(bufs0)
- bufs1.append(c1.compress(data1))
- bufs1.append(c1.flush())
- s1 = ''.join(bufs1)
+ bufs1.append(c1.compress(data1))
+ bufs1.append(c1.flush())
+ s1 = ''.join(bufs1)
- self.assertEqual(zlib.decompress(s0),data0+data0)
- self.assertEqual(zlib.decompress(s1),data0+data1)
+ self.assertEqual(zlib.decompress(s0),data0+data0)
+ self.assertEqual(zlib.decompress(s1),data0+data1)
- def test_badcompresscopy(self):
- # Test copying a compression object in an inconsistent state
- c = zlib.compressobj()
- c.compress(HAMLET_SCENE)
- c.flush()
- self.assertRaises(ValueError, c.copy)
+ @requires_Compress_copy
+ def test_badcompresscopy(self):
+ # Test copying a compression object in an inconsistent state
+ c = zlib.compressobj()
+ c.compress(HAMLET_SCENE)
+ c.flush()
+ self.assertRaises(ValueError, c.copy)
def test_decompress_unused_data(self):
# Repeated calls to decompress() after EOF should accumulate data in
@@ -463,35 +471,36 @@
self.assertEqual(dco.unconsumed_tail, b'')
self.assertEqual(dco.unused_data, remainder)
- if hasattr(zlib.decompressobj(), "copy"):
- def test_decompresscopy(self):
- # Test copying a decompression object
- data = HAMLET_SCENE
- comp = zlib.compress(data)
+ @requires_Decompress_copy
+ def test_decompresscopy(self):
+ # Test copying a decompression object
+ data = HAMLET_SCENE
+ comp = zlib.compress(data)
- d0 = zlib.decompressobj()
- bufs0 = []
- bufs0.append(d0.decompress(comp[:32]))
+ d0 = zlib.decompressobj()
+ bufs0 = []
+ bufs0.append(d0.decompress(comp[:32]))
- d1 = d0.copy()
- bufs1 = bufs0[:]
+ d1 = d0.copy()
+ bufs1 = bufs0[:]
- bufs0.append(d0.decompress(comp[32:]))
- s0 = ''.join(bufs0)
+ bufs0.append(d0.decompress(comp[32:]))
+ s0 = ''.join(bufs0)
- bufs1.append(d1.decompress(comp[32:]))
- s1 = ''.join(bufs1)
+ bufs1.append(d1.decompress(comp[32:]))
+ s1 = ''.join(bufs1)
- self.assertEqual(s0,s1)
- self.assertEqual(s0,data)
+ self.assertEqual(s0,s1)
+ self.assertEqual(s0,data)
- def test_baddecompresscopy(self):
- # Test copying a compression object in an inconsistent state
- data = zlib.compress(HAMLET_SCENE)
- d = zlib.decompressobj()
- d.decompress(data)
- d.flush()
- self.assertRaises(ValueError, d.copy)
+ @requires_Decompress_copy
+ def test_baddecompresscopy(self):
+ # Test copying a decompression object in an inconsistent state
+ data = zlib.compress(HAMLET_SCENE)
+ d = zlib.decompressobj()
+ d.decompress(data)
+ d.flush()
+ self.assertRaises(ValueError, d.copy)
# Memory use of the following functions takes into account overallocation
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -23,6 +23,8 @@
Tests
-----
+- Issue #18702: All skipped tests now reported as skipped.
+
- Issue #19085: Added basic tests for all tkinter widget options.
--
Repository URL: http://hg.python.org/cpython
More information about the Python-checkins
mailing list