[Spambayes-checkins] spambayes/spambayes/test test_Corpus.py, NONE, 1.1 test_FileCorpus.py, NONE, 1.1

Tony Meyer anadelonbrin at users.sourceforge.net
Thu Jan 20 04:37:57 CET 2005


Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv1916/spambayes/test

Added Files:
	test_Corpus.py test_FileCorpus.py 
Log Message:
Add unit tests for Corpus and FileCorpus modules.

--- NEW FILE: test_Corpus.py ---
# Test the Corpus module.

import sys
import time
import unittest

import sb_test_support
sb_test_support.fix_sys_path()

from spambayes.Corpus import Corpus, ExpiryCorpus, MessageFactory

# We borrow the test messages that test_sb_server uses.
from test_sb_server import good1, spam1, malformed1

class simple_msg(object):
    """Minimal stand-in for a corpus message.

    It remembers the key it was built with, the time.time() value at
    construction, and whether load() has been called.
    """

    def __init__(self, key):
        self.loaded = False
        self.creation_time = time.time()
        self._key = key

    def key(self):
        """Return the key supplied to the constructor."""
        return self._key

    def createTimestamp(self):
        """Return the timestamp recorded when the object was created."""
        return self.creation_time

    def load(self):
        """Record that a load was requested; nothing is actually read."""
        self.loaded = True

class simple_observer(object):
    """Observer whose callbacks announce themselves by raising.

    Each hook raises a different exception type, so a test can tell
    which of the two was invoked.
    """

    def onAddMessage(self, msg, flags):
        # ValueError marks "the add hook ran".
        raise ValueError()

    def onRemoveMessage(self, msg, flags):
        # TypeError marks "the remove hook ran".
        raise TypeError()

class CorpusTest(unittest.TestCase):
    """Unit tests for Corpus, driven with simple_msg stand-in messages."""

    def setUp(self):
        # Every test starts from an empty corpus with a 100-entry cache.
        self.cacheSize = 100
        self.factory = MessageFactory()
        self.corpus = Corpus(self.factory, self.cacheSize)

    # Helpers.  Their names do not start with "test", so the default
    # unittest loader prefix does not pick them up as test cases.

    def _addKeys(self, keys):
        """Add a fresh simple_msg to the corpus for each key in keys."""
        for key in keys:
            self.corpus.addMessage(simple_msg(key))

    def _addChecked(self):
        """Add a message with key 0, checking it was absent before and
        is retrievable afterwards.  Returns the message."""
        msg = simple_msg(0)
        self.assertEqual(self.corpus.get(0), None)
        self.corpus.addMessage(msg)
        self.assertEqual(self.corpus[0], msg)
        return msg

    def _cacheChecked(self, key):
        """Cache a new message under key and check that both the message
        table and the in-memory key list reflect it.  Returns the message."""
        msg = simple_msg(key)
        self.corpus.cacheMessage(msg)
        self.assertEqual(self.corpus.msgs[key], msg)
        self.assert_(key in self.corpus.keysInMemory)
        return msg

    # Construction.

    def test___init__(self):
        corpus = self.corpus
        self.assertEqual(corpus.cacheSize, self.cacheSize)
        self.assertEqual(corpus.msgs, {})
        self.assertEqual(corpus.keysInMemory, [])
        self.assertEqual(corpus.observers, [])
        self.assertEqual(corpus.factory, self.factory)

    # Observers.

    def test_addObserver(self):
        # simple_observer raises from its hooks, so seeing the exception
        # shows that the corpus called the registered observer.
        observer = simple_observer()
        self.corpus.addObserver(observer)
        self.assertRaises(ValueError, self.corpus.addMessage,
                          simple_msg(0))
        self.assertRaises(TypeError, self.corpus.removeMessage,
                          simple_msg(1))

    # Adding and removing.

    def test_addMessage(self):
        self._addChecked()

    def test_removeMessage(self):
        msg = self._addChecked()
        self.corpus.removeMessage(msg)
        self.assertEqual(self.corpus.get(0), None)

    # Cache handling.

    def test_cacheMessage(self):
        self._cacheChecked(0)

    def test_flush_cache(self):
        # With room for a single entry, caching key 1 must push key 0
        # out of the in-memory key list.
        self.corpus.cacheSize = 1
        self._cacheChecked(0)
        self._cacheChecked(1)
        self.assert_(0 not in self.corpus.keysInMemory)

    def test_unCacheMessage(self):
        msg = self._cacheChecked(0)
        self.corpus.unCacheMessage(msg)
        # NOTE(review): the key is expected to still be listed after
        # unCacheMessage() was handed the message object; confirm this
        # against Corpus.unCacheMessage's intended contract.
        self.assert_(0 in self.corpus.keysInMemory)

    def test_takeMessage(self):
        donor = Corpus(self.factory, self.cacheSize)
        msg = simple_msg(0)
        donor.addMessage(msg)
        self.assertEqual(self.corpus.get(0), None)
        self.corpus.takeMessage(0, donor)
        # The message must have been loaded, must have left the donor
        # corpus, and must now be held by self.corpus.
        self.assertEqual(msg.loaded, True)
        self.assertEqual(donor.get(0), None)
        self.assertEqual(self.corpus.get(0), msg)

    # Lookup.

    def test_get(self):
        self._addKeys([0, 1, 2])
        self.assertEqual(self.corpus.get(0).key(), 0)

    def test_get_fail(self):
        self._addKeys([0, 1, 2])
        self.assertEqual(self.corpus.get(4), None)

    def test_get_default(self):
        self._addKeys([0, 1, 2])
        self.assertEqual(self.corpus.get(4, "test"), "test")

    def test___getitem__(self):
        self._addKeys([0, 1, 2])
        self.assertEqual(self.corpus[0].key(), 0)

    def test___getitem___fail(self):
        self._addKeys([0, 1, 2])
        self.assertRaises(NotImplementedError, self.corpus.__getitem__, 4)

    # Enumeration.

    def test_keys(self):
        self.assertEqual(self.corpus.keys(), [])
        expected = [0, 1, 2]
        self._addKeys(expected)
        self.assertEqual(self.corpus.keys(), expected)

    def test___iter__(self):
        self.assertEqual(tuple(self.corpus), ())
        added = (simple_msg(0), simple_msg(1), simple_msg(2))
        for each in added:
            self.corpus.addMessage(each)
        self.assertEqual(tuple(self.corpus), added)

    # Message creation.

    def test_makeMessage_no_content(self):
        self.assertRaises(NotImplementedError, self.corpus.makeMessage,
                          "testmessage")

    def test_makeMessage_with_content(self):
        self.assertRaises(NotImplementedError, self.corpus.makeMessage,
                          "testmessage", good1)


