[Python-checkins] python/nondist/sandbox/spambayes timtest.py,NONE,1.1

tim_one@users.sourceforge.net tim_one@users.sourceforge.net
Fri, 30 Aug 2002 23:50:27 -0700


Update of /cvsroot/python/python/nondist/sandbox/spambayes
In directory usw-pr-cvs1:/tmp/cvs-serv15716

Added Files:
	timtest.py 
Log Message:
This is a driver I've been using for test runs.  It's specific to my
corpus directories, but has useful stuff in it all the same.


--- NEW FILE: timtest.py ---
#! /usr/bin/env python

# Number of corpus subdirectories (Set1 .. Set<NSETS>) in each category.
NSETS = 5

# Build the spam and ham directory lists in step, one entry per set number.
SPAMDIRS = []
HAMDIRS = []
for i in range(1, NSETS+1):
    SPAMDIRS.append("Data/Spam/Set%d" % i)
    HAMDIRS.append("Data/Ham/Set%d" % i)

# Pair each spam directory with the ham directory of the same set number.
SPAMHAMDIRS = zip(SPAMDIRS, HAMDIRS)

import os
import re
from sets import Set
import email
from email import message_from_string

import Tester
import classifier

def textparts(msg):
    """Return a Set of the text parts of msg that are worth tokenizing.

    msg is an email Message object.  Every part reached by msg.walk() whose
    main content type is 'text' is collected.  In addition, each
    multipart/alternative container is inspected:  if it directly holds
    both a text/plain and a text/html subpart, the HTML one is considered
    redundant and is removed from the result; if it holds only HTML, the
    HTML subpart is kept.  When a container holds several subparts of the
    same type, only the last one of each type takes part in that decision.
    """
    text = Set()
    redundant_html = Set()
    for part in msg.walk():
        if part.get_content_type() == 'multipart/alternative':
            # A malformed message (missing or unmatched boundary) can claim
            # to be multipart while its payload is a plain string.  There
            # are no subparts to look at then, and iterating over the string
            # would hand single characters to get_content_type().
            if not part.is_multipart():
                continue

            textpart = htmlpart = None
            for subpart in part.get_payload():
                ctype = subpart.get_content_type()
                if ctype == 'text/plain':
                    textpart = subpart
                elif ctype == 'text/html':
                    htmlpart = subpart

            if textpart is not None:
                text.add(textpart)
                if htmlpart is not None:
                    redundant_html.add(htmlpart)
            elif htmlpart is not None:
                text.add(htmlpart)

        elif part.get_content_maintype() == 'text':
            text.add(part)

    # walk() also visits the subparts of each container, so a redundant HTML
    # part lands in text as well; subtract those at the end.
    return text - redundant_html

# Matches "http://" (in any letter case) and captures what follows it, up to
# the first whitespace, '>', quote character, or character in \x7f-\xff.
url_re = re.compile(r"http://([^\s>'\"\x7f-\xff]+)", re.I)

# Characters on which each '/'-separated piece of a URL is split further.
urlsep_re = re.compile(r"[;?:@&=+,$.]")

def _wordtokens(text, minlen=3, maxlen=12):
    """Generate the whitespace-separated words of text whose length lies in
    the inclusive range minlen..maxlen."""
    for w in text.split():
        if minlen <= len(w) <= maxlen:
            yield w

def tokenize(string):
    """Generate classifier tokens from the raw text of one email message.

    string is the complete message, headers included.  Tokens come out in
    this order:

    1. For every http URL found after the first blank line, the lowercased
       URL is split on '/', each piece is split again on urlsep_re, and each
       resulting chunk is generated as "url<N>:<chunk>", N being the
       0-based index of the '/'-separated piece.
    2. If the message cannot be parsed, 'control: MessageParseError' is
       generated, followed by the 3-to-12 character words of everything
       after the first blank line (if there is one), and generation stops.
    3. Otherwise, for each part selected by textparts(), the 3-to-12
       character words of its decoded payload are generated, or a
       'control: ...' token when the payload cannot be obtained.
    """
    # Everything after the first blank line; stays None when the message has
    # no header/body separator at all.
    nohead = None
    bodystart = string.find('\n\n')
    if bodystart >= 0:
        nohead = string[bodystart+2:]
        for url in url_re.findall(nohead):
            for depth, piece in enumerate(url.lower().split('/')):
                prefix = "url%d:" % depth
                for chunk in urlsep_re.split(piece):
                    yield prefix + chunk

    try:
        msg = message_from_string(string)
    except email.Errors.MessageParseError:
        yield 'control: MessageParseError'
        if nohead is not None:
            for w in _wordtokens(nohead):
                yield w
        return

    for part in textparts(msg):
        try:
            text = part.get_payload(decode=1)
        except (KeyboardInterrupt, SystemExit):
            # A user interrupt or interpreter exit is not a bad payload; let
            # it through instead of turning it into a token.
            raise
        except:
            # Deliberately broad:  any decoding failure becomes a clue
            # rather than aborting the run.
            yield 'control: get_payload crapped out'
        else:
            if text is None:
                yield 'control: payload is None'
            else:
                for w in _wordtokens(text):
                    yield w

