[Spambayes-checkins] spambayes/spambayes/test test_Corpus.py, NONE,
1.1 test_FileCorpus.py, NONE, 1.1
Tony Meyer
anadelonbrin at users.sourceforge.net
Thu Jan 20 04:37:57 CET 2005
Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv1916/spambayes/test
Added Files:
test_Corpus.py test_FileCorpus.py
Log Message:
Add unit tests for Corpus and FileCorpus modules.
--- NEW FILE: test_Corpus.py ---
# Test the Corpus module.
import sys
import time
import unittest
import sb_test_support
sb_test_support.fix_sys_path()
from spambayes.Corpus import Corpus, ExpiryCorpus, MessageFactory
# We borrow the test messages that test_sb_server uses.
from test_sb_server import good1, spam1, malformed1
class simple_msg(object):
    """Minimal stand-in for a corpus message.

    It stores the key it was built with, records time.time() at
    construction, and remembers whether load() has been called.
    """

    def __init__(self, key):
        self.loaded = False
        self.creation_time = time.time()
        self._key = key

    def key(self):
        # The identifier that was handed to the constructor.
        return self._key

    def createTimestamp(self):
        # The time.time() value captured when this object was built.
        return self.creation_time

    def load(self):
        # Nothing is actually read; just flag that a load was requested.
        self.loaded = True
class simple_observer(object):
    """Observer whose hooks announce themselves by raising.

    Each callback raises a different exception type, so a test can tell
    which of the two was invoked.
    """

    def onRemoveMessage(self, msg, flags):
        raise TypeError()

    def onAddMessage(self, msg, flags):
        raise ValueError()
