[Jython-checkins] jython: Update test_mailbox and test_support for latest 2.7.
frank.wierzbicki
jython-checkins at python.org
Mon Mar 11 18:23:54 CET 2013
http://hg.python.org/jython/rev/6f0976c35eaa
changeset: 7082:6f0976c35eaa
user: Frank Wierzbicki <fwierzbicki at gmail.com>
date: Mon Mar 11 10:02:12 2013 -0700
summary:
Update test_mailbox and test_support for latest 2.7.
files:
Lib/test/test_mailbox.py | 208 ++++++++++++++++++++------
Lib/test/test_support.py | 158 +++++++++++++++++++-
2 files changed, 308 insertions(+), 58 deletions(-)
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -8,6 +8,7 @@
import re
import shutil
import StringIO
+import tempfile
from test import test_support
import unittest
import mailbox
@@ -20,7 +21,7 @@
# Silence Py3k warning
rfc822 = test_support.import_module('rfc822', deprecated=True)
-class TestBase(unittest.TestCase):
+class TestBase:
def _check_sample(self, msg):
# Inspect a mailbox.Message representation of the sample message
@@ -39,15 +40,15 @@
def _delete_recursively(self, target):
# Delete a file or delete a directory recursively
if os.path.isdir(target):
- shutil.rmtree(target)
+ test_support.rmtree(target)
elif os.path.exists(target):
- os.remove(target)
+ test_support.unlink(target)
class TestMailbox(TestBase):
_factory = None # Overridden by subclasses to reuse tests
- _template = 'From: foo\n\n%s'
+ _template = 'From: foo\n\n%s\n'
def setUp(self):
self._path = test_support.TESTFN
@@ -75,6 +76,18 @@
for i in (1, 2, 3, 4):
self._check_sample(self._box[keys[i]])
+ def test_add_file(self):
+ with tempfile.TemporaryFile('w+') as f:
+ f.write(_sample_message)
+ f.seek(0)
+ key = self._box.add(f)
+ self.assertEqual(self._box.get_string(key).split('\n'),
+ _sample_message.split('\n'))
+
+ def test_add_StringIO(self):
+ key = self._box.add(StringIO.StringIO(self._template % "0"))
+ self.assertEqual(self._box.get_string(key), self._template % "0")
+
def test_remove(self):
# Remove messages using remove()
self._test_remove_or_delitem(self._box.remove)
@@ -124,7 +137,7 @@
key0 = self._box.add(self._template % 0)
msg = self._box.get(key0)
self.assertEqual(msg['from'], 'foo')
- self.assertEqual(msg.get_payload(), '0')
+ self.assertEqual(msg.get_payload(), '0\n')
self.assertIs(self._box.get('foo'), None)
self.assertFalse(self._box.get('foo', False))
self._box.close()
@@ -132,14 +145,15 @@
key1 = self._box.add(self._template % 1)
msg = self._box.get(key1)
self.assertEqual(msg['from'], 'foo')
- self.assertEqual(msg.fp.read(), '1')
+ self.assertEqual(msg.fp.read(), '1' + os.linesep)
+ msg.fp.close()
def test_getitem(self):
# Retrieve message using __getitem__()
key0 = self._box.add(self._template % 0)
msg = self._box[key0]
self.assertEqual(msg['from'], 'foo')
- self.assertEqual(msg.get_payload(), '0')
+ self.assertEqual(msg.get_payload(), '0\n')
self.assertRaises(KeyError, lambda: self._box['foo'])
self._box.discard(key0)
self.assertRaises(KeyError, lambda: self._box[key0])
@@ -151,7 +165,7 @@
msg0 = self._box.get_message(key0)
self.assertIsInstance(msg0, mailbox.Message)
self.assertEqual(msg0['from'], 'foo')
- self.assertEqual(msg0.get_payload(), '0')
+ self.assertEqual(msg0.get_payload(), '0\n')
self._check_sample(self._box.get_message(key1))
def test_get_string(self):
@@ -165,10 +179,14 @@
# Get file representations of messages
key0 = self._box.add(self._template % 0)
key1 = self._box.add(_sample_message)
- self.assertEqual(self._box.get_file(key0).read().replace(os.linesep, '\n'),
+ msg0 = self._box.get_file(key0)
+ self.assertEqual(msg0.read().replace(os.linesep, '\n'),
self._template % 0)
- self.assertEqual(self._box.get_file(key1).read().replace(os.linesep, '\n'),
+ msg1 = self._box.get_file(key1)
+ self.assertEqual(msg1.read().replace(os.linesep, '\n'),
_sample_message)
+ msg0.close()
+ msg1.close()
def test_get_file_can_be_closed_twice(self):
# Issue 11700
@@ -320,15 +338,15 @@
self.assertIn(key0, self._box)
key1 = self._box.add(self._template % 1)
self.assertIn(key1, self._box)
- self.assertEqual(self._box.pop(key0).get_payload(), '0')
+ self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
self.assertNotIn(key0, self._box)
self.assertIn(key1, self._box)
key2 = self._box.add(self._template % 2)
self.assertIn(key2, self._box)
- self.assertEqual(self._box.pop(key2).get_payload(), '2')
+ self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
self.assertNotIn(key2, self._box)
self.assertIn(key1, self._box)
- self.assertEqual(self._box.pop(key1).get_payload(), '1')
+ self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
self.assertNotIn(key1, self._box)
self.assertEqual(len(self._box), 0)
@@ -386,6 +404,17 @@
# Write changes to disk
self._test_flush_or_close(self._box.flush, True)
+ def test_popitem_and_flush_twice(self):
+ # See #15036.
+ self._box.add(self._template % 0)
+ self._box.add(self._template % 1)
+ self._box.flush()
+
+ self._box.popitem()
+ self._box.flush()
+ self._box.popitem()
+ self._box.flush()
+
def test_lock_unlock(self):
# Lock and unlock the mailbox
self.assertFalse(os.path.exists(self._get_lock_path()))
@@ -403,6 +432,7 @@
self._box.add(contents[0])
self._box.add(contents[1])
self._box.add(contents[2])
+ oldbox = self._box
method()
if should_call_close:
self._box.close()
@@ -411,6 +441,7 @@
self.assertEqual(len(keys), 3)
for key in keys:
self.assertIn(self._box.get_string(key), contents)
+ oldbox.close()
def test_dump_message(self):
# Write message representations to disk
@@ -429,7 +460,7 @@
return self._path + '.lock'
-class TestMailboxSuperclass(TestBase):
+class TestMailboxSuperclass(TestBase, unittest.TestCase):
def test_notimplemented(self):
# Test that all Mailbox methods raise NotImplementedException.
@@ -464,7 +495,7 @@
self.assertRaises(NotImplementedError, lambda: box.close())
-class TestMaildir(TestMailbox):
+class TestMaildir(TestMailbox, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
@@ -506,7 +537,7 @@
msg_returned = self._box.get_message(key)
self.assertEqual(msg_returned.get_subdir(), 'new')
self.assertEqual(msg_returned.get_flags(), '')
- self.assertEqual(msg_returned.get_payload(), '1')
+ self.assertEqual(msg_returned.get_payload(), '1\n')
msg2 = mailbox.MaildirMessage(self._template % 2)
msg2.set_info('2,S')
self._box[key] = msg2
@@ -514,7 +545,7 @@
msg_returned = self._box.get_message(key)
self.assertEqual(msg_returned.get_subdir(), 'new')
self.assertEqual(msg_returned.get_flags(), 'S')
- self.assertEqual(msg_returned.get_payload(), '3')
+ self.assertEqual(msg_returned.get_payload(), '3\n')
def test_consistent_factory(self):
# Add a message.
@@ -636,13 +667,13 @@
self.assertTrue(match is not None, "Invalid file name: '%s'" % tail)
groups = match.groups()
if previous_groups is not None:
- self.assertTrue(int(groups[0] >= previous_groups[0]),
+ self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
"Non-monotonic seconds: '%s' before '%s'" %
(previous_groups[0], groups[0]))
- self.assertTrue(int(groups[1] >= previous_groups[1]) or
- groups[0] != groups[1],
- "Non-monotonic milliseconds: '%s' before '%s'" %
- (previous_groups[1], groups[1]))
+ if int(groups[0]) == int(previous_groups[0]):
+ self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
+ "Non-monotonic milliseconds: '%s' before '%s'" %
+ (previous_groups[1], groups[1]))
self.assertTrue(int(groups[2]) == pid,
"Process ID mismatch: '%s' should be '%s'" %
(groups[2], pid))
@@ -813,7 +844,49 @@
self._box._refresh()
self.assertTrue(refreshed())
-class _TestMboxMMDF(TestMailbox):
+
+class _TestSingleFile(TestMailbox):
+ '''Common tests for single-file mailboxes'''
+
+ def test_add_doesnt_rewrite(self):
+ # When only adding messages, flush() should not rewrite the
+ # mailbox file. See issue #9559.
+
+ # Inode number changes if the contents are written to another
+ # file which is then renamed over the original file. So we
+ # must check that the inode number doesn't change.
+ inode_before = os.stat(self._path).st_ino
+
+ self._box.add(self._template % 0)
+ self._box.flush()
+
+ inode_after = os.stat(self._path).st_ino
+ self.assertEqual(inode_before, inode_after)
+
+ # Make sure the message was really added
+ self._box.close()
+ self._box = self._factory(self._path)
+ self.assertEqual(len(self._box), 1)
+
+ def test_permissions_after_flush(self):
+ # See issue #5346
+
+ # Make the mailbox world writable. It's unlikely that the new
+ # mailbox file would have these permissions after flush(),
+ # because umask usually prevents it.
+ mode = os.stat(self._path).st_mode | 0o666
+ os.chmod(self._path, mode)
+
+ self._box.add(self._template % 0)
+ i = self._box.add(self._template % 1)
+ # Need to remove one message to make flush() create a new file
+ self._box.remove(i)
+ self._box.flush()
+
+ self.assertEqual(os.stat(self._path).st_mode, mode)
+
+
+class _TestMboxMMDF(_TestSingleFile):
def tearDown(self):
self._box.close()
@@ -823,14 +896,14 @@
def test_add_from_string(self):
# Add a string starting with 'From ' to the mailbox
- key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
+ key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
- self.assertEqual(self._box[key].get_payload(), '0')
+ self.assertEqual(self._box[key].get_payload(), '0\n')
def test_add_mbox_or_mmdf_message(self):
# Add an mboxMessage or MMDFMessage
for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
- msg = class_('From foo@bar blah\nFrom: foo\n\n0')
+ msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
key = self._box.add(msg)
def test_open_close_open(self):
@@ -914,7 +987,7 @@
self._box.close()
-class TestMbox(_TestMboxMMDF):
+class TestMbox(_TestMboxMMDF, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
@@ -937,12 +1010,35 @@
perms = st.st_mode
self.assertFalse((perms & 0111)) # Execute bits should all be off.
-class TestMMDF(_TestMboxMMDF):
+ def test_terminating_newline(self):
+ message = email.message.Message()
+ message['From'] = 'john@example.com'
+ message.set_payload('No newline at the end')
+ i = self._box.add(message)
+
+ # A newline should have been appended to the payload
+ message = self._box.get(i)
+ self.assertEqual(message.get_payload(), 'No newline at the end\n')
+
+ def test_message_separator(self):
+ # Check there's always a single blank line after each message
+ self._box.add('From: foo\n\n0') # No newline at the end
+ with open(self._path) as f:
+ data = f.read()
+ self.assertEqual(data[-3:], '0\n\n')
+
+ self._box.add('From: foo\n\n0\n') # Newline at the end
+ with open(self._path) as f:
+ data = f.read()
+ self.assertEqual(data[-3:], '0\n\n')
+
+
+class TestMMDF(_TestMboxMMDF, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
-class TestMH(TestMailbox):
+class TestMH(TestMailbox, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.MH(path, factory)
@@ -1074,7 +1170,7 @@
return os.path.join(self._path, '.mh_sequences.lock')
-class TestBabyl(TestMailbox):
+class TestBabyl(_TestSingleFile, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
@@ -1103,7 +1199,7 @@
self.assertEqual(set(self._box.get_labels()), set(['blah']))
-class TestMessage(TestBase):
+class TestMessage(TestBase, unittest.TestCase):
_factory = mailbox.Message # Overridden by subclasses to reuse tests
@@ -1174,7 +1270,7 @@
pass
-class TestMaildirMessage(TestMessage):
+class TestMaildirMessage(TestMessage, unittest.TestCase):
_factory = mailbox.MaildirMessage
@@ -1249,7 +1345,7 @@
self._check_sample(msg)
-class _TestMboxMMDFMessage(TestMessage):
+class _TestMboxMMDFMessage:
_factory = mailbox._mboxMMDFMessage
@@ -1296,12 +1392,12 @@
r"\d{2} \d{4}", msg.get_from()))
-class TestMboxMessage(_TestMboxMMDFMessage):
+class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
_factory = mailbox.mboxMessage
-class TestMHMessage(TestMessage):
+class TestMHMessage(TestMessage, unittest.TestCase):
_factory = mailbox.MHMessage
@@ -1332,7 +1428,7 @@
self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
-class TestBabylMessage(TestMessage):
+class TestBabylMessage(TestMessage, unittest.TestCase):
_factory = mailbox.BabylMessage
@@ -1387,12 +1483,12 @@
self.assertEqual(visible[header], msg[header])
-class TestMMDFMessage(_TestMboxMMDFMessage):
+class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
_factory = mailbox.MMDFMessage
-class TestMessageConversion(TestBase):
+class TestMessageConversion(TestBase, unittest.TestCase):
def test_plain_to_x(self):
# Convert Message to all formats
@@ -1715,7 +1811,7 @@
proxy.close()
-class TestProxyFile(TestProxyFileBase):
+class TestProxyFile(TestProxyFileBase, unittest.TestCase):
def setUp(self):
self._path = test_support.TESTFN
@@ -1764,7 +1860,7 @@
self._test_close(mailbox._ProxyFile(self._file))
-class TestPartialFile(TestProxyFileBase):
+class TestPartialFile(TestProxyFileBase, unittest.TestCase):
def setUp(self):
self._path = test_support.TESTFN
@@ -1831,6 +1927,10 @@
def setUp(self):
# create a new maildir mailbox to work with:
self._dir = test_support.TESTFN
+ if os.path.isdir(self._dir):
+ test_support.rmtree(self._dir)
+ if os.path.isfile(self._dir):
+ test_support.unlink(self._dir)
os.mkdir(self._dir)
os.mkdir(os.path.join(self._dir, "cur"))
os.mkdir(os.path.join(self._dir, "tmp"))
@@ -1840,10 +1940,10 @@
def tearDown(self):
map(os.unlink, self._msgfiles)
- os.rmdir(os.path.join(self._dir, "cur"))
- os.rmdir(os.path.join(self._dir, "tmp"))
- os.rmdir(os.path.join(self._dir, "new"))
- os.rmdir(self._dir)
+ test_support.rmdir(os.path.join(self._dir, "cur"))
+ test_support.rmdir(os.path.join(self._dir, "tmp"))
+ test_support.rmdir(os.path.join(self._dir, "new"))
+ test_support.rmdir(self._dir)
def createMessage(self, dir, mbox=False):
t = int(time.time() % 1000000)
@@ -1879,7 +1979,9 @@
self.createMessage("cur")
self.mbox = mailbox.Maildir(test_support.TESTFN)
#self.assertTrue(len(self.mbox.boxes) == 1)
- self.assertIsNot(self.mbox.next(), None)
+ msg = self.mbox.next()
+ self.assertIsNot(msg, None)
+ msg.fp.close()
self.assertIs(self.mbox.next(), None)
self.assertIs(self.mbox.next(), None)
@@ -1887,7 +1989,9 @@
self.createMessage("new")
self.mbox = mailbox.Maildir(test_support.TESTFN)
#self.assertTrue(len(self.mbox.boxes) == 1)
- self.assertIsNot(self.mbox.next(), None)
+ msg = self.mbox.next()
+ self.assertIsNot(msg, None)
+ msg.fp.close()
self.assertIs(self.mbox.next(), None)
self.assertIs(self.mbox.next(), None)
@@ -1896,8 +2000,12 @@
self.createMessage("new")
self.mbox = mailbox.Maildir(test_support.TESTFN)
#self.assertTrue(len(self.mbox.boxes) == 2)
- self.assertIsNot(self.mbox.next(), None)
- self.assertIsNot(self.mbox.next(), None)
+ msg = self.mbox.next()
+ self.assertIsNot(msg, None)
+ msg.fp.close()
+ msg = self.mbox.next()
+ self.assertIsNot(msg, None)
+ msg.fp.close()
self.assertIs(self.mbox.next(), None)
self.assertIs(self.mbox.next(), None)
@@ -1906,11 +2014,13 @@
import email.parser
fname = self.createMessage("cur", True)
n = 0
- for msg in mailbox.PortableUnixMailbox(open(fname),
+ fid = open(fname)
+ for msg in mailbox.PortableUnixMailbox(fid,
email.parser.Parser().parse):
n += 1
self.assertEqual(msg["subject"], "Simple Test")
self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
+ fid.close()
self.assertEqual(n, 1)
## End: classes from the original module (for backward compatibility).
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -18,6 +18,14 @@
import UserDict
import re
import time
+import struct
+import sysconfig
+
+try:
+ import _testcapi
+except ImportError:
+ _testcapi = None
+
try:
import thread
except ImportError:
@@ -181,15 +189,79 @@
except KeyError:
pass
+if sys.platform.startswith("win"):
+ def _waitfor(func, pathname, waitall=False):
+ # Perform the operation
+ func(pathname)
+ # Now setup the wait loop
+ if waitall:
+ dirname = pathname
+ else:
+ dirname, name = os.path.split(pathname)
+ dirname = dirname or '.'
+ # Check for `pathname` to be removed from the filesystem.
+ # The exponential backoff of the timeout amounts to a total
+ # of ~1 second after which the deletion is probably an error
+ # anyway.
+ # Testing on a i7 at 4.3GHz shows that usually only 1 iteration is
+ # required when contention occurs.
+ timeout = 0.001
+ while timeout < 1.0:
+ # Note we are only testing for the existence of the file(s) in
+ # the contents of the directory regardless of any security or
+ # access rights. If we have made it this far, we have sufficient
+ # permissions to do that much using Python's equivalent of the
+ # Windows API FindFirstFile.
+ # Other Windows APIs can fail or give incorrect results when
+ # dealing with files that are pending deletion.
+ L = os.listdir(dirname)
+ if not (L if waitall else name in L):
+ return
+ # Increase the timeout and try again
+ time.sleep(timeout)
+ timeout *= 2
+ warnings.warn('tests may fail, delete still pending for ' + pathname,
+ RuntimeWarning, stacklevel=4)
+
+ def _unlink(filename):
+ _waitfor(os.unlink, filename)
+
+ def _rmdir(dirname):
+ _waitfor(os.rmdir, dirname)
+
+ def _rmtree(path):
+ def _rmtree_inner(path):
+ for name in os.listdir(path):
+ fullname = os.path.join(path, name)
+ if os.path.isdir(fullname):
+ _waitfor(_rmtree_inner, fullname, waitall=True)
+ os.rmdir(fullname)
+ else:
+ os.unlink(fullname)
+ _waitfor(_rmtree_inner, path, waitall=True)
+ _waitfor(os.rmdir, path)
+else:
+ _unlink = os.unlink
+ _rmdir = os.rmdir
+ _rmtree = shutil.rmtree
+
def unlink(filename):
try:
- os.unlink(filename)
+ _unlink(filename)
except OSError:
pass
+def rmdir(dirname):
+ try:
+ _rmdir(dirname)
+ except OSError as error:
+ # The directory need not exist.
+ if error.errno != errno.ENOENT:
+ raise
+
def rmtree(path):
try:
- shutil.rmtree(path)
+ _rmtree(path)
except OSError, e:
# Unix returns ENOENT, Windows returns ESRCH.
if e.errno not in (errno.ENOENT, errno.ESRCH):
@@ -465,7 +537,7 @@
the CWD, an error is raised. If it's True, only a warning is raised
and the original CWD is used.
"""
- if isinstance(name, unicode):
+ if have_unicode and isinstance(name, unicode):
try:
name = name.encode(sys.getfilesystemencoding() or 'ascii')
except UnicodeEncodeError:
@@ -813,6 +885,9 @@
('EAI_FAIL', -4),
('EAI_NONAME', -2),
('EAI_NODATA', -5),
+ # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
+ # implementation actually returns WSANO_DATA i.e. 11004.
+ ('WSANO_DATA', 11004),
]
denied = ResourceDenied("Resource '%s' is not available" % resource_name)
@@ -886,6 +961,33 @@
return captured_output("stdin")
+_header = '2P'
+if hasattr(sys, "gettotalrefcount"):
+ _header = '2P' + _header
+_vheader = _header + 'P'
+
+def calcobjsize(fmt):
+ return struct.calcsize(_header + fmt + '0P')
+
+def calcvobjsize(fmt):
+ return struct.calcsize(_vheader + fmt + '0P')
+
+
+_TPFLAGS_HAVE_GC = 1<<14
+_TPFLAGS_HEAPTYPE = 1<<9
+
+def check_sizeof(test, o, size):
+ result = sys.getsizeof(o)
+ # add GC header size
+ if (_testcapi and\
+ (type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
+ ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
+ size += _testcapi.SIZEOF_PYGC_HEAD
+ msg = 'wrong size for %s: got %d, expected %d' \
+ % (type(o), result, size)
+ test.assertEqual(result, size, msg)
+
+
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
@@ -994,7 +1096,7 @@
return wrapper
return decorator
-def precisionbigmemtest(size, memuse, overhead=5*_1M):
+def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
def decorator(f):
def wrapper(self):
if not real_max_memuse:
@@ -1002,11 +1104,12 @@
else:
maxsize = size
- if real_max_memuse and real_max_memuse < maxsize * memuse:
- if verbose:
- sys.stderr.write("Skipping %s because of memory "
- "constraint\n" % (f.__name__,))
- return
+ if ((real_max_memuse or not dry_run)
+ and real_max_memuse < maxsize * memuse):
+ if verbose:
+ sys.stderr.write("Skipping %s because of memory "
+ "constraint\n" % (f.__name__,))
+ return
return f(self, maxsize)
wrapper.size = size
@@ -1144,6 +1247,16 @@
suite.addTest(unittest.makeSuite(cls))
_run_suite(suite)
+#=======================================================================
+# Check for the presence of docstrings.
+
+HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
+ sys.platform == 'win32' or
+ sysconfig.get_config_var('WITH_DOC_STRINGS'))
+
+requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
+ "test requires docstrings")
+
#=======================================================================
# doctest driver.
@@ -1231,6 +1344,33 @@
except:
break
+@contextlib.contextmanager
+def swap_attr(obj, attr, new_val):
+ """Temporarily swap out an attribute with a new object.
+
+ Usage:
+ with swap_attr(obj, "attr", 5):
+ ...
+
+ This will set obj.attr to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `attr` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+ """
+ if hasattr(obj, attr):
+ real_val = getattr(obj, attr)
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ setattr(obj, attr, real_val)
+ else:
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ delattr(obj, attr)
+
def py3k_bytes(b):
"""Emulate the py3k bytes() constructor.
--
Repository URL: http://hg.python.org/jython
More information about the Jython-checkins
mailing list