[issue1599254] mailbox: other programs' messages can vanish without trace

David Watson report at bugs.python.org
Sun Mar 23 23:57:20 CET 2014


David Watson added the comment:

On Tue 18 Mar 2014, A.M. Kuchling wrote:
> I suggest we apply the fix for #1, and for #2 just discard and
> update the ToC when we lock the mailbox, ignoring other
> possible routes to corruption (so no detecting the problem and
> raising an ExternalClash exception).  Since 2007 the docs have
> said "If you’re modifying a mailbox, you must lock it by
> calling the lock() and unlock() methods before reading any
> messages in the file or making any changes".

Well, the warning you're referring to begins "Be very cautious
when modifying mailboxes that might be simultaneously changed by
some other process. The safest mailbox format to use for such
tasks is Maildir...", suggesting that it only applies to
mailboxes that might in fact be modified by another process.

But if a reread is forced *simply by discarding the ToC*, then
the application's own keys become invalid if it has,
e.g., previously deleted a message, even if it's a private mailbox
that no other process ever touches.  So such a change would make
the warning apply to such mailboxes, whereas previously it (in
effect) did not.

(That does raise the question of why the application is calling
.lock() at all on a mailbox that won't be modified by another
process, but if the programmer thought the warning didn't apply
to their situation, then they presumably wouldn't think that
calling .lock() after modifying might actually be dangerous - and
it currently isn't dangerous for such a mailbox.)

Anyway, I've rebased the mailbox-unified2 patch onto 2.7,
renaming it as mailbox-copy-back-2.7.diff; this fixes the
original (renaming and fcntl locking) issue and adds some
warnings about locking.  I've combined all the tests (I think)
into a separate patch, and ported them to the multiprocessing
module (possibly unwise as I'm not very familiar with it - it
would be nice if someone could test it on Windows).  I haven't
looked at the tests closely again, but they appear to check for
many of the ToC issues.

There actually isn't a test for the original locking issue, as
the tests all use the mailbox API, which doesn't provide a way to
turn off dot-locking.  Also, the module no longer rewrites the
mailbox if messages have only been appended to it - it just syncs
it instead.  However, I have verified by hand that the problem is
still there when the mailbox *is* rewritten due to deletions,
etc.

----------
Added file: http://bugs.python.org/file34596/mailbox-copy-back-2.7.diff
Added file: http://bugs.python.org/file34597/mailbox-all-tests-2.7.diff

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue1599254>
_______________________________________
-------------- next part --------------
# HG changeset patch
# Parent be1e015a84051595ee41f8d788ba0fa243294039
Rewrite single-file mailboxes by copying, add some warnings about locking.

diff --git a/Lib/mailbox.py b/Lib/mailbox.py
--- a/Lib/mailbox.py
+++ b/Lib/mailbox.py
@@ -17,6 +17,8 @@ import email
 import email.message
 import email.generator
 import StringIO
+import shutil
+import warnings
 try:
     if sys.platform == 'os2emx':
         # OS/2 EMX fcntl() not adequate
@@ -630,6 +632,10 @@ class _singlefileMailbox(Mailbox):
         if not self._locked:
             _lock_file(self._file)
             self._locked = True
+            if self._pending:
+                warnings.warn("mailbox .lock() method called with pending changes; "
+                              "should have been locked before making changes",
+                              stacklevel=2)
 
     def unlock(self):
         """Unlock the mailbox if it is locked."""
@@ -661,6 +667,15 @@ class _singlefileMailbox(Mailbox):
                                      '(expected %i, found %i)' %
                                      (self._file_length, cur_len))
 
+        if fcntl and self._locked and not hasattr(self._file, 'truncate'):
+            warnings.warn('as file.truncate() is unavailable, flush() may '
+                          'momentarily release the fcntl lock; if you depend '
+                          'on fcntl locking, you should regard flush() as '
+                          'invalidating the message keys', RuntimeWarning,
+                          stacklevel=2)
+
+        orig_file = self._file
+        remove_temp_file = True
         new_file = _create_temporary(self._path)
         try:
             new_toc = {}
@@ -678,32 +693,50 @@ class _singlefileMailbox(Mailbox):
                     new_file.write(buffer)
                 new_toc[key] = (new_start, new_file.tell())
                 self._post_message_hook(new_file)
