[Spambayes-checkins] spambayes/spambayes/test test_message.py, NONE, 1.6.2.1

Tony Meyer anadelonbrin at users.sourceforge.net
Thu Jan 13 23:00:04 CET 2005


Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv12575/spambayes/test

Added Files:
      Tag: release_1_0-branch
	test_message.py 
Log Message:
Backport unittests for spambayes.message (modified for 1.0.x)

--- NEW FILE: test_message.py ---
# Test spambayes.message module.

import os
import sys
import math
import time
import email
import unittest

import sb_test_support
sb_test_support.fix_sys_path()

from spambayes.Options import options
from spambayes.tokenizer import tokenize
from spambayes.classifier import Classifier
from spambayes.message import MessageInfoDB, msginfoDB
from spambayes.message import Message, SBHeaderMessage, MessageInfoPickle

# We borrow the test messages that test_sb_server uses.
# I doubt it really makes much difference, but if we wanted more than
# one message of each type (the tests should all handle this ok) then
# Richie's hammer.py script has code for generating any number of
# randomly composed email messages.
from test_sb_server import good1, spam1, malformed1

try:
    __file__
except NameError:
    # Python 2.2 compatibility.
    __file__ = sys.argv[0]

TEMP_PICKLE_NAME = os.path.join(os.path.dirname(__file__), "temp.pik")
TEMP_DBM_NAME = os.path.join(os.path.dirname(__file__), "temp.dbm")
# The chances of anyone having files with these names in the test
# directory is minute, but we don't want to wipe anything, so make
# sure that they don't already exist.  Our tearDown code gets rid
# of our copies (whether the tests pass or fail) so they shouldn't
# be ours.
for fn in [TEMP_PICKLE_NAME, TEMP_DBM_NAME]:
    if os.path.exists(fn):
        print fn, "already exists.  Please remove this file before " \
              "running these tests (a file by that name will be " \
              "created and destroyed as part of the tests)."
        sys.exit(1)