class ExpiryCorpusTest(unittest.TestCase):
    """Unit tests for ExpiryCorpus, combined with Corpus by inheritance."""

    def setUp(self):
        # The corpus under test inherits from both classes; each base is
        # initialised explicitly with its own arguments.
        class Mixed(Corpus, ExpiryCorpus):
            def __init__(self, expireBefore, factory, cacheSize):
                Corpus.__init__(self, factory, cacheSize)
                ExpiryCorpus.__init__(self, expireBefore)

        self.expireBefore = 10.0
        self.cacheSize = 100
        self.factory = MessageFactory()
        self.corpus = Mixed(self.expireBefore, self.factory,
                            self.cacheSize)

    def _addBatch(self, keys):
        """Create a simple_msg per key, add them all to the corpus, and
        return them as a list."""
        batch = [simple_msg(key) for key in keys]
        for msg in batch:
            self.corpus.addMessage(msg)
        return batch

    def test___init___expiry(self):
        self.assertEqual(self.corpus.expireBefore, self.expireBefore)

    def test_removeExpiredMessages(self):
        # These messages are created first and are meant to expire.
        stale = self._addBatch((1, 2))

        # Shrink the expiry setting, then pause for longer than that so
        # the first batch is older than the second (presumably the value
        # is in seconds -- confirm against ExpiryCorpus).
        self.corpus.expireBefore = 0.25
        time.sleep(0.5)

        # These messages are created after the pause and must survive.
        fresh = self._addBatch((3, 4))

        self.corpus.removeExpiredMessages()

        # The stale batch must be gone; the fresh batch must remain.
        for batch, present in ((stale, False), (fresh, True)):
            for msg in batch:
                self.assertEqual(msg in self.corpus, present)
        

def suite():
    """Return a TestSuite holding every test case class in this module."""
    collected = unittest.TestSuite()
    for case in (CorpusTest, ExpiryCorpusTest):
        collected.addTest(unittest.makeSuite(case))
    return collected


if __name__ == '__main__':
    # Run through the shared test-support runner, appending 'suite' to
    # the command-line arguments.
    sb_test_support.unittest_main(
        argv=sys.argv + ['suite'])

--- NEW FILE: test_FileCorpus.py ---
# Test the FileCorpus module.

import os
import sys
import time
import gzip
import errno
import unittest

import sb_test_support
sb_test_support.fix_sys_path()