-            self._file_length = new_file.tell()
-        except:
-            new_file.close()
-            os.remove(new_file.name)
-            raise
-        _sync_close(new_file)
-        # self._file is about to get replaced, so no need to sync.
-        self._file.close()
-        # Make sure the new file's mode is the same as the old file's
-        mode = os.stat(self._path).st_mode
-        os.chmod(new_file.name, mode)
-        try:
-            os.rename(new_file.name, self._path)
-        except OSError, e:
-            if e.errno == errno.EEXIST or \
-              (os.name == 'os2' and e.errno == errno.EACCES):
-                os.remove(self._path)
-                os.rename(new_file.name, self._path)
-            else:
-                raise
-        self._file = open(self._path, 'rb+')
-        self._toc = new_toc
-        self._pending = False
-        self._pending_sync = False
-        if self._locked:
-            _lock_file(self._file, dotlock=False)
+            new_len = new_file.tell()
+            _sync_flush(new_file)
+            new_file.seek(0)
+            self._file.seek(0)
+            if new_len < cur_len and not hasattr(self._file, 'truncate'):
+                # Ensure that orig_file won't write any more data when
+                # closed (we can't close it here as that would release
+                # any fcntl lock on the mailbox).
+                orig_file.flush()
+                try:
+                    if not os.path.samestat(os.fstat(self._file.fileno()),
+                                            os.stat(self._path)):
+                        raise ExternalClashError("Mailbox has been replaced: "
+                                                 "%s" % self._path)
+                except OSError as e:
+                    if e.errno == errno.ENOENT:
+                        raise NoSuchMailboxError(self._path)
+                    raise
+                except AttributeError:
+                    # No stat(), etc.
+                    pass
+                # *** race condition ***
+                remove_temp_file = False
+                self._file = open(self._path, 'wb+')
+            remove_temp_file = False
+            shutil.copyfileobj(new_file, self._file)
+            if hasattr(self._file, 'truncate'):
+                self._file.truncate(new_len)
+            self._file_length = new_len
+            self._toc = new_toc
+            _sync_flush(self._file)
+            remove_temp_file = True
+            self._pending = False
+            self._pending_sync = False
+        finally:
+            try:
+                new_file.close()
+                if remove_temp_file:
+                    os.remove(new_file.name)
+            finally:
+                if self._file is not orig_file:
+                    orig_file.close()
+                    if self._locked:
+                        _lock_file(self._file, dotlock=False)
 
     def _pre_mailbox_hook(self, f):
         """Called before writing the mailbox to file f."""
-------------- next part --------------
# HG changeset patch
# Parent a87930571eb6fe3ba650c9b02432e10d15602d9a

diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -13,6 +13,7 @@ from test import test_support
 import unittest
 import mailbox
 import glob
+from contextlib import contextmanager
 try:
     import fcntl
 except ImportError:
@@ -21,6 +22,27 @@ except ImportError:
 # Silence Py3k warning
 rfc822 = test_support.import_module('rfc822', deprecated=True)
 
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
+else:
+    @contextmanager
+    def child_process(func, *args, **kwargs):
+        """Context manager to run a function concurrently in a child process.
+
+        Starts func(*args, **kwargs) in a multiprocessing.Process, runs the
+        with-body, then waits for the child to terminate.  start() is called
+        outside the try so a failed start is not masked by join().
+        """
+        process = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
+        process.start()
+        try:
+            yield
+        finally:
+            process.join()
+
+
 class TestBase:
 
     def _check_sample(self, msg):
@@ -45,6 +67,64 @@ class TestBase:
             test_support.unlink(target)
 
 
+def random_message():
+    # Return a random body: 1-10 lines, each 0-75 "a"s plus a newline.
+    import random
+    body = ""
+    for i in range(random.randint(1, 10)):
+        line = "a" * random.randint(0, 75) + '\n'
+        body += line
+
+    return body
+
+def add_25_messages(factory, path):
+    "Add 25 messages to the mailbox at path, locking around each add/flush."
+    mbox = factory(path)
+    try:
+        for i in range(25):
+            msg = """Subject: %i, pid %i
+From: sender at example.com
+
+Content goes here.
+%s""" % (i, os.getpid(), random_message())
+            while True:
+                try:
+                    mbox.lock()
+                except mailbox.ExternalClashError:
+                    # Lock not available; wait 10ms and try again.
+                    time.sleep(0.01)
+                else:
+                    break
+            mbox.add(msg)
+            mbox.flush()
+            mbox.unlock()
+    finally:
+        mbox.close()
+
+def add_message(factory, path, msg):
+    # Add "msg" to the mailbox at "path" (opened via "factory") without
+    # locking it or calling flush() explicitly; the instance is then closed.
+    mbox = factory(path)
+    mbox.add(msg)
+    mbox.close()
+
+def only_yield():
+    yield  # single no-op step; a trivial parent/child for _subprocess
+
+def child_func(to_child, from_parent, child, child_args):
+    # Child-process entry point used by the _subprocess method below.
+    # Blocks until Connection "from_parent" sees EOF (the parent has
+    # closed its send end), then calls "child" with "child_args".
+    to_child.close()  # close our copy of the send end so EOF can arrive
+    try:
+        from_parent.recv()
+    except EOFError:
+        pass
+    else:
+        raise AssertionError("Unexpectedly received data from parent process.")
+    from_parent.close()
+    child(*child_args)
+
 class TestMailbox(TestBase):
 
     _factory = None     # Overridden by subclasses to reuse tests