class MessageTest(unittest.TestCase):
    def setUp(self):
        self.msg = email.message_from_string(spam1, _class=Message)

    def test_persistent_state(self):
        self.assertEqual(self.msg.stored_attributes, ['c', 't'])

    def test_initialisation(self):
        self.assertEqual(self.msg.id, None)
        self.assertEqual(self.msg.c, None)
        self.assertEqual(self.msg.t, None)

    def test_setId(self):
        # Verify that you can't change the id.
        self.msg.id = "test"
        self.assertRaises(ValueError, self.msg.setId, "test2")

        # Verify that you can't set the id to None.
        self.msg.id = None
        self.assertRaises(ValueError, self.msg.setId, None)

        # Verify that id must be a string.
        self.assertRaises(TypeError, self.msg.setId, 1)
        self.assertRaises(TypeError, self.msg.setId, False)
        self.assertRaises(TypeError, self.msg.setId, [])

        id = "Test"
        self.msg.setId(id)
        self.assertEqual(self.msg.id, id)

        # Check info db _getState is called.
        self.msg.id = None
        saved = msginfoDB._getState
        self.done = False
        try:
            msginfoDB._getState = self._fake_setState
            self.msg.setId(id)
            self.assertEqual(self.done, True)
        finally:
            msginfoDB._getState = saved

    def test_getId(self):
        self.assertEqual(self.msg.getId(), None)
        id = "test"
        self.msg.id = id
        self.assertEqual(self.msg.getId(), id)

    def test_tokenize(self):
        toks = self.msg.tokenize()
        self.assertEqual(tuple(tokenize(spam1)), tuple(toks))

    def test_force_CRLF(self):
        self.assert_('\r' not in good1)
        lines = self.msg._force_CRLF(good1).split('\n')
        for line in lines:
            if line:
                self.assert_(line.endswith('\r'))

    def test_as_string_endings(self):
        self.assert_('\r' not in spam1)
        lines = self.msg.as_string().split('\n')
        for line in lines:
            if line:
                self.assert_(line.endswith('\r'))

    def _fake_setState(self, state):
        self.done = True
        
    def test_modified(self):
        saved = msginfoDB._setState
        try:
            msginfoDB._setState = self._fake_setState
            self.done = False
            self.msg.modified()
            self.assertEqual(self.done, False)
            self.msg.id = "Test"
            self.msg.modified()
            self.assertEqual(self.done, True)
        finally:
            msginfoDB._setState = saved

    def test_GetClassification(self):
        self.msg.c = 's'
        self.assertEqual(self.msg.GetClassification(),
                         options['Headers','header_spam_string'])
        self.msg.c = 'h'
        self.assertEqual(self.msg.GetClassification(),
                         options['Headers','header_ham_string'])
        self.msg.c = 'u'
        self.assertEqual(self.msg.GetClassification(),
                         options['Headers','header_unsure_string'])
        self.msg.c = 'a'
        self.assertEqual(self.msg.GetClassification(), None)

    def test_RememberClassification(self):
        self.msg.RememberClassification(options['Headers',
                                                'header_spam_string'])
        self.assertEqual(self.msg.c, 's')
        self.msg.RememberClassification(options['Headers',
                                                'header_ham_string'])
        self.assertEqual(self.msg.c, 'h')
        self.msg.RememberClassification(options['Headers',
                                                'header_unsure_string'])
        self.assertEqual(self.msg.c, 'u')
        self.assertRaises(ValueError, self.msg.RememberClassification, "a")

        # Check that self.msg.modified is called.
        saved = self.msg.modified
        self.done = False
        try:
            self.msg.modified = self._fake_modified
            self.msg.RememberClassification(options['Headers',
                                                    'header_unsure_string'])
            self.assertEqual(self.done, True)
        finally:
            self.msg.modified = saved

    def _fake_modified(self):
        self.done = True

    def test_GetAndRememberTrained(self):
        t = "test"
        saved = self.msg.modified
        self.done = False
        try:
            self.msg.modified = self._fake_modified
            self.msg.RememberTrained(t)
            self.assertEqual(self.done, True)
        finally:
            self.msg.modified = saved
        self.assertEqual(self.msg.GetTrained(), t)