class CorpusTest(unittest.TestCase):
    # Unit tests for the in-memory Corpus class, using simple_msg objects
    # as stand-in messages.

    def setUp(self):
        # Every test starts from an empty corpus with a cacheSize of 100.
        self.factory = MessageFactory()
        self.cacheSize = 100
        self.corpus = Corpus(self.factory, self.cacheSize)

    def _add_messages(self, keys):
        # Add one simple_msg per key, in the order given, and return the
        # message objects that were added.
        added = []
        for key in keys:
            msg = simple_msg(key)
            self.corpus.addMessage(msg)
            added.append(msg)
        return added

    def test___init__(self):
        # A new corpus keeps its constructor arguments and holds nothing.
        self.assertEqual(self.corpus.cacheSize, self.cacheSize)
        self.assertEqual(self.corpus.msgs, {})
        self.assertEqual(self.corpus.keysInMemory, [])
        self.assertEqual(self.corpus.observers, [])
        self.assertEqual(self.corpus.factory, self.factory)

    def test_addObserver(self):
        # simple_observer raises ValueError from onAddMessage and TypeError
        # from onRemoveMessage, so seeing those exceptions escape shows
        # that the corpus called the registered observer.
        self.corpus.addObserver(simple_observer())
        self.assertRaises(ValueError, self.corpus.addMessage,
                          simple_msg(0))
        self.assertRaises(TypeError, self.corpus.removeMessage,
                          simple_msg(1))

    def test_addMessage(self):
        msg = simple_msg(0)
        self.assertEqual(self.corpus.get(0), None)
        self.corpus.addMessage(msg)
        self.assertEqual(self.corpus[0], msg)

    def test_removeMessage(self):
        msg = simple_msg(0)
        self.assertEqual(self.corpus.get(0), None)
        self.corpus.addMessage(msg)
        self.assertEqual(self.corpus[0], msg)
        self.corpus.removeMessage(msg)
        self.assertEqual(self.corpus.get(0), None)

    def test_cacheMessage(self):
        msg = simple_msg(0)
        self.corpus.cacheMessage(msg)
        self.assertEqual(self.corpus.msgs[0], msg)
        self.assertTrue(0 in self.corpus.keysInMemory)

    def test_flush_cache(self):
        # With room for a single entry, caching a second message must
        # drop the first key from keysInMemory.
        self.corpus.cacheSize = 1
        msg = simple_msg(0)
        self.corpus.cacheMessage(msg)
        self.assertEqual(self.corpus.msgs[0], msg)
        self.assertTrue(0 in self.corpus.keysInMemory)
        msg = simple_msg(1)
        self.corpus.cacheMessage(msg)
        self.assertEqual(self.corpus.msgs[1], msg)
        self.assertTrue(1 in self.corpus.keysInMemory)
        self.assertTrue(0 not in self.corpus.keysInMemory)

    def test_unCacheMessage(self):
        msg = simple_msg(0)
        self.corpus.cacheMessage(msg)
        self.assertEqual(self.corpus.msgs[0], msg)
        self.assertTrue(0 in self.corpus.keysInMemory)
        self.corpus.unCacheMessage(msg)
        # NOTE(review): this pins key 0 as *still* listed after the
        # unCacheMessage(msg) call.  Confirm against
        # Corpus.unCacheMessage whether it expects the key rather than
        # the message object, and whether "not in" was intended here.
        self.assertTrue(0 in self.corpus.keysInMemory)

    def test_takeMessage(self):
        # Taking a message loads it, removes it from the corpus it came
        # from, and makes it available in the corpus that took it.
        other_corpus = Corpus(self.factory, self.cacheSize)
        msg = simple_msg(0)
        other_corpus.addMessage(msg)
        self.assertEqual(self.corpus.get(0), None)
        self.corpus.takeMessage(0, other_corpus)
        self.assertEqual(msg.loaded, True)
        self.assertEqual(other_corpus.get(0), None)
        self.assertEqual(self.corpus.get(0), msg)

    def test_get(self):
        self._add_messages([0, 1, 2])
        self.assertEqual(self.corpus.get(0).key(), 0)

    def test_get_fail(self):
        # get() for an unknown key falls back to None.
        self._add_messages([0, 1, 2])
        self.assertEqual(self.corpus.get(4), None)

    def test_get_default(self):
        # get() for an unknown key returns the caller's default.
        self._add_messages([0, 1, 2])
        self.assertEqual(self.corpus.get(4, "test"), "test")

    def test___getitem__(self):
        self._add_messages([0, 1, 2])
        self.assertEqual(self.corpus[0].key(), 0)

    def test___getitem___fail(self):
        # Indexing with a key that was never added is expected to raise
        # NotImplementedError (presumably because the base Corpus cannot
        # build the missing message itself -- see the makeMessage tests).
        self._add_messages([0, 1, 2])
        self.assertRaises(NotImplementedError, self.corpus.__getitem__, 4)

    def test_keys(self):
        self.assertEqual(self.corpus.keys(), [])
        keys = [0, 1, 2]
        self._add_messages(keys)
        self.assertEqual(self.corpus.keys(), keys)

    def test___iter__(self):
        self.assertEqual(tuple(self.corpus), ())
        msgs = (simple_msg(0), simple_msg(1), simple_msg(2))
        for msg in msgs:
            self.corpus.addMessage(msg)
        self.assertEqual(tuple(self.corpus), msgs)

    def test_makeMessage_no_content(self):
        # makeMessage on the base Corpus raises NotImplementedError.
        key = "testmessage"
        self.assertRaises(NotImplementedError, self.corpus.makeMessage, key)

    def test_makeMessage_with_content(self):
        # Supplying content does not change that.
        key = "testmessage"
        content = good1
        self.assertRaises(NotImplementedError, self.corpus.makeMessage,
                          key, content)