@@ -59,6 +139,47 @@ class TestMailbox(TestBase):
         self._box.close()
         self._delete_recursively(self._path)
 
+    def _acquire_lock(self, mbox=None):
+        # Keep trying to lock self._box (or mbox if given) until it
+        # succeeds, retrying every 10ms on ExternalClashError.
+        if mbox is None:
+            mbox = self._box
+        while True:
+            try:
+                mbox.lock()
+                break
+            except mailbox.ExternalClashError:
+                time.sleep(0.01)
+
+    @contextmanager
+    def _locked(self, mbox=None):
+        # Context manager: hold the lock on self._box (or mbox) for the body.
+        if mbox is None:
+            mbox = self._box
+        self._acquire_lock(mbox)  # outside try: only unlock what we locked
+        try:
+            yield
+        finally:
+            mbox.unlock()
+
+    def _compare_mailbox(self, mapping, other=(), mbox=None):
+        # Check that .as_string() values of mbox contents match
+        # strings in "mapping" and "other".  Messages in "mapping"
+        # must be present under their respective keys, while messages
+        # in "other" (any iterable) may have any key.  No other messages
+        # may be present in mbox (default: self._box).
+        if mbox is None:
+            mbox = self._box
+        other = list(other)
+        self.assertEqual(len(mbox), len(mapping) + len(other))
+        for key in mbox.iterkeys():
+            msgstr = mbox[key].as_string()
+            if key in mapping:
+                self.assertEqual(mapping[key], msgstr)
+            else:
+                self.assertIn(msgstr, other)
+                other.remove(msgstr)
+
     def test_add(self):
         # Add copies of a sample message
         keys = []
@@ -132,6 +253,38 @@ class TestMailbox(TestBase):
         self.assertEqual(len(self._box), 1)
         self.assertRaises(KeyError, lambda: self._box[key0])
 
+    def test_double_shorten(self):
+        # Check that flush() can shorten the mailbox twice in a row
+        self._test_remove_two_of_three(broken_locking=False)
+
+    def test_remove_with_broken_locking(self):
+        # Check that a (broken) application which releases and re-acquires
+        # the lock, then removes messages using its existing keys, does not
+        # delete the wrong messages.
+        self._test_remove_two_of_three(broken_locking=True)
+
+    def _test_remove_two_of_three(self, broken_locking=False):
+        self._box.lock()
+        key0 = self._box.add(self._template % 0)
+        key1 = self._box.add(self._template % 1)
+        key2 = self._box.add(self._template % 2)
+        self._box.flush()
+        self._box.remove(key0)
+        self._box.flush()  # first flush that shortens the mailbox
+        if broken_locking:
+            # As the name suggests, code that does this is broken
+            # (releasing the lock invalidates the keys, in general),
+            # but ideally mailbox.py should not break it further.
+            self._box.unlock()
+            self._box.lock()
+        self._box.remove(key1)
+        self._box.flush()  # second shortening flush
+        self._box.unlock()
+        self._box.close()
+        self._box = self._factory(self._path)  # reopen and read back
+        self.assertEqual(len(self._box), 1)
+        self.assertEqual(self._box.itervalues().next().get_payload(), '2\n')
+
     def test_get(self):
         # Retrieve messages using get()
         key0 = self._box.add(self._template % 0)
@@ -454,6 +607,216 @@ class TestMailbox(TestBase):
         self.assertRaises(TypeError,
                           lambda: self._box._dump_message(None, output))
 