class SBHeaderMessageTest(unittest.TestCase):
    def setUp(self):
        self.msg = email.message_from_string(spam1, _class=SBHeaderMessage)
        # Get a prob and some clues.
        c = Classifier()
        self.u_prob, clues = c.spamprob(tokenize(good1), True)
        c.learn(tokenize(good1), False)
        self.g_prob, clues = c.spamprob(tokenize(good1), True)
        c.unlearn(tokenize(good1), False)
        c.learn(tokenize(spam1), True)
        self.s_prob, self.clues = c.spamprob(tokenize(spam1), True)
        self.ham = options['Headers','header_ham_string']
        self.spam = options['Headers','header_spam_string']
        self.unsure = options['Headers','header_unsure_string']
        self.to = "tony.meyer at gmail.com;ta-meyer at ihug.co.nz"
        self.msg["to"] = self.to

    def test_setIdFromPayload(self):
        id = self.msg.setIdFromPayload()
        self.assertEqual(id, None)
        self.assertEqual(self.msg.id, None)
        msgid = "test"
        msg = "".join((options['Headers','mailid_header_name'], ": ",
                       msgid, "\r\n", good1))
        msg = email.message_from_string(msg, _class=SBHeaderMessage)
        id = msg.setIdFromPayload()
        self.assertEqual(id, msgid)
        self.assertEqual(msg.id, msgid)

    def test_disposition_header_ham(self):
        name = options['Headers','classification_header_name']
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.assertEqual(self.msg[name], self.ham)
        self.assertEqual(self.msg.GetClassification(), self.ham)

    def test_disposition_header_spam(self):
        name = options['Headers','classification_header_name']
        self.msg.addSBHeaders(self.s_prob, self.clues)
        self.assertEqual(self.msg[name], self.spam)
        self.assertEqual(self.msg.GetClassification(), self.spam)

    def test_disposition_header_unsure(self):
        name = options['Headers','classification_header_name']
        self.msg.addSBHeaders(self.u_prob, self.clues)
        self.assertEqual(self.msg[name], self.unsure)
        self.assertEqual(self.msg.GetClassification(), self.unsure)

    def test_score_header_off(self):
        options['Headers','include_score'] = False
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.assertEqual(self.msg[options['Headers', 'score_header_name']],
                         None)

    def test_score_header(self):
        options['Headers','include_score'] = True
        options["Headers", "header_score_digits"] = 21
        options["Headers", "header_score_logarithm"] = False
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.assertEqual(self.msg[options['Headers', 'score_header_name']],
                         "%.21f" % (self.g_prob,))

    def test_score_header_log(self):
        options['Headers','include_score'] = True
        options["Headers", "header_score_digits"] = 21
        options["Headers", "header_score_logarithm"] = True
        self.msg.addSBHeaders(self.s_prob, self.clues)
        self.assert_(self.msg[options['Headers', 'score_header_name']].\
                     startswith("%.21f" % (self.s_prob,)))
        self.assert_(self.msg[options['Headers', 'score_header_name']].\
                     endswith(" (%d)" % (-math.log10(1.0-self.s_prob),)))

    def test_thermostat_header_off(self):
        options['Headers','include_thermostat'] = False
        self.msg.addSBHeaders(self.u_prob, self.clues)
        self.assertEqual(self.msg[options['Headers',
                                          'thermostat_header_name']], None)

    def test_thermostat_header_unsure(self):
        options['Headers','include_thermostat'] = True
        self.msg.addSBHeaders(self.u_prob, self.clues)
        self.assertEqual(self.msg[options['Headers',
                                          'thermostat_header_name']],
                         "*****")

    def test_thermostat_header_spam(self):
        options['Headers','include_thermostat'] = True
        self.msg.addSBHeaders(self.s_prob, self.clues)
        self.assertEqual(self.msg[options['Headers',
                                          'thermostat_header_name']],
                         "*********")

    def test_thermostat_header_ham(self):
        options['Headers','include_thermostat'] = True
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.assertEqual(self.msg[options['Headers',
                                          'thermostat_header_name']], "")

    def test_evidence_header(self):
        options['Headers', 'include_evidence'] = True
        options['Headers', 'clue_mailheader_cutoff'] = 0.5 # all
        self.msg.addSBHeaders(self.g_prob, self.clues)
        header = self.msg[options['Headers', 'evidence_header_name']]
        header_clues = [s.split(':') for s in \
                        [s.strip() for s in header.split(';')]]
        header_clues = dict([(":".join(clue[:-1])[1:-1], float(clue[-1])) \
                             for clue in header_clues])
        for word, score in self.clues:
            self.assert_(word in header_clues)
            self.assertEqual(round(score, 2), header_clues[word])

    def test_evidence_header_partial(self):
        options['Headers', 'include_evidence'] = True
        options['Headers', 'clue_mailheader_cutoff'] = 0.1
        self.msg.addSBHeaders(self.g_prob, self.clues)
        header = self.msg[options['Headers', 'evidence_header_name']]
        header_clues = [s.split(':') for s in \
                        [s.strip() for s in header.split(';')]]
        header_clues = dict([(":".join(clue[:-1])[1:-1], float(clue[-1])) \
                             for clue in header_clues])
        for word, score in self.clues:
            if score <= 0.1 or score >= 0.9:
                self.assert_(word in header_clues)
                self.assertEqual(round(score, 2), header_clues[word])
            else:
                self.assert_(word not in header_clues)

    def test_evidence_header_empty(self):
        options['Headers', 'include_evidence'] = True
        options['Headers', 'clue_mailheader_cutoff'] = 0.0
        self.msg.addSBHeaders(self.g_prob, self.clues)
        header = self.msg[options['Headers','evidence_header_name']]
        header_clues = [s.split(':') for s in \
                        [s.strip() for s in header.split(';')]]
        header_clues = dict([(":".join(clue[:-1])[1:-1], float(clue[-1])) \
                             for clue in header_clues])
        for word, score in self.clues:
            if word == "*H*" or word == "*S*":
                self.assert_(word in header_clues)
                self.assertEqual(round(score, 2), header_clues[word])
            else:
                self.assert_(word not in header_clues)

    def test_evidence_header_off(self):
        options['Headers', 'include_evidence'] = False
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.assertEqual(self.msg[options['Headers',
                                          'evidence_header_name']], None)

    def test_notate_to_off(self):
        options["Headers", "notate_to"] = ()
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.msg.addSBHeaders(self.u_prob, self.clues)
        self.msg.addSBHeaders(self.s_prob, self.clues)
        self.assertEqual(self.msg["To"], self.to)

    def test_notate_to_ham(self):
        options["Headers", "notate_to"] = (self.ham,)
        self.msg.addSBHeaders(self.g_prob, self.clues)
        disp, orig = self.msg["To"].split(',', 1)
        self.assertEqual(orig, self.to)
        self.assertEqual(disp, self.ham)

    def test_notate_to_unsure(self):
        options["Headers", "notate_to"] = (self.ham, self.unsure)
        self.msg.addSBHeaders(self.u_prob, self.clues)
        disp, orig = self.msg["To"].split(',', 1)
        self.assertEqual(orig, self.to)
        self.assertEqual(disp, self.unsure)

    def test_notate_to_spam(self):
        options["Headers", "notate_to"] = (self.ham, self.spam, self.unsure)
        self.msg.addSBHeaders(self.s_prob, self.clues)
        disp, orig = self.msg["To"].split(',', 1)
        self.assertEqual(orig, self.to)
        self.assertEqual(disp, self.spam)

    def test_notate_subject_off(self):
        subject = self.msg["Subject"]
        options["Headers", "notate_subject"] = ()
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.msg.addSBHeaders(self.u_prob, self.clues)
        self.msg.addSBHeaders(self.s_prob, self.clues)
        self.assertEqual(self.msg["Subject"], subject)

    def test_notate_subject_ham(self):        
        subject = self.msg["Subject"]
        options["Headers", "notate_subject"] = (self.ham,)
        self.msg.addSBHeaders(self.g_prob, self.clues)
        disp, orig = self.msg["Subject"].split(',', 1)
        self.assertEqual(orig, subject)
        self.assertEqual(disp, self.ham)

    def test_notate_subject_unsure(self):
        subject = self.msg["Subject"]
        options["Headers", "notate_subject"] = (self.ham, self.unsure)
        self.msg.addSBHeaders(self.u_prob, self.clues)
        disp, orig = self.msg["Subject"].split(',', 1)
        self.assertEqual(orig, subject)
        self.assertEqual(disp, self.unsure)

    def test_notate_subject_spam(self):
        subject = self.msg["Subject"]
        options["Headers", "notate_subject"] = (self.ham, self.spam,
                                                self.unsure)
        self.msg.addSBHeaders(self.s_prob, self.clues)
        disp, orig = self.msg["Subject"].split(',', 1)
        self.assertEqual(orig, subject)
        self.assertEqual(disp, self.spam)

    def test_notate_to_changed(self):
        saved_ham = options["Headers", "header_ham_string"]
        notate_to = options.get_option("Headers", "notate_to")
        saved_to = notate_to.allowed_values
        try:
            options["Headers", "header_ham_string"] = "bacon"
            header_strings = (options["Headers", "header_ham_string"],
                              options["Headers", "header_spam_string"],
                              options["Headers", "header_unsure_string"])
            notate_to = options.get_option("Headers", "notate_to")
            notate_to.allowed_values = header_strings
            self.ham = options["Headers", "header_ham_string"]
            result = self.test_notate_to_ham()
            # Just be sure that it's using the new value.
            self.assertEqual(self.msg["To"].split(',', 1)[0],
                             "bacon")
        finally:
            # If we leave these changed, then lots of other tests will
            # fail.
            options["Headers", "header_ham_string"] = saved_ham
            self.ham = saved_ham
            notate_to.allowed_values = saved_to
        return result

    def test_id_header(self):
        options['Headers','add_unique_id'] = True
        id = "test"
        self.msg.id = id
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.assertEqual(self.msg[options['Headers',
                                          'mailid_header_name']], id)

    def test_id_header_off(self):
        options['Headers','add_unique_id'] = False
        id = "test"
        self.msg.id = id
        self.msg.addSBHeaders(self.g_prob, self.clues)
        self.assertEqual(self.msg[options['Headers',
                                          'mailid_header_name']], None)

    def test_currentSBHeaders(self):
        sbheaders = self.msg.currentSBHeaders()
        self.assertEqual({}, sbheaders)
        headers = {options['Headers', 'classification_header_name'] : '1',
                   options['Headers', 'mailid_header_name'] : '2',
                   options['Headers',
                           'classification_header_name'] + "-ID" : '3',
                   options['Headers', 'thermostat_header_name'] : '4',
                   options['Headers', 'evidence_header_name'] : '5',
                   options['Headers', 'score_header_name'] : '6',
                   options['Headers', 'trained_header_name'] : '7',
                   }
        for name, val in headers.items():
            self.msg[name] = val
        sbheaders = self.msg.currentSBHeaders()
        self.assertEqual(headers, sbheaders)

    def test_delSBHeaders(self):
        headers = (options['Headers', 'classification_header_name'],
                   options['Headers', 'mailid_header_name'],
                   options['Headers',
                           'classification_header_name'] + "-ID",
                   options['Headers', 'thermostat_header_name'],
                   options['Headers', 'evidence_header_name'],
                   options['Headers', 'score_header_name'],
                   options['Headers', 'trained_header_name'],)
        for header in headers:
            self.msg[header] = "test"
        for header in headers:
            self.assert_(header in self.msg.keys())
        self.msg.delSBHeaders()
        for header in headers:
            self.assert_(header not in self.msg.keys())