class ExpiryCorpusTest(unittest.TestCase):
    # Tests ExpiryCorpus by combining it with a plain Corpus.

    def setUp(self):
        # The corpus under test inherits from both classes; each base is
        # initialised explicitly with its own arguments.
        class Mixed(Corpus, ExpiryCorpus):
            def __init__(self, expireBefore, factory, cacheSize):
                Corpus.__init__(self, factory, cacheSize)
                ExpiryCorpus.__init__(self, expireBefore)

        self.expireBefore = 10.0
        self.cacheSize = 100
        self.factory = MessageFactory()
        self.corpus = Mixed(self.expireBefore, self.factory,
                            self.cacheSize)

    def test___init___expiry(self):
        # The constructor stores the expiry value it was given.
        self.assertEqual(self.corpus.expireBefore, self.expireBefore)

    def test_removeExpiredMessages(self):
        # First batch: these are the messages that should expire.
        stale = [simple_msg(1), simple_msg(2)]
        for old_msg in stale:
            self.corpus.addMessage(old_msg)

        # Shrink the expiry value and sleep for longer than it, so that
        # only the first batch should be affected (presumably
        # expireBefore is an age in seconds -- confirm against
        # ExpiryCorpus).
        self.corpus.expireBefore = 0.25
        time.sleep(0.5)

        # Second batch: added after the pause, so these should survive.
        fresh = [simple_msg(3), simple_msg(4)]
        for new_msg in fresh:
            self.corpus.addMessage(new_msg)

        self.corpus.removeExpiredMessages()

        # The first batch is gone ...
        for old_msg in stale:
            self.assertEqual(old_msg in self.corpus, False)
        # ... and the second batch is still present.
        for new_msg in fresh:
            self.assertEqual(new_msg in self.corpus, True)
def suite():
    """Return a TestSuite holding the tests of CorpusTest and
    ExpiryCorpusTest.

    The per-class suites are built with the default TestLoader, which is
    what the unittest.makeSuite() helper did with its default arguments;
    makeSuite itself is deprecated and no longer exists in recent Python
    releases.
    """
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    all_tests = unittest.TestSuite()
    for case in (CorpusTest, ExpiryCorpusTest):
        all_tests.addTest(load(case))
    return all_tests
if __name__ == '__main__':
    # Run through the shared test runner, with 'suite' appended to the
    # command-line arguments.
    argv = sys.argv + ['suite']
    sb_test_support.unittest_main(argv=argv)
--- NEW FILE: test_FileCorpus.py ---
# Test the FileCorpus module.
import os
import sys
import time
import gzip
import errno
import unittest
import sb_test_support
sb_test_support.fix_sys_path()
from spambayes import storage
from spambayes.FileCorpus import ExpiryFileCorpus
from spambayes.FileCorpus import FileCorpus, FileMessage, GzipFileMessage
from spambayes.FileCorpus import FileMessageFactory, GzipFileMessageFactory
# We borrow the test messages that test_sb_server uses.
from test_sb_server import good1, spam1, malformed1
class _FactoryBaseTest(unittest.TestCase):
    # Checks shared by the message-factory test cases.
    # Subclass must define a concrete factory.
    factory = None

    def test_create_no_content(self):
        # Without content, create() returns a message that knows its
        # name and directory but is not marked as loaded.
        f = self.factory()
        key = "testmessage"
        directory = "fctesthamcorpus"
        msg = f.create(key, directory)
        self.assertEqual(msg.file_name, key)
        self.assertEqual(msg.directory, directory)
        self.assertEqual(msg.loaded, False)

    def test_create_with_content(self):
        # With content, the message is marked as loaded and as_string()
        # gives the content back with "\n" turned into "\r\n".
        f = self.factory()
        key = "testmessage"
        directory = "fctesthamcorpus"
        content = good1
        msg = f.create(key, directory, content=content)
        self.assertEqual(msg.file_name, key)
        self.assertEqual(msg.directory, directory)
        self.assertEqual(msg.loaded, True)
        self.assertEqual(msg.as_string(), content.replace("\n", "\r\n"))
class FileMessageFactoryTest(_FactoryBaseTest):
    # Runs the shared factory checks against FileMessageFactory.
    factory = FileMessageFactory

    def test_klass(self):
        # The factory's klass attribute must be FileMessage.
        produced = self.factory.klass
        self.assertEqual(produced, FileMessage)