+    @unittest.skipIf(multiprocessing is None, "requires multiprocessing")
+    def test_concurrent_add(self):
+        # Simple test of concurrent addition to a mailbox.
+        # This exercises the add() and flush() methods; see bug #1599254.
+        # This bug affected only the classes based on _singlefileMailbox
+        # (mbox, MMDF, Babyl), but this test can apply to any mailbox type.
+
+        self._box.close()
+
+        # Fire off a subprocess that will add 25 messages to a mailbox
+        # file, locking and unlocking it each time.  The parent process
+        # will do the same.  The resulting mailbox should contain 50 messages.
+        with child_process(add_25_messages, self._factory, self._path):
+            add_25_messages(self._factory, self._path)
+
+        # We expect the mailbox to contain 50 messages.
+        self._box = self._factory(self._path)
+        self._box.lock()
+        self.assertEqual(len(self._box), 50)
+        self._box.unlock()
+
+    def _subprocess(self, parent, child, child_args, inspect=None, path=None,
+                    lock1=False, lock2=False, flush=False):
+        # Method to run code in parent and child processes under
+        # various conditions.  The function "child" is run in the
+        # child process with arguments "child_args", while "parent"
+        # should be a generator function which yields when it wants to
+        # allow the child to run; once the child has returned, the
+        # generator will be resumed.  Finally, the function "inspect"
+        # will be run.  Both "parent" and "inspect" are called with no
+        # arguments, each with a freshly opened mailbox in self._box.
+        #
+        # If "lock1" is true, self._box will be locked when the first
+        # step of the parent generator is run, and unlocked when it
+        # yields.  If "flush" is true, self._box.flush() will be
+        # called when the generator first yields, before releasing the
+        # lock (if set) and allowing the child to run.  If "lock2" is
+        # true, self._box will be locked during the second step.
+        if multiprocessing is None:
+            self.skipTest("requires multiprocessing")
+        if path is None:
+            path = self._path
+        @contextmanager
+        def nullcm(*args, **kwargs):
+            yield
+        lock1cm = self._locked if lock1 else nullcm
+        lock2cm = self._locked if lock2 else nullcm
+        self._box.close()
+        self._delete_recursively(self._path)
+        from_parent, to_child = multiprocessing.Pipe(duplex=False)
+        with child_process(child_func, to_child, from_parent,
+                           child, child_args):
+            from_parent.close()  # parent keeps only the send end
+            try:
+                self._box = self._factory(path)
+                parent_iter = parent()
+                with lock1cm():
+                    parent_iter.next()
+                    if flush:
+                        self._box.flush()
+            finally:
+                to_child.close()  # Allow child to continue
+        with lock2cm():
+            try:
+                parent_iter.next()
+            except StopIteration:
+                pass
+        self._box.close()
+        if inspect is not None:
+            self._box = self._factory(path)
+            inspect()
+
+    def _subprocess_correct(self, parent, child, child_args,
+                            inspect=None, path=None):
+        # Both steps locked, and flushed before yielding to the child.
+        self._subprocess(parent, child, child_args, inspect, path,
+                         lock1=True, lock2=True, flush=True)
+
+    def _subprocess_modify_unlocked_flush(self, parent, child, child_args,
+                                          inspect=None, path=None):
+        # First step unlocked but flushed before yielding; second locked.
+        self._subprocess(parent, child, child_args, inspect, path,
+                         lock1=False, lock2=True, flush=True)
+
+    def _subprocess_modify_unlocked(self, parent, child, child_args,
+                                    inspect=None, path=None):
+        # First step unlocked and not flushed before yielding; second locked.
+        self._subprocess(parent, child, child_args, inspect, path,
+                         lock1=False, lock2=True, flush=False)
+
+    def _subprocess_tests(self, parent, child, child_args,
+                          inspect=None, path=None):
+        # Run the scenario under each of the three parent lock/flush modes.
+        self._subprocess_correct(parent, child, child_args, inspect, path)
+        self._subprocess_modify_unlocked_flush(parent, child, child_args,
+                                               inspect, path)
+        self._subprocess_modify_unlocked(parent, child, child_args,
+                                         inspect, path)
+
+    def test_subprocess(self):
+        # Check that self._subprocess runs OK with all 8 option combinations.
+        for n in range(8):
+            self._subprocess(only_yield, only_yield, (), lambda: None,
+                             lock1=(n & 4), lock2=(n & 2), flush=(n & 1))
+
+    def test_add_by_other(self):
+        # Check that another process can add a message and we can read it.
+        msg = self._template % 0
+        def parent():
+            yield
+            self._compare_mailbox({}, [msg])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, msg))
+
+    def test_add_by_other_reread(self):
+        # Check we can read another process's message after adding our own.
+        msgp = self._template % 0
+        msgc = self._template % 1
+        def parent():
+            key = self._box.add(msgp)
+            yield
+            self._compare_mailbox({key: msgp}, [msgc])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, msgc))
+
+    def test_interleave(self):
+        # Check that another process can add a message between two of ours.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            yield
+            k2 = self._box.add(p2)
+            self._compare_mailbox({k1: p1, k2: p2}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p1, c1, p2])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_delete_reread(self):
+        # Have another process add a message after we've deleted one.
+        p1 = self._template % "p1"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            del self._box[k1]
+            yield
+            self._compare_mailbox({}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [c1])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_delete_reread2(self):
+        # As above, but have the parent add more messages before and after.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        p3 = self._template % "p3"
+        p4 = self._template % "p4"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            k2 = self._box.add(p2)
+            del self._box[k2]
+            k3 = self._box.add(p3)
+            yield
+            k4 = self._box.add(p4)
+            self._compare_mailbox({k1: p1, k3: p3, k4: p4}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p1, p3, c1, p4])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_replace_reread(self):
+        # Have another process add a message after we've replaced one.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            self._box[k1] = p2
+            yield
+            self._compare_mailbox({k1: p2}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p2, c1])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_replace_reread2(self):
+        # As above, but have the parent add more messages before and after.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        p3 = self._template % "p3"
+        p4 = self._template % "p4"
+        p5 = self._template % "p5"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            k2 = self._box.add(p2)
+            self._box[k2] = p3
+            k4 = self._box.add(p4)
+            yield
+            k5 = self._box.add(p5)
+            self._compare_mailbox({k1: p1, k2: p3, k4: p4, k5: p5}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p1, p3, p4, c1, p5])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
     def _get_lock_path(self):
         # Return the path of the dot lock file. May be overridden.
         return self._path + '.lock'