from spambayes import storage
from spambayes.FileCorpus import ExpiryFileCorpus
from spambayes.FileCorpus import FileCorpus, FileMessage, GzipFileMessage
from spambayes.FileCorpus import FileMessageFactory, GzipFileMessageFactory

# We borrow the test messages that test_sb_server uses.
from test_sb_server import good1, spam1, malformed1

class _FactoryBaseTest(unittest.TestCase):
    """Tests shared by every message factory test case."""

    # Concrete subclasses set this to the factory class under test.
    factory = None

    # Arguments handed to create() by both tests.
    _KEY = "testmessage"
    _DIRECTORY = "fctesthamcorpus"

    def _checkLocation(self, msg):
        """Check that msg recorded the key and directory it was given."""
        self.assertEqual(msg.file_name, self._KEY)
        self.assertEqual(msg.directory, self._DIRECTORY)

    def test_create_no_content(self):
        # Without content the message is expected to be unloaded.
        maker = self.factory()
        msg = maker.create(self._KEY, self._DIRECTORY)
        self._checkLocation(msg)
        self.assertEqual(msg.loaded, False)

    def test_create_with_content(self):
        # With content the message is expected to be loaded, and its
        # text to come back with CRLF line endings.
        maker = self.factory()
        msg = maker.create(self._KEY, self._DIRECTORY, content=good1)
        self._checkLocation(msg)
        self.assertEqual(msg.loaded, True)
        self.assertEqual(msg.as_string(), good1.replace("\n", "\r\n"))


class FileMessageFactoryTest(_FactoryBaseTest):
    """Run the shared factory tests against FileMessageFactory."""

    factory = FileMessageFactory

    def test_klass(self):
        # The factory's klass attribute must be FileMessage.
        produced = self.factory.klass
        self.assertEqual(produced, FileMessage)


class GzipFileMessageFactoryTest(_FactoryBaseTest):
    """Run the shared factory tests against GzipFileMessageFactory."""

    factory = GzipFileMessageFactory

    def test_klass(self):
        # The factory's klass attribute must be GzipFileMessage.
        produced = self.factory.klass
        self.assertEqual(produced, GzipFileMessage)


class _FileCorpusBaseTest(unittest.TestCase):
    """Fixture base class that manages the scratch corpus directories.

    setUp() creates the directories named in ``_CORPUS_DIRS`` relative
    to the current working directory.  tearDown() empties and removes
    them again, and also deletes the files named in ``_SCRATCH_FILES``.
    Paths that are already absent are skipped rather than treated as
    errors; any other OSError is propagated.
    """

    _CORPUS_DIRS = ('fctestspamcorpus',
                    'fctesthamcorpus',
                    'fctestunsurecorpus')
    _SCRATCH_FILES = ('fctestmisc.bayes', 'fctestclass.bayes')

    # errno values taken to mean "path does not exist".  ENOENT is the
    # portable code.  The literal 3 is kept from the original check; it
    # looks like Windows' ERROR_PATH_NOT_FOUND, which Python releases
    # before 2.5 reported directly in OSError.errno -- TODO confirm.
    _MISSING = (errno.ENOENT, 3)

    def _ignoreMissing(self, func, path):
        """Return func(path), or None if the path does not exist.

        An OSError whose errno is not listed in ``_MISSING`` is re-raised.
        """
        try:
            return func(path)
        except OSError as e:
            if e.errno not in self._MISSING:
                raise
            return None

    def _setUpDirectory(self, dirname):
        """Create dirname; an already existing directory is not an error."""
        try:
            os.mkdir(dirname)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def setUp(self):
        # Make corpus directories.
        for dirname in self._CORPUS_DIRS:
            self._setUpDirectory(dirname)

    def _tearDownDirectory(self, dirname):
        """Delete every file directly inside dirname, then dirname itself.

        A directory that does not exist is silently skipped.  Failures
        while unlinking an entry are propagated.
        """
        names = self._ignoreMissing(os.listdir, dirname) or ()
        for filename in names:
            os.unlink(os.path.join(dirname, filename))
        self._ignoreMissing(os.rmdir, dirname)

    def tearDown(self):
        for dirname in self._CORPUS_DIRS:
            self._tearDownDirectory(dirname)
        # Remove the scratch files as well, if they are present.
        for filename in self._SCRATCH_FILES:
            self._ignoreMissing(os.unlink, filename)