class GzipFileMessageFactoryTest(_FactoryBaseTest):
    # Runs the shared factory checks against GzipFileMessageFactory.
    factory = GzipFileMessageFactory

    def test_klass(self):
        # The factory's klass attribute must be GzipFileMessage.
        produced = self.factory.klass
        self.assertEqual(produced, GzipFileMessage)
class _FileCorpusBaseTest(unittest.TestCase):
    # Fixture base class.  Before each test it creates three scratch
    # corpus directories (relative to the current working directory);
    # afterwards it deletes the files inside them, the directories
    # themselves, and two scratch .bayes files.  Anything that is already
    # missing at tear-down time is silently skipped.

    # errno values treated as "the path is not there".  ENOENT is the
    # portable code.  The literal 3 is kept because this fixture has
    # always tolerated it for os.listdir (presumably the Windows
    # "path not found" code as reported by older Pythons -- TODO confirm).
    _MISSING_ERRNOS = (errno.ENOENT, 3)

    _CORPUS_DIRECTORIES = ('fctestspamcorpus',
                           'fctesthamcorpus',
                           'fctestunsurecorpus')
    _SCRATCH_FILES = ('fctestmisc.bayes', 'fctestclass.bayes')

    def _setUpDirectory(self, dirname):
        # Create dirname; a directory that already exists is fine, any
        # other failure is re-raised.
        try:
            os.mkdir(dirname)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def setUp(self):
        # Make corpus directories.
        for dirname in self._CORPUS_DIRECTORIES:
            self._setUpDirectory(dirname)

    def _unlinkIfPresent(self, filename):
        # Delete filename; a file that is not there is not an error.
        try:
            os.unlink(filename)
        except OSError as e:
            if e.errno not in self._MISSING_ERRNOS:
                raise

    def _tearDownDirectory(self, dirname):
        # Delete every file directly inside dirname, then dirname itself.
        # A directory that does not exist is skipped; any other OSError
        # is re-raised.
        try:
            flist = os.listdir(dirname)
        except OSError as e:
            if e.errno not in self._MISSING_ERRNOS:
                raise
        else:
            for filename in flist:
                os.unlink(os.path.join(dirname, filename))
        try:
            os.rmdir(dirname)
        except OSError as e:
            if e.errno not in self._MISSING_ERRNOS:
                raise

    def tearDown(self):
        for dirname in self._CORPUS_DIRECTORIES:
            self._tearDownDirectory(dirname)
        for filename in self._SCRATCH_FILES:
            self._unlinkIfPresent(filename)