@@ -494,11 +857,13 @@ class TestMailboxSuperclass(TestBase, un
         self.assertRaises(NotImplementedError, lambda: box.close())
 
 
+def factory_Maildir(path, factory=None):
+    return mailbox.Maildir(path, factory)  # module-level, so picklable
+
 class TestMaildir(TestMailbox, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
-
     def setUp(self):
+        self._factory = factory_Maildir
         TestMailbox.setUp(self)
         if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
             self._box.colon = '!'
@@ -985,9 +1350,14 @@ class _TestMboxMMDF(_TestSingleFile):
         self._box.close()
 
 
+def factory_mbox(path, factory=None):
+    return mailbox.mbox(path, factory)  # module-level, so picklable
+
 class TestMbox(_TestMboxMMDF, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
+    def setUp(self):
+        self._factory = factory_mbox  # instance attr: not bound as a method
+        _TestMboxMMDF.setUp(self)
 
     @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
@@ -1032,14 +1402,24 @@ class TestMbox(_TestMboxMMDF, unittest.T
             self.assertEqual(data[-3:], '0\n\n')
 
 
+def factory_MMDF(path, factory=None):
+    return mailbox.MMDF(path, factory)  # module-level, so picklable
+
 class TestMMDF(_TestMboxMMDF, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
+    def setUp(self):
+        self._factory = factory_MMDF  # instance attr: not bound as a method
+        _TestMboxMMDF.setUp(self)
 
 
+def factory_MH(path, factory=None):
+    return mailbox.MH(path, factory)  # module-level, so picklable
+
 class TestMH(TestMailbox, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
+    def setUp(self):
+        self._factory = factory_MH  # instance attr: not bound as a method
+        TestMailbox.setUp(self)
 
     def test_list_folders(self):
         # List folders
@@ -1169,9 +1549,14 @@ class TestMH(TestMailbox, unittest.TestC
         return os.path.join(self._path, '.mh_sequences.lock')
 
 
+def factory_Babyl(path, factory=None):
+    return mailbox.Babyl(path, factory)  # module-level, so picklable
+
 class TestBabyl(_TestSingleFile, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
+    def setUp(self):
+        self._factory = factory_Babyl  # instance attr: not bound as a method
+        _TestSingleFile.setUp(self)
 
     def tearDown(self):
         self._box.close()


More information about the Python-bugs-list mailing list