class MessageInfoBaseTest(unittest.TestCase):
    def setUp(self, fn=TEMP_PICKLE_NAME):
        self.db = self.klass(fn, self.mode)

    def test_mode(self):
        self.assertEqual(self.mode, self.db.mode)

    def test__getState_missing(self):
        msg = email.message_from_string(good1, _class=Message)
        msg.id = "Test"
        dummy_values = "a", "b"
        msg.c, msg.t = dummy_values
        self.db._getState(msg)
        self.assertEqual((msg.c, msg.t), dummy_values)

    def test__getState_compat(self):
        msg = email.message_from_string(good1, _class=Message)
        msg.id = "Test"
        dummy_values = "a", "b"
        self.db.db[msg.id] = dummy_values
        self.db._getState(msg)
        self.assertEqual((msg.c, msg.t), dummy_values)

    def test__getState(self):
        msg = email.message_from_string(good1, _class=Message)
        msg.id = "Test"
        dummy_values = [('a', 1), ('b', 2)]
        self.db.db[msg.id] = dummy_values
        self.db._getState(msg)
        for att, val in dummy_values:
            self.assertEqual(getattr(msg, att), val)

    def test__setState(self):
        msg = email.message_from_string(good1, _class=Message)
        msg.id = "Test"

        saved = self.db.store
        self.done = False
        try:
            self.db.store = self._fake_store
            self.db._setState(msg)
        finally:
            self.db.store = saved
        self.assertEqual(self.done, True)
        correct = [(att, getattr(msg, att)) \
                   for att in msg.stored_attributes]
        db_version = dict(self.db.db[msg.id])
        correct_version = dict(correct)
        self.assertEqual(db_version, correct_version)

    def _fake_store(self):
        self.done = True
        
    def test__delState(self):
        msg = email.message_from_string(good1, _class=Message)
        msg.id = "Test"
        self.db.db[msg.id] = "test"
        saved = self.db.store
        self.done = False
        try:
            self.db.store = self._fake_store
            self.db._delState(msg)
        finally:
            self.db.store = saved
        self.assertEqual(self.done, True)
        self.assertRaises(KeyError, self.db.db.__getitem__, msg.id)
        
    def test_load(self):
        # Create a db to try and load.
        data = {"1" : ('a', 'b', 'c'),
                "2" : ('d', 'e', 'f'),
                "3" : "test"}
        for k, v in data.items():
            self.db.db[k] = v
        self.db.store()
        fn = self.db.db_name
        self.db.close()
        db2 = self.klass(fn, self.mode)
        try:
            self.assertEqual(len(db2.db.keys()), len(data.keys()))
            for k, v in data.items():
                self.assertEqual(db2.db[k], v)
        finally:
            db2.close()

    def test_load_new(self):
        # Load from a non-existing db (i.e. create new).
        self.assertEqual(self.db.db.keys(), [])