class Msg(object):
    """One message file from a corpus directory.

    Attributes:
        path -- dir + "/" + name; this is also the identity of the message
                for hashing and comparison.
        guts -- the complete contents of the file, read in binary mode,
                headers included.

    Iterating over a Msg generates its tokens by calling tokenize() on guts.
    """

    def __init__(self, dir, name):
        path = dir + "/" + name
        self.path = path
        f = open(path, 'rb')
        try:
            # Close the file even if reading it fails.
            guts = f.read()
        finally:
            f.close()
        self.guts = guts

    def __iter__(self):
        return tokenize(self.guts)

    def __hash__(self):
        return hash(self.path)

    def __eq__(self, other):
        try:
            return self.path == other.path
        except AttributeError:
            # other has no path, so it is not message-like; let Python fall
            # back to its default comparison instead of raising.
            return NotImplemented

    def __ne__(self, other):
        # Defined explicitly so that != always agrees with ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

class MsgStream(object):
    """Iterable over the messages stored in one directory.

    Each iteration starts a fresh pass:  the directory is listed when the
    first message is requested, and a Msg is built (which reads its file)
    for one directory entry at a time.
    """

    def __init__(self, directory):
        self.directory = directory

    def __iter__(self):
        return self.produce()

    def produce(self):
        # Generator:  nothing is listed or opened until it is advanced.
        where = self.directory
        entries = os.listdir(where)
        for entry in entries:
            yield Msg(where, entry)


def drive():
    falsepos = Set()
    falseneg = Set()
    for spamdir, hamdir in SPAMHAMDIRS:
        c = classifier.GrahamBayes()
        t = Tester.Test(c)
        print "Training on", hamdir, "&", spamdir, "...",
        t.train(MsgStream(hamdir), MsgStream(spamdir))
        print t.nham, "hams &", t.nspam, "spams"

        for sd2, hd2 in SPAMHAMDIRS:
            if (sd2, hd2) == (spamdir, hamdir):
                continue
            t.reset_test_results()
            print "    testing against", hd2, "&", sd2, "...",
            t.predict(MsgStream(sd2), True)
            t.predict(MsgStream(hd2), False)
            print t.nham_tested, "hams &", t.nspam_tested, "spams"

            print "    false positive:", t.false_positive_rate()
            print "    false negative:", t.false_negative_rate()

            newfpos = Set(t.false_positives()) - falsepos
            falsepos |= newfpos
            print "    new false positives:", [e.path for e in newfpos]
            for e in newfpos:
                print '*' * 78
                print e.path
                prob, clues = c.spamprob(e, True)
                print "prob =", prob
                for clue in clues:
                    print "prob(%r) = %g" % clue
                print e.guts

            newfneg = Set(t.false_negatives()) - falseneg
            falseneg |= newfneg
            print "    new false negatives:", [e.path for e in newfneg]
            for e in newfneg:
                print '*' * 78
                print e.path
                prob, clues = c.spamprob(e, True)
                print "prob =", prob
                for clue in clues:
                    print "prob(%r) = %g" % clue
                print e.guts[:1000]

            print

            print "    best discriminators:"
            stats = [(r.killcount, w) for w, r in c.wordinfo.iteritems()]
            stats.sort()
            del stats[:-30]
            for count, w in stats:
                r = c.wordinfo[w]
                print "        %r %d %g" % (w, r.killcount, r.spamprob)
            print

# Run the experiment only when executed as a script, so that the helpers in
# this file can be imported without starting a full test run.
if __name__ == "__main__":
    drive()