[Python-checkins] python/nondist/sandbox/mailbox libmailbox.tex, 1.13, 1.14 mailbox.py, 1.14, 1.15 test_mailbox.py, 1.9, 1.10
gregorykjohnson@users.sourceforge.net
gregorykjohnson at users.sourceforge.net
Mon Aug 22 22:19:39 CEST 2005
Update of /cvsroot/python/python/nondist/sandbox/mailbox
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv15538
Modified Files:
libmailbox.tex mailbox.py test_mailbox.py
Log Message:
* Fix various things detected by pychecker.
* Fix various things detected by testing on Windows.
* Tests pass on Windows, but there are still some line-ending issues.
Index: libmailbox.tex
===================================================================
RCS file: /cvsroot/python/python/nondist/sandbox/mailbox/libmailbox.tex,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -d -r1.13 -r1.14
--- libmailbox.tex 21 Aug 2005 19:02:47 -0000 1.13
+++ libmailbox.tex 22 Aug 2005 20:19:27 -0000 1.14
@@ -261,6 +261,19 @@
nesting is indicated using \character{.} to delimit levels, e.g.,
"Archived.2005.07".
+\begin{notice}
+The Maildir specification requires the use of a colon (\character{:}) in
+certain message file names. Some operating systems do not permit this character
+in file names, however. If you wish to use a Maildir-like format on such an
+operating system, you should specify another character to use instead. The
+exclamation point (\character{!}) is a popular choice. For example:
+\begin{verbatim}
+import mailbox
+mailbox.Maildir.colon = '!'
+\end{verbatim}
+The \member{colon} attribute may also be set on a per-instance basis.
+\end{notice}
+
\class{Maildir} instances have all of the methods of \class{Mailbox} in
addition to the following:
Index: mailbox.py
===================================================================
RCS file: /cvsroot/python/python/nondist/sandbox/mailbox/mailbox.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- mailbox.py 21 Aug 2005 21:17:53 -0000 1.14
+++ mailbox.py 22 Aug 2005 20:19:28 -0000 1.15
@@ -7,7 +7,6 @@
import calendar
import socket
import errno
-import stat
import copy
import email
import email.Message
@@ -64,7 +63,7 @@
def __getitem__(self, key):
"""Return the keyed message; raise KeyError if it doesn't exist."""
- if self._factory is None:
+ if not self._factory:
return self.get_message(key)
else:
return self._factory(self.get_file(key))
@@ -175,7 +174,7 @@
"""Lock the mailbox."""
raise NotImplementedError('Method must be implemented by subclass')
- def unlock(self, f=None):
+ def unlock(self):
"""Unlock the mailbox if it is locked."""
raise NotImplementedError('Method must be implemented by subclass')
@@ -190,7 +189,8 @@
generator.flatten(message)
elif isinstance(message, str):
if mangle_from_:
- message = message.replace('\nFrom ', '\n>From ')
+ message = message.replace(os.linesep + 'From ',
+ os.linesep + '>From ')
target.write(message)
elif hasattr(message, 'read'):
if mangle_from_:
@@ -214,6 +214,8 @@
class Maildir(Mailbox):
"""A qmail-style Maildir mailbox."""
+ colon = ':'
+
def __init__(self, dirname, factory=rfc822.Message, create=True):
"""Initialize a Maildir instance."""
Mailbox.__init__(self, dirname, factory, create)
@@ -236,13 +238,13 @@
tmp_file.close()
if isinstance(message, MaildirMessage):
subdir = message.get_subdir()
- suffix = ':' + message.get_info()
- if suffix == ':':
+ suffix = self.colon + message.get_info()
+ if suffix == self.colon:
suffix = ''
else:
subdir = 'new'
suffix = ''
- uniq = os.path.basename(tmp_file.name).split(':')[0]
+ uniq = os.path.basename(tmp_file.name).split(self.colon)[0]
dest = os.path.join(self._path, subdir, uniq + suffix)
os.rename(tmp_file.name, dest)
if isinstance(message, MaildirMessage):
@@ -261,7 +263,7 @@
except KeyError:
pass
except OSError, e:
- if errno == errno.ENOENT:
+ if e.errno == errno.ENOENT:
pass
else:
raise
@@ -278,8 +280,8 @@
# temp's subdir and suffix were defaults from add().
dominant_subpath = old_subpath
subdir = os.path.dirname(dominant_subpath)
- if ':' in dominant_subpath:
- suffix = ':' + dominant_subpath.split(':')[-1]
+ if self.colon in dominant_subpath:
+ suffix = self.colon + dominant_subpath.split(self.colon)[-1]
else:
suffix = ''
self.discard(key)
@@ -292,21 +294,21 @@
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
subpath = self._lookup(key)
- f = file(os.path.join(self._path, subpath), 'r')
+ f = file(os.path.join(self._path, subpath), 'rb')
try:
msg = MaildirMessage(f)
finally:
f.close()
subdir, name = os.path.split(subpath)
msg.set_subdir(subdir)
- if ':' in name:
- msg.set_info(name.split(':')[-1])
+ if self.colon in name:
+ msg.set_info(name.split(self.colon)[-1])
msg.set_date(os.path.getmtime(os.path.join(self._path, subpath)))
return msg
def get_string(self, key):
"""Return a string representation or raise a KeyError."""
- f = file(os.path.join(self._path, self._lookup(key)), 'r')
+ f = file(os.path.join(self._path, self._lookup(key)), 'rb')
try:
return f.read()
finally:
@@ -314,7 +316,7 @@
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
- f = file(os.path.join(self._path, self._lookup(key)), 'r')
+ f = file(os.path.join(self._path, self._lookup(key)), 'rb')
return _ProxyFile(f)
def iterkeys(self):
@@ -413,14 +415,14 @@
if ':' in hostname:
hostname = hostname.replace(':', r'\072')
uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
- self._count, hostname)
+ Maildir._count, hostname)
path = os.path.join(self._path, 'tmp', uniq)
try:
os.stat(path)
except OSError, e:
if e.errno == errno.ENOENT:
- self._count += 1
- return file(path, 'w+')
+ Maildir._count += 1
+ return file(path, 'wb+')
else:
raise
else:
@@ -432,7 +434,7 @@
self._toc = {}
for subdir in ('new', 'cur'):
for entry in os.listdir(os.path.join(self._path, subdir)):
- uniq = entry.split(':')[0]
+ uniq = entry.split(self.colon)[0]
self._toc[uniq] = os.path.join(subdir, entry)
def _lookup(self, key):
@@ -451,12 +453,15 @@
# This method is for backward compatibility only.
def next(self):
"""Return the next message in a one-time iteration."""
- if not hasattr(self, '_onetime_iterator'):
- self._onetime_iterator = self.itervalues()
- try:
- return self._onetime_iterator.next()
- except StopIteration:
- return None
+ if not hasattr(self, '_onetime_keys'):
+ self._onetime_keys = self.iterkeys()
+ while True:
+ try:
+ return self[self._onetime_keys.next()]
+ except StopIteration:
+ return None
+ except KeyError:
+ continue
class _singlefileMailbox(Mailbox):
@@ -466,15 +471,15 @@
"""Initialize a single-file mailbox."""
Mailbox.__init__(self, path, factory, create)
try:
- f = file(self._path, 'r+')
+ f = file(self._path, 'rb+')
except IOError, e:
if e.errno == errno.ENOENT:
if create:
- f = file(self._path, 'w+')
+ f = file(self._path, 'wb+')
else:
raise NoSuchMailboxError(self._path)
elif e.errno == errno.EACCES:
- f = file(self._path, 'r')
+ f = file(self._path, 'rb')
else:
raise
self._file = f
@@ -525,7 +530,7 @@
_lock_file(self._file)
self._locked = True
- def unlock(self, f=None):
+ def unlock(self):
"""Unlock the mailbox if it is locked."""
if self._locked:
_unlock_file(self._file)
@@ -567,7 +572,7 @@
os.rename(new_file.name, self._path)
else:
raise
- self._file = file(self._path, 'r+')
+ self._file = file(self._path, 'rb+')
self._toc = new_toc
self._pending = False
if self._locked:
@@ -624,7 +629,7 @@
self._file.seek(start)
from_line = self._file.readline()
msg = self._message_factory(self._file.read(stop - self._file.tell()))
- msg.set_from(from_line[5:-1])
+ msg.set_from(from_line[5:-len(os.linesep)])
return msg
def get_string(self, key, from_=False):
@@ -661,7 +666,7 @@
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
- self._file.write('%s%s' % (from_line, os.linesep))
+ self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
@@ -710,11 +715,11 @@
def _pre_message_hook(self, f):
"""Called before writing each message to file f."""
- f.write('\001\001\001\001\n')
+ f.write('\001\001\001\001' + os.linesep)
def _post_message_hook(self, f):
"""Called after writing each message to file f."""
- f.write('\n\001\001\001\001\n')
+ f.write(os.linesep + '\001\001\001\001' + os.linesep)
def _generate_toc(self):
"""Generate key-to-(start, stop) table of contents."""
@@ -785,7 +790,7 @@
"""Remove the keyed message; raise KeyError if it doesn't exist."""
path = os.path.join(self._path, str(key))
try:
- f = file(path, 'r+')
+ f = file(path, 'rb+')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
@@ -807,7 +812,7 @@
"""Replace the keyed message; raise KeyError if it doesn't exist."""
path = os.path.join(self._path, str(key))
try:
- f = file(path, 'r+')
+ f = file(path, 'rb+')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
@@ -831,9 +836,9 @@
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
- f = file(os.path.join(self._path, str(key)), 'r+')
+ f = file(os.path.join(self._path, str(key)), 'rb+')
else:
- f = file(os.path.join(self._path, str(key)), 'r')
+ f = file(os.path.join(self._path, str(key)), 'rb')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
@@ -858,9 +863,9 @@
"""Return a string representation or raise a KeyError."""
try:
if self._locked:
- f = file(os.path.join(self._path, str(key)), 'r+')
+ f = file(os.path.join(self._path, str(key)), 'rb+')
else:
- f = file(os.path.join(self._path, str(key)), 'r')
+ f = file(os.path.join(self._path, str(key)), 'rb')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
@@ -873,14 +878,14 @@
return f.read()
finally:
if self._locked:
- _unlock_file()
+ _unlock_file(f)
finally:
f.close()
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
try:
- f = file(os.path.join(self._path, str(key)), 'r')
+ f = file(os.path.join(self._path, str(key)), 'rb')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
@@ -904,11 +909,11 @@
def lock(self):
"""Lock the mailbox."""
if not self._locked:
- self._file = file(os.path.join(self._path, '.mh_sequences'), 'r+')
+ self._file = file(os.path.join(self._path, '.mh_sequences'), 'rb+')
_lock_file(self._file)
self._locked = True
- def unlock(self, f=None):
+ def unlock(self):
"""Unlock the mailbox if it is locked."""
if self._locked:
_unlock_file(self._file)
@@ -956,7 +961,7 @@
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
- f = file(os.path.join(self._path, '.mh_sequences'), 'r')
+ f = file(os.path.join(self._path, '.mh_sequences'), 'rb')
try:
all_keys = set(self.keys())
for line in f:
@@ -982,7 +987,7 @@
def set_sequences(self, sequences):
"""Set sequences using the given name-to-key-list dictionary."""
- f = file(os.path.join(self._path, '.mh_sequences'), 'r+')
+ f = file(os.path.join(self._path, '.mh_sequences'), 'rb+')
try:
os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
for name, keys in sequences.iteritems():
@@ -1003,7 +1008,7 @@
f.write(' %s' % key)
prev = key
if completing:
- f.write('%s%s' % (prev, os.linesep))
+ f.write(str(prev) + os.linesep)
else:
f.write(os.linesep)
finally:
@@ -1017,7 +1022,7 @@
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
- f = file(os.path.join(self._path, str(key)), 'r+')
+ f = file(os.path.join(self._path, str(key)), 'rb+')
try:
if self._locked:
_lock_file(f)
@@ -1039,7 +1044,6 @@
self._next_key = prev + 1
if len(changes) == 0:
return
- keys = self.keys()
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
@@ -1176,16 +1180,17 @@
def _pre_mailbox_hook(self, f):
"""Called before writing the mailbox to file f."""
- f.write('BABYL OPTIONS:\nVersion: 5\nLabels:%s\n\037' %
- ','.join(self.get_labels()))
+ f.write('BABYL OPTIONS:%sVersion: 5%sLabels:%s%s\037' %
+ (os.linesep, os.linesep, ','.join(self.get_labels()),
+ os.linesep))
def _pre_message_hook(self, f):
"""Called before writing each message to file f."""
- f.write('\014\n')
+ f.write('\014' + os.linesep)
def _post_message_hook(self, f):
"""Called after writing each message to file f."""
- f.write('\n\037')
+ f.write(os.linesep + '\037')
def _install_message(self, message):
"""Write message contents and return (start, stop)."""
@@ -1204,9 +1209,9 @@
self._file.write(',,')
for label in labels:
self._file.write(' ' + label + ',')
- self._file.write('\n')
+ self._file.write(os.linesep)
else:
- self._file.write('1,,\n')
+ self._file.write('1,,' + os.linesep)
if isinstance(message, email.Message.Message):
pseudofile = StringIO.StringIO()
ps_generator = email.Generator.Generator(pseudofile, False, 0)
@@ -1242,7 +1247,7 @@
self._file.write(message[:body_start])
self._file.write(message[body_start:])
else:
- self._file.write('*** EOOH ***%s%s' % (os.linesep, os.linesep))
+ self._file.write('*** EOOH ***' + os.linesep + os.linesep)
self._file.write(message)
elif hasattr(message, 'readline'):
original_pos = message.tell()
@@ -1486,7 +1491,7 @@
try:
message.set_date(calendar.timegm(time.strptime(maybe_date,
'%a %b %d %H:%M:%S %Y')))
- except ValueError, OverflowError:
+ except (ValueError, OverflowError):
pass
elif isinstance(message, _mboxMMDFMessage):
message.set_flags(self.get_flags())
@@ -1807,7 +1812,7 @@
pre_lock = _create_temporary(f.name + '.lock')
pre_lock.close()
except IOError, e:
- if e.errno == errno.EACCESS:
+ if e.errno == errno.EACCES:
return # Without write access, just skip dotlocking.
else:
raise
@@ -1846,7 +1851,7 @@
"""Create a file if it doesn't exist and open for reading and writing."""
fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
try:
- return file(path, 'r+')
+ return file(path, 'rb+')
finally:
os.close(fd)
Index: test_mailbox.py
===================================================================
RCS file: /cvsroot/python/python/nondist/sandbox/mailbox/test_mailbox.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- test_mailbox.py 21 Aug 2005 21:17:53 -0000 1.9
+++ test_mailbox.py 22 Aug 2005 20:19:28 -0000 1.10
@@ -49,7 +49,7 @@
class TestMailbox(TestBase):
_factory = None # Overridden by subclasses to reuse tests
- _template = 'From: foo%s%s' % (os.linesep, os.linesep) + '%s'
+ _template = 'From: foo' + os.linesep + os.linesep + '%s'
def setUp(self):
self._path = test_support.TESTFN
@@ -457,14 +457,19 @@
_factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
+ def setUp(self):
+ TestMailbox.setUp(self)
+ if os.name == 'nt':
+ self._box.colon = '!'
+
def test_add_MM(self):
# Add a MaildirMessage instance
msg = mailbox.MaildirMessage(self._template % 0)
msg.set_subdir('cur')
msg.set_info('foo')
key = self._box.add(msg)
- self.assert_(os.path.exists(os.path.join(self._path, 'cur',
- "%s:foo" % key)))
+ self.assert_(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
+ (key, self._box.colon))))
def test_get_MM(self):
# Get a MaildirMessage instance
@@ -504,10 +509,10 @@
# Initialize a non-existent mailbox
self.tearDown()
self._box = mailbox.Maildir(self._path)
- self._check_basics(factory=rfc822.Message, perms=True)
+ self._check_basics(factory=rfc822.Message)
self._delete_recursively(self._path)
self._box = self._factory(self._path, factory=None)
- self._check_basics(perms=True)
+ self._check_basics()
def test_initialize_existing(self):
# Initialize an existing mailbox
@@ -519,7 +524,7 @@
self._box = mailbox.Maildir(self._path, factory=None)
self._check_basics()
- def _check_basics(self, factory=None, perms=False):
+ def _check_basics(self, factory=None):
# (Used by test_open_new() and test_open_existing().)
self.assertEqual(self._box._path, os.path.abspath(self._path))
self.assertEqual(self._box._factory, factory)
@@ -527,9 +532,6 @@
path = os.path.join(self._path, subdir)
mode = os.stat(path)[stat.ST_MODE]
self.assert_(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
- if perms:
- self.assert_(stat.S_IMODE(mode) ^ 0700 == 0,
- "Accessible by group or other: '%s'" % path)
def test_list_folders(self):
# List folders
@@ -668,20 +670,23 @@
class _TestMboxMMDF(TestMailbox):
def tearDown(self):
+ self._box.close()
self._delete_recursively(self._path)
for lock_remnant in glob.glob(self._path + '.*'):
os.remove(lock_remnant)
def test_add_from_string(self):
# Add a string starting with 'From ' to the mailbox
- key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
+ key = self._box.add('From foo@bar blah%sFrom: foo%s%s0' %
+ (os.linesep, os.linesep, os.linesep))
self.assert_(self._box[key].get_from() == 'foo@bar blah')
self.assert_(self._box[key].get_payload() == '0')
def test_add_mbox_or_mmdf_message(self):
# Add an mboxMessage or MMDFMessage
for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
- msg = class_('From foo@bar blah\nFrom: foo\n\n0')
+ msg = class_('From foo@bar blah%sFrom: foo%s%s0' %
+ (os.linesep, os.linesep, os.linesep))
key = self._box.add(msg)
def test_open_close_open(self):
@@ -708,7 +713,7 @@
self._box._file.seek(0)
contents = self._box._file.read()
self._box.close()
- self.assert_(contents == file(self._path, 'r').read())
+ self.assert_(contents == file(self._path, 'rb').read())
self._box = self._factory(self._path)
@@ -821,6 +826,12 @@
_factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
+ def tearDown(self):
+ self._box.close()
+ self._delete_recursively(self._path)
+ for lock_remnant in glob.glob(self._path + '.*'):
+ os.remove(lock_remnant)
+
def test_labels(self):
# Get labels from the mailbox
self.assert_(self._box.get_labels() == [])
@@ -1414,12 +1425,12 @@
proxy.seek(0)
self.assert_(proxy.readlines() == ['foo' + os.linesep,
'bar' + os.linesep,
- 'fred' + os.linesep, 'bob'])
+ 'fred' + os.linesep, 'bob'])
proxy.seek(0)
self.assert_(proxy.readlines(2) == ['foo' + os.linesep])
proxy.seek(3 + len(os.linesep))
- self.assert_(proxy.readlines(5) == ['bar' + os.linesep,
- 'fred' + os.linesep])
+ self.assert_(proxy.readlines(4 + len(os.linesep)) ==
+ ['bar' + os.linesep, 'fred' + os.linesep])
proxy.seek(3)
self.assert_(proxy.readlines(1000) == [os.linesep, 'bar' + os.linesep,
'fred' + os.linesep, 'bob'])
@@ -1458,7 +1469,7 @@
def setUp(self):
self._path = test_support.TESTFN
- self._file = file(self._path, 'w+')
+ self._file = file(self._path, 'wb+')
def tearDown(self):
self._file.close()
@@ -1507,7 +1518,7 @@
def setUp(self):
self._path = test_support.TESTFN
- self._file = file(self._path, 'w+')
+ self._file = file(self._path, 'wb+')
def tearDown(self):
self._file.close()
@@ -1546,7 +1557,7 @@
def test_seek_and_tell(self):
self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep))
self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
- 8 + 3 * len(os.linesep)))
+ 9 + 2 * len(os.linesep)))
def test_close(self):
self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep))
More information about the Python-checkins
mailing list