class _FileMessageBaseTest(_FileCorpusBaseTest):
    """Tests shared by the file-backed message classes.

    setUp() writes spam1 as a plain text file named "testmessage" in the
    spam corpus directory and wraps it in an unloaded ``klass`` instance
    (self.msg).  It also stores good1 under "wrongmessage" using
    ``wrong_klass``, so that loading a file written by the other message
    type can be tested.
    """

    # Subclass must define a concrete message klass, and wrong_klass.
    klass = None
    wrong_klass = None

    def _removeQuietly(self, fn):
        """Delete fn, ignoring any OSError (e.g. the file is already gone)."""
        try:
            os.remove(fn)
        except OSError:
            pass

    def setUp(self):
        _FileCorpusBaseTest.setUp(self)
        self.filename = "testmessage"
        self.directory = "fctestspamcorpus"
        fn = os.path.join(self.directory, self.filename)
        # Start from a clean slate in case an earlier run left the file.
        self._removeQuietly(fn)
        f = open(fn, "w")
        try:
            # Recorded for comparison in test_createTimestamp.
            self.created_time = time.time()
            f.write(spam1)
        finally:
            f.close()
        self.msg = self.klass(self.filename, self.directory)
        # Message of wrong type, to test mixed corpus.
        self.wrongname = "wrongmessage"
        def good_as_string():
            return good1
        wrong_msg = self.wrong_klass(self.wrongname, self.directory)
        wrong_msg.as_string = good_as_string
        wrong_msg.store()

    def tearDown(self):
        # Remove both fixture files.  Each path is computed on its own,
        # so the wrong-type message is removed whether or not removing
        # the first file succeeded.
        for name in (self.filename, self.wrongname):
            self._removeQuietly(os.path.join(self.directory, name))
        _FileCorpusBaseTest.tearDown(self)

    def test___init__(self):
        self.assertEqual(self.msg.file_name, self.filename)
        self.assertEqual(self.msg.directory, self.directory)
        self.assertEqual(self.msg.loaded, False)

    def test_as_string(self):
        # The text is expected back with CRLF line endings.
        self.assertEqual(self.msg.as_string(), spam1.replace("\n", "\r\n"))

    def test_pathname(self):
        self.assertEqual(self.msg.pathname(), os.path.join(self.directory,
                                                           self.filename))

    def test_name(self):
        self.assertEqual(self.msg.name(), self.filename)

    def test_key(self):
        self.assertEqual(self.msg.key(), self.filename)

    def test_createTimestamp(self):
        timestamp = self.msg.createTimestamp()
        # As long as they are equal to the nearest second, that will do.
        self.assertEqual(int(timestamp), int(self.created_time))

    def test_remove(self):
        pathname = os.path.join(self.directory, self.filename)
        self.assertEqual(os.path.exists(pathname), True)
        self.msg.remove()
        self.assertEqual(os.path.exists(pathname), False)

    def test_remove_not_there(self):
        # remove() must cope with the file having been deleted already.
        pathname = os.path.join(self.directory, self.filename)
        self.assertEqual(os.path.exists(pathname), True)
        os.remove(pathname)
        self.msg.remove()
        self.assertEqual(os.path.exists(pathname), False)

    def test_load(self):
        # Load correct type.
        self.assertEqual(self.msg.loaded, False)
        self.msg.load()
        self.assertEqual(self.msg.loaded, True)
        self.assertEqual(self.msg.as_string(), spam1.replace("\n", "\r\n"))

    def test_load_wrong(self):
        # Load incorrect type: point the message at the file that was
        # stored by wrong_klass in setUp().
        self.msg.file_name = self.wrongname
        self.assertEqual(self.msg.loaded, False)
        self.msg.load()
        self.assertEqual(self.msg.loaded, True)
        self.assertEqual(self.msg.as_string(), good1.replace("\n", "\r\n"))

    def test_load_already_loaded(self):
        # Shouldn't do anything if already loaded.
        self.msg.file_name = None
        self.msg.loaded = True
        # This will raise an error if a load from storage is attempted.
        self.msg.load()


class FileMessageTest(_FileMessageBaseTest):
    """Run the shared message tests against FileMessage.

    GzipFileMessage supplies the wrong-type fixture message.
    """

    klass = FileMessage
    wrong_klass = GzipFileMessage

    def test_store(self):
        # Make the message report good1 as its text, store it, and check
        # that the file on disk then holds exactly that text.
        self.msg.as_string = lambda: good1
        self.msg.store()
        stored = open(os.path.join(self.directory, self.filename))
        content = stored.read()
        stored.close()
        self.assertEqual(content, good1)