class _FileMessageBaseTest(_FileCorpusBaseTest):
    # Checks shared by the file-backed message test cases.
    # Subclass must define a concrete message klass, and wrong_klass.
    klass = None
    wrong_klass = None

    def _message_path(self):
        # Location on disk of the message written by setUp.
        return os.path.join(self.directory, self.filename)

    def setUp(self):
        _FileCorpusBaseTest.setUp(self)
        self.filename = "testmessage"
        self.directory = "fctestspamcorpus"
        path = os.path.join(self.directory, self.filename)
        # Start from a clean slate if an earlier run left the file behind.
        try:
            os.remove(path)
        except OSError:
            pass
        # Write spam1 to disk as plain text, noting when that happened so
        # that test_createTimestamp has something to compare against.
        handle = open(path, "w")
        self.created_time = time.time()
        handle.write(spam1)
        handle.close()
        self.msg = self.klass(self.filename, self.directory)
        # Message of wrong type, to test mixed corpus: good1 stored via
        # the *other* message class in the same directory.
        self.wrongname = "wrongmessage"
        other = self.wrong_klass(self.wrongname, self.directory)
        other.as_string = lambda: good1
        other.store()

    def tearDown(self):
        # Remove both message files (ignoring any that are already gone)
        # before the base class removes the directories.
        for name in (self.filename, self.wrongname):
            try:
                os.remove(os.path.join(self.directory, name))
            except OSError:
                pass
        _FileCorpusBaseTest.tearDown(self)

    def test___init__(self):
        # Construction records name and directory without loading.
        self.assertEqual(self.msg.file_name, self.filename)
        self.assertEqual(self.msg.directory, self.directory)
        self.assertEqual(self.msg.loaded, False)

    def test_as_string(self):
        expected = spam1.replace("\n", "\r\n")
        self.assertEqual(self.msg.as_string(), expected)

    def test_pathname(self):
        self.assertEqual(self.msg.pathname(), self._message_path())

    def test_name(self):
        self.assertEqual(self.msg.name(), self.filename)

    def test_key(self):
        self.assertEqual(self.msg.key(), self.filename)

    def test_createTimestamp(self):
        # As long as they are equal to the nearest second, that will do.
        reported = int(self.msg.createTimestamp())
        self.assertEqual(reported, int(self.created_time))

    def test_remove(self):
        path = self._message_path()
        self.assertEqual(os.path.exists(path), True)
        self.msg.remove()
        self.assertEqual(os.path.exists(path), False)

    def test_remove_not_there(self):
        # remove() must cope with the file having been deleted already.
        path = self._message_path()
        self.assertEqual(os.path.exists(path), True)
        os.remove(path)
        self.msg.remove()
        self.assertEqual(os.path.exists(path), False)

    def test_load(self):
        # Load correct type.
        self.assertEqual(self.msg.loaded, False)
        self.msg.load()
        self.assertEqual(self.msg.loaded, True)
        expected = spam1.replace("\n", "\r\n")
        self.assertEqual(self.msg.as_string(), expected)

    def test_load_wrong(self):
        # Load incorrect type: point the message at the file that was
        # stored by wrong_klass and check that it still loads.
        self.msg.file_name = self.wrongname
        self.assertEqual(self.msg.loaded, False)
        self.msg.load()
        self.assertEqual(self.msg.loaded, True)
        expected = good1.replace("\n", "\r\n")
        self.assertEqual(self.msg.as_string(), expected)

    def test_load_already_loaded(self):
        # Shouldn't do anything if already loaded.  With file_name
        # cleared, an attempt to load from storage would raise an error,
        # so simply getting through the call is the check.
        self.msg.file_name = None
        self.msg.loaded = True
        self.msg.load()
class FileMessageTest(_FileMessageBaseTest):
    # Shared message checks for FileMessage; GzipFileMessage plays the
    # "wrong type" role.
    klass = FileMessage
    wrong_klass = GzipFileMessage

    def test_store(self):
        # Make the message report good1 as its text, store it, and read
        # the file back with plain open().
        self.msg.as_string = lambda: good1
        self.msg.store()
        stored = open(os.path.join(self.directory, self.filename))
        on_disk = stored.read()
        stored.close()
        self.assertEqual(on_disk, good1)
class GzipFileMessageTest(_FileMessageBaseTest):
    # Shared message checks for GzipFileMessage; FileMessage plays the
    # "wrong type" role.
    klass = GzipFileMessage
    wrong_klass = FileMessage

    def test_store(self):
        # Make the message report good1 as its text, store it, and read
        # the file back through gzip.open().
        self.msg.as_string = lambda: good1
        self.msg.store()
        stored = gzip.open(os.path.join(self.directory, self.filename))
        on_disk = stored.read()
        stored.close()
        self.assertEqual(on_disk, good1)
class FileCorpusTest(_FileCorpusBaseTest):
    # Tests FileCorpus against a directory pre-filled with four stored
    # messages.

    def setUp(self):
        _FileCorpusBaseTest.setUp(self)
        self.cache_size = 100
        self.directory = 'fctesthamcorpus'
        self.factory = FileMessageFactory()
        # The files must be on disk before the corpus object is created.
        self.stuff_corpus()
        self.corpus = FileCorpus(self.factory, self.directory,
                                 '?', self.cache_size)

    def stuff_corpus(self):
        """Store the fixture messages in self.directory.

        Three messages are stored under the keys "0", "1" and "2"; the
        last of them is left in self.msg.  A fourth is stored as "10".
        """
        for index, content in enumerate([good1, spam1, malformed1]):
            self.msg = self.factory.create(str(index), self.directory,
                                           content)
            self.msg.store()
        # Put in a message that won't match the filter (its key has two
        # characters, while the corpus in setUp is given '?').
        extra = self.factory.create("10", self.directory, good1)
        extra.store()

    def test___init__(self):
        self.assertEqual(self.corpus.directory, self.directory)
        self.assertEqual(self.corpus.filter, '?')
        self.assertEqual(self.corpus.cacheSize, self.cache_size)

    def test_filter(self):
        # The '?' corpus sees three messages ...
        self.assertEqual(len(self.corpus.msgs), 3)
        # ... while a '*' corpus over the same directory sees all four.
        self.corpus = FileCorpus(self.factory, self.directory,
                                 '*', self.cache_size)
        self.assertEqual(len(self.corpus.msgs), 4)

    def test_makeMessage_no_content(self):
        # Only checks that the call goes through without raising.
        self.corpus.makeMessage("testmake")

    def test_makeMessage_with_content(self):
        key = "testmake"
        made = self.corpus.makeMessage(key, spam1)
        self.assertEqual(made.key(), key)
        self.assertEqual(made.as_string(), spam1.replace("\n", "\r\n"))

    def test_addMessage_invalid(self):
        # An object offering nothing but key() is rejected with
        # ValueError.
        class msg(object):
            def key(self):
                return 'aa'

        self.assertRaises(ValueError, self.corpus.addMessage, msg())

    def test_addMessage(self):
        # A message created for another directory ends up in this
        # corpus's directory, with its content written to disk.
        incoming = self.factory.create("9", 'fctestspamcorpus', good1)
        self.corpus.addMessage(incoming)
        self.assertEqual(incoming.directory, self.directory)
        stored = open(os.path.join(self.directory, "9"))
        on_disk = stored.read()
        stored.close()
        self.assertEqual(on_disk, good1)

    def test_removeMessage(self):
        # self.msg is the last message stored by stuff_corpus's loop.
        path = self.msg.pathname()
        self.assertEqual(os.path.exists(path), True)
        self.corpus.removeMessage(self.msg)
        self.assertEqual(os.path.exists(path), False)
class ExpiryFileCorpusTest(FileCorpusTest):
    # Re-runs the inherited FileCorpusTest tests with an ExpiryFileCorpus
    # as the corpus under test.

    def setUp(self):
        _FileCorpusBaseTest.setUp(self)
        self.directory = 'fctesthamcorpus'
        self.cache_size = 100
        self.factory = FileMessageFactory()
        self.stuff_corpus()
        # The leading 1.0 is ExpiryFileCorpus's extra first argument
        # (presumably the expiry value -- confirm against FileCorpus.py).
        self.corpus = ExpiryFileCorpus(1.0, self.factory, self.directory,
                                       '?', self.cache_size)
def suite():
    """Return a TestSuite holding the tests of the six concrete test
    cases listed below (the underscore-prefixed base classes are not
    included).

    The per-class suites are built with the default TestLoader, which is
    what the unittest.makeSuite() helper did with its default arguments;
    makeSuite itself is deprecated and no longer exists in recent Python
    releases.
    """
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    cases = (
        FileMessageFactoryTest,
        GzipFileMessageFactoryTest,
        FileMessageTest,
        GzipFileMessageTest,
        FileCorpusTest,
        ExpiryFileCorpusTest,
    )
    all_tests = unittest.TestSuite()
    for case in cases:
        all_tests.addTest(load(case))
    return all_tests
if __name__ == '__main__':
    # Run through the shared test runner, with 'suite' appended to the
    # command-line arguments.
    argv = sys.argv + ['suite']
    sb_test_support.unittest_main(argv=argv)
More information about the Spambayes-checkins
mailing list