class MessageInfoPickleTest(MessageInfoBaseTest):
    def setUp(self):
        self.mode = 1
        self.klass = MessageInfoPickle
        MessageInfoBaseTest.setUp(self, TEMP_PICKLE_NAME)

    def tearDown(self):
        try:
            os.remove(TEMP_PICKLE_NAME)
        except OSError:
            pass

    def store(self):
        if self.db is not None:
            self.db.sync()


class MessageInfoDBTest(MessageInfoBaseTest):
    def setUp(self):
        self.mode = 'c'
        self.klass = MessageInfoDB
        MessageInfoBaseTest.setUp(self, TEMP_DBM_NAME)

    def tearDown(self):
        self.db.close()
        try:
            os.remove(TEMP_DBM_NAME)
        except OSError:
            pass

    def store(self):
        if self.db is not None:
            self.db.sync()

    def _fake_close(self):
        self.done += 1
        
    def test_close(self):
        saved_db = self.db.db.close
        saved_dbm = self.db.dbm.close
        try:
            self.done = 0
            self.db.db.close = self._fake_close
            self.db.dbm.close = self._fake_close
            self.db.close()
            self.assertEqual(self.done, 2)
        finally:
            # If we don't put these back (whatever happens), then
            # the db isn't closed and can't be deleted in tearDown.
            self.db.db.close = saved_db
            self.db.dbm.close = saved_dbm


class UtilitiesTest(unittest.TestCase):
    def _verify_details(self, details):
        loc = details.find(__file__)
        self.assertNotEqual(loc, -1)
        loc = details.find("Exception: Test")
        self.assertNotEqual(loc, -1)

    def _verify_exception_header(self, msg, details):        
        msg = email.message_from_string(msg)
        details = "\r\n.".join(details.strip().split('\n'))
        headerName = 'X-Spambayes-Exception'
        header = email.Header.Header(details, header_name=headerName)
        self.assertEqual(msg[headerName].replace('\r\n', '\n'),
                         str(header).replace('\r\n', '\n'))


def suite():
    suite = unittest.TestSuite()
    classes = (MessageTest,
               SBHeaderMessageTest,
               MessageInfoPickleTest,
               UtilitiesTest,
               )
    from spambayes import dbmstorage
    try:
        dbmstorage.open_best()
    except dbmstorage.error:
        print "Skipping MessageInfoDBTest - no dbm module available"
        from spambayes import message
        def always_pickle():
            return "__test.pik", "pickle"
        message.database_type = always_pickle
    except TypeError:
        # We need an argument, so TypeError will be raised
        # when it *is* available.
        classes += (MessageInfoDBTest,)
    for cls in classes:
        suite.addTest(unittest.makeSuite(cls))
    return suite

if __name__=='__main__':
    sb_test_support.unittest_main(argv=sys.argv + ['suite'])



More information about the Spambayes-checkins mailing list