class GzipFileMessageTest(_FileMessageBaseTest):
    """Run the shared message tests against GzipFileMessage.

    FileMessage supplies the wrong-type fixture message.
    """

    klass = GzipFileMessage
    wrong_klass = FileMessage

    def test_store(self):
        # Make the message report good1 as its text, store it, and check
        # that reading the file back through gzip yields exactly that text.
        self.msg.as_string = lambda: good1
        self.msg.store()
        stored = gzip.open(os.path.join(self.directory, self.filename))
        content = stored.read()
        stored.close()
        self.assertEqual(content, good1)


class FileCorpusTest(_FileCorpusBaseTest):
    """Unit tests for FileCorpus over a directory of stored messages."""

    def setUp(self):
        _FileCorpusBaseTest.setUp(self)
        self.factory = FileMessageFactory()
        self.cache_size = 100
        self.directory = 'fctesthamcorpus'
        # The message files are written before the corpus is created;
        # test_filter checks the corpus contents right after construction.
        self.stuff_corpus()
        self.corpus = FileCorpus(self.factory, self.directory,
                                 '?', self.cache_size)

    def stuff_corpus(self):
        """Store the fixture messages in self.directory.

        Three messages are stored under the keys "0", "1" and "2"; the
        last of them is left in self.msg.  A fourth is stored under "10".
        """
        for index, content in enumerate((good1, spam1, malformed1)):
            self.msg = self.factory.create(str(index), self.directory,
                                           content)
            self.msg.store()

        # One more message that is meant not to match the '?' filter
        # (presumably a glob-style pattern -- confirm against FileCorpus).
        unmatched = self.factory.create("10", self.directory, good1)
        unmatched.store()

    def test___init__(self):
        corpus = self.corpus
        self.assertEqual(corpus.directory, self.directory)
        self.assertEqual(corpus.filter, '?')
        self.assertEqual(corpus.cacheSize, self.cache_size)

    def test_filter(self):
        # The '?' filter from setUp() picks up three of the four files.
        self.assertEqual(len(self.corpus.msgs), 3)
        # Try again, with all messages.
        self.corpus = FileCorpus(self.factory, self.directory,
                                 '*', self.cache_size)
        self.assertEqual(len(self.corpus.msgs), 4)

    def test_makeMessage_no_content(self):
        # Only checks that the call completes without raising.
        self.corpus.makeMessage("testmake")

    def test_makeMessage_with_content(self):
        key = "testmake"
        made = self.corpus.makeMessage(key, spam1)
        self.assertEqual(made.key(), key)
        self.assertEqual(made.as_string(), spam1.replace("\n", "\r\n"))

    def test_addMessage_invalid(self):
        # An object offering nothing beyond key() must be rejected.
        class keyed_only(object):
            def key(self):
                return 'aa'
        self.assertRaises(ValueError, self.corpus.addMessage, keyed_only())

    def test_addMessage(self):
        # A message created for another directory must end up in this
        # corpus's directory, with its text written to disk there.
        newcomer = self.factory.create("9", 'fctestspamcorpus', good1)
        self.corpus.addMessage(newcomer)
        self.assertEqual(newcomer.directory, self.directory)
        stored = open(os.path.join(self.directory, "9"))
        content = stored.read()
        stored.close()
        self.assertEqual(content, good1)

    def test_removeMessage(self):
        # self.msg is the last message stored by stuff_corpus().
        pathname = self.msg.pathname()
        self.assertEqual(os.path.exists(pathname), True)
        self.corpus.removeMessage(self.msg)
        self.assertEqual(os.path.exists(pathname), False)


class ExpiryFileCorpusTest(FileCorpusTest):
    """Re-run the FileCorpusTest tests against an ExpiryFileCorpus."""

    def setUp(self):
        # Same fixture as the parent class, except that the corpus is an
        # ExpiryFileCorpus constructed with 1.0 as its first argument.
        _FileCorpusBaseTest.setUp(self)
        self.factory = FileMessageFactory()
        self.directory = 'fctesthamcorpus'
        self.cache_size = 100
        self.stuff_corpus()
        self.corpus = ExpiryFileCorpus(1.0, self.factory, self.directory,
                                       '?', self.cache_size)


def suite():
    """Return a TestSuite holding every concrete test case class here."""
    cases = (
        FileMessageFactoryTest,
        GzipFileMessageFactoryTest,
        FileMessageTest,
        GzipFileMessageTest,
        FileCorpusTest,
        ExpiryFileCorpusTest,
    )
    collected = unittest.TestSuite()
    for case in cases:
        collected.addTest(unittest.makeSuite(case))
    return collected


if __name__ == '__main__':
    # Run through the shared test-support runner, appending 'suite' to
    # the command-line arguments.
    sb_test_support.unittest_main(
        argv=sys.argv + ['suite'])



More information about the Spambayes-checkins mailing list