[Spambayes-checkins] spambayes/spambayes TestToolsUI.py,NONE,1.1

Tony Meyer anadelonbrin at users.sourceforge.net
Sun Dec 21 23:38:13 EST 2003


Update of /cvsroot/spambayes/spambayes/spambayes
In directory sc8-pr-cvs1:/tmp/cvs-serv24316/spambayes

Added Files:
	TestToolsUI.py 
Log Message:
A beginning at a more userfriendly interface to the testing setup.
At the moment to use this, replace the ProxyUserInterface class in sb_server
with the TestTools one (all the old stuff is still available) and go to
http://localhost:8880/cv - it should be straightforward from there.
(I'll post a patch to -dev as well).

Note that the option to use Outlook Express as the source for testing
data doesn't quite work yet.  The 'standard' and 'cache' ones do, from
the testing I've done so far.


--- NEW FILE: TestToolsUI.py ---
"""Testing Tools Web Interface

Classes:
    TestToolsUserInterface - Interface class for testing tools.

Abstract:

This module implements a browser based Spambayes user interface for the
various testing tools.  Users may use it to interface with the tools.

The following functions are currently included:
  onCV - cross-validation testing

To do:
 o Add interface to Alex's incremental test setup.
 o Suggestions?
"""

# This module is part of the spambayes project, which is Copyright 2002-3
# The Python Software Foundation and is covered by the Python Software
# Foundation license.

__author__ = "Tony Meyer <ta-meyer at ihug.co.nz>"
__credits__ = "All the Spambayes folk."

try:
    True, False
except NameError:
    # Maintain compatibility with Python 2.2
    True, False = 1, 0

import os
import sys
import cgi
import glob
import random
import StringIO

import ProxyUI
import oe_mailbox
from spambayes import msgs
from spambayes import TestDriver
from spambayes import OptionsClass
from spambayes.Options import options

# These are the options that will be offered on the testing page.
# If the option is None, then the entry is a header and the following
# options will appear in a new box on the configuration page.
# These are also used to generate http request parameters and template
# fields/variables.
testtools_ini_map = (
##    ('General Options', None),
#   Put any general options that we wish to encourage people to test
#   here, for example:
#   ('Classifier',           'max_discriminators'),
    ('Experimental Options', None),
)

# Dynamically add any current experimental/deprecated options.
for opt in options.options(True):
    sect, opt = opt[1:].split(']', 1)
    if opt[:2].lower() == "x-":
        testtools_ini_map += ((sect, opt),)

class TestToolsUserInterface(ProxyUI.ProxyUserInterface):
    """Serves the HTML user interface for the test tools."""
    def onCv(self):
        global testtools_ini_map
        self._writePreamble("CV Test")
        configTable = self.html.configForm.clone()
        del configTable.configTextRow1
        del configTable.configTextRow2
        del configTable.configCbRow1
        del configTable.configRow2
        del configTable.blankRow
        del configTable.folderRow

        # Add some options that are only available via this page.
        # (This makes displaying the options nice and easy, since
        # they're just handled like everything else).
        sect = 'TestToolsUI'
        for newopt in [('source', 'Messages source', 'Standard test setup',
                        'Select the source of the messages to test on.',
                        ('Standard test setup', 'Cache', 'Outlook Express'),
                        False),
                       ('n', 'Number of runs', 10,
                        'Select the number of cross-validation runs.',
                        OptionsClass.INTEGER, False),]:
            options._options[sect, newopt[0]] = OptionsClass.Option(*newopt)
        testtools_ini_map += (('Testing Options', None),
                              ('TestToolsUI', 'source'),
                              ('TestToolsUI', 'n'),)
        
        option_choice = self._buildConfigPageBody(\
            configTable, testtools_ini_map)
        option_choice.action_page.action = "cvresults"
        option_choice.introduction = "Select the options for your test " \
                                     "(these will be run against the " \
                                     "defaults)."
        option_choice.optionsPathname = "memory only"
        del option_choice.restore_form
        del option_choice.adv_button
        option_choice.config_submit.value = "Run Test"
        self.write(option_choice)
        self._writePostamble()

    def onCvresults(self, *args, **kwargs):
        del kwargs["how"]
        self._writePreamble("CV Test Results")
        text = "Display the results of a cross-validation test with the " \
               "current settings against the defaults."
        nsets = options["TestToolsUI", "n"]

        # With defaults first.
        self.write("<p>Testing with defaults...</p>")
        saved = {}
        for opt in options.options(True):
            # Ignore those that have do_not_restore as True
            # (These are predominately storage options, and at least
            # the cache directory ones may be needed later on).
            sect, opt = opt[1:].split(']', 1)
            saved[(sect, opt)] = options[(sect, opt)]
            if not options.no_restore(sect, opt):
                options.set(sect, opt, options.default(sect, opt))
        options["TestToolsUI", "source"] = kwargs["TestToolsUI_source"]
        # XXX Cache this somewhere?  If the testing data isn't changing,
        # XXX and the user is running multiple tests, then it doesn't
        # XXX make much sense to rerun the 'default's test over and over
        # XXX again.
        cv_out, errors = self.timCV(nsets)
##        print errors.read()
        defaults = self.rates(cv_out)

        # Now with specified settings.
        self.write("<p>Testing with selected settings...</p>")
        for opt in options.options(True):
            sect, opt = opt[1:].split(']', 1)
            try:
                value = kwargs["%s_%s" % (sect, opt)]
            except KeyError:
                # Leave as the default.
                pass
            else:
                options.set(sect, opt, value)
        cv_out, errors = self.timCV(nsets)
##        print errors.read()
        current = self.rates(cv_out)

        # Restore the settings.
        for opt in options.options(True):
            sect, opt = opt[1:].split(']', 1)
            options.set(sect, opt, saved[(sect, opt)])

        # Do the comparison.
        comp, errors = self.compare(defaults, current)
##        print errors.read()

        # Output the results
        # XXX This is just what you'd get from running cmp.py
        # XXX at the moment - it could be prettied up a bit.
        comp = comp.read()
        box = self._buildBox('Cross-validation test', None,
                             cgi.escape(comp).replace("\n", "<br />"))
        self.write(box)
        self._writePostamble()

    def timCV(self, nsets):
        # Until we are un-lazy enough to change the code borrowed from
        # timcv.py, just capture the output that normally goes to stdout
        # or stderr and return it.
        cout, cerr = sys.stdout, sys.stderr
        sys.stdout = StringIO.StringIO()
        sys.stderr = StringIO.StringIO()

        if options["TestToolsUI", "source"] == "Standard test setup":
            # Source the test data from the 'standard' test setup,
            # as described in the testtools directory.
            hamdirs  = [options["TestDriver", "ham_directories"] % \
                        i for i in range(1, nsets+1)]
            spamdirs = [options["TestDriver", "spam_directories"] % \
                        i for i in range(1, nsets+1)]
            hstream = msgs.HamStream
            sstream = msgs.SpamStream
        elif options["TestToolsUI", "source"] == "Cache":
            # Source the test data from the cache directories
            # specified in the "Storage" section of the configuration.
            # This means that we have one 'ham' directory and one
            # 'spam' directory (we ignore the unknown one, obviously),
            # but what we really want is n directories of ham and spam.
            # To overcome this without actually moving the files about
            # we have a class that fakes it for us.
            hamdirs  = ["%s%s%s/%s" % (options["Storage", "ham_cache"],
                                       os.pathsep, i, nsets)
                        for i in range(1, nsets+1)]
            spamdirs = ["%s%s%s/%s" % (options["Storage", "spam_cache"],
                                       os.pathsep, i, nsets)
                        for i in range(1, nsets+1)]
            hstream = HamCacheStream
            sstream = SpamCacheStream
        elif options["TestToolsUI", "source"] == "Outlook Express":
            # Source the test data from Outlook Express
            # Pretty crude at the moment (hard coded):
            #   Ignores:
            #     o Deleted Items
            #     o Drafts
            #     o Folders
            #     o Offline
            #     o Outbox
            #     o Pop3uidl
            #   Assumes that anything with 'spam' in the name is spam
            #   (so don't have a folder called "looks like spam"!) and
            #   that anything else is ham.
            # No mixed dbxes!
            # Each dbx is the equivilent of a directory in the 'standard'
            # test setup - so it would be good if each dbx had a roughly
            # equal number of messages, and if there were the same number
            # of ham and spam dbxes.
            dbx_dir = oe_mailbox.OEStoreRoot()
            dbxes = glob.glob(os.path.join(dbx_dir, "*.dbx"))
            spamdirs = []
            hamdirs = []
            for dbx in dbxes:
                if os.path.splitext(os.path.basename(dbx))[0].lower() in \
                   ["deleted items", "drafts", "folders",
                    "offline", "outbox", "pop3uidl",]:
                  continue
                elif dbx.lower().find("spam") == -1:
                    spamdirs.append(dbx)
                else:
                    hamdirs.append(dbx)
            hstream = oe_mailbox.OEHamStream
            sstream = oe_mailbox.OESpamStream

        d = TestDriver.Driver()
        # Train it on all sets except the first.
        h = hstream("%s-%d" % (hamdirs[1], nsets), hamdirs[1:], train=1)
        d.train(hstream("%s-%d" % (hamdirs[1], nsets), hamdirs[1:],
                        train=1),
                sstream("%s-%d" % (spamdirs[1], nsets), spamdirs[1:],
                        train=1))

        # Now run nsets times, predicting pair i against all except pair i.
        for i in range(nsets):
            h = hamdirs[i]
            s = spamdirs[i]
            hamstream = hstream(h, [h], train=0)
            spamstream = sstream(s, [s], train=0)

            if i > 0:
                if options["CV Driver", "build_each_classifier_from_scratch"]:
                    # Build a new classifier from the other sets.
                    d.new_classifier()

                    hname = "%s-%d, except %d" % (hamdirs[0], nsets, i+1)
                    h2 = hamdirs[:]
                    del h2[i]

                    sname = "%s-%d, except %d" % (spamdirs[0], nsets, i+1)
                    s2 = spamdirs[:]
                    del s2[i]

                    d.train(hstream(hname, h2, train=1),
                            sstream(sname, s2, train=1))

                else:
                    # Forget this set.
                    d.untrain(hamstream, spamstream)

            # Predict this set.
            d.test(hamstream, spamstream)
            d.finishtest()

            if i < nsets - 1 and not options["CV Driver",
                                             "build_each_classifier_from_scratch"]:
                # Add this set back in.
                d.train(hamstream, spamstream)
        d.alldone()

        # Other end of the lazy 'capture the output' code.
        sys.stdout.seek(0)
        sys.stderr.seek(0)
        out, err = sys.stdout, sys.stderr
        sys.stdout = cout
        sys.stderr = cerr
        return out, err

    def rates(self, ifile):
        """This is essentially rates.py from the testtools directory."""
        # XXX Stop being lazy and using the remapping cout/cerr cheat
        # XXX at some point.
        cout = sys.stdout
        cerr = sys.stderr
        sys.stdout = StringIO.StringIO()
        sys.stderr = StringIO.StringIO()
        
        interesting = filter(lambda line: line.startswith('-> '), ifile)
        ifile.close()

        ofile = StringIO.StringIO()

        def dump(*stuff):
            msg = ' '.join(map(str, stuff))
            print msg
            print >> ofile, msg

        ntests = nfn = nfp = 0
        sumfnrate = sumfprate = 0.0

        for line in interesting:
            dump(line[:-1])
            fields = line.split()

            # 0      1      2    3    4 5    6                 -5  -4 -3   -2    -1
            #-> <stat> tested 4000 hams & 2750 spams against 8000 hams & 5500 spams
            if line.startswith('-> <stat> tested '):
                ntests += 1
                continue

            #  0      1     2        3
            # -> <stat> false positive %: 0.025
            # -> <stat> false negative %: 0.327272727273
            if line.startswith('-> <stat> false '):
                kind = fields[3]
                percent = float(fields[-1])
                if kind == 'positive':
                    sumfprate += percent
                    lastval = percent
                else:
                    sumfnrate += percent
                    dump('    %7.3f %7.3f' % (lastval, percent))
                continue

            #  0      1 2   3     4         5
            # -> <stat> 1 new false positives
            if len(fields) >= 5 and fields[3] == 'new' and fields[4] == 'false':
                kind = fields[-1]
                count = int(fields[2])
                if kind == 'positives':
                    nfp += count
                else:
                    nfn += count

        dump('total unique false pos', nfp)
        dump('total unique false neg', nfn)
        dump('average fp %', sumfprate / ntests)
        dump('average fn %', sumfnrate / ntests)
        ofile.seek(0)
        sys.stdout = cout
        sys.stderr = cerr
        return ofile

    def compare(self, f1, f2):
        """This is essentially cmp.py from the testtools directory."""
        # XXX Stop being lazy and using the remapping cout/cerr cheat
        # XXX at some point.
        cout, cerr = sys.stdout, sys.stderr
        sys.stdout = StringIO.StringIO()
        sys.stderr = StringIO.StringIO()

        def suck(f):
            fns = []
            fps = []
            hamdev = []
            spamdev = []
            hamdevall = spamdevall = (0.0, 0.0)

            get = f.readline
            while 1:
                line = get()
                if line.startswith('-> <stat> tested'):
                    print line,
                if line.find(' items; mean ') != -1:
                    # -> <stat> Ham distribution for this pair: 1000 items; mean 0.05; sample sdev 0.68
                    # and later "sample " went away
                    vals = line.split(';')
                    mean = float(vals[1].split()[-1])
                    sdev = float(vals[2].split()[-1])
                    val = (mean, sdev)
                    typ = vals[0].split()[2]
                    if line.find('for all runs') != -1:
                        if typ == 'Ham':
                            hamdevall = val
                        else:
                            spamdevall = val
                    elif line.find('all in this') != -1:
                        if typ == 'Ham':
                            hamdev.append(val)
                        else:
                            spamdev.append(val)
                    continue
                if line.startswith('-> '):
                    continue
                if line.startswith('total'):
                    break
                if len(line) == 0:
                    continue
                # A line with an f-p rate and an f-n rate.
                p, n = map(float, line.split())
                fps.append(p)
                fns.append(n)

            # "total unique false pos 0"
            # "total unique false neg 0"
            # "average fp % 0.0"
            # "average fn % 0.0"
            fptot = int(line.split()[-1])
            fntot = int(get().split()[-1])
            fpmean = float(get().split()[-1])
            fnmean = float(get().split()[-1])
            return (fps, fns, fptot, fntot, fpmean, fnmean,
                    hamdev, spamdev, hamdevall, spamdevall)

        def tag(p1, p2):
            if p1 == p2:
                t = "tied          "
            else:
                t = p1 < p2 and "lost " or "won  "
                if p1:
                    p = (p2 - p1) * 100.0 / p1
                    t += " %+7.2f%%" % p
                else:
                    t += " +(was 0)"
            return t

        def mtag(m1, m2):
            mean1, dev1 = m1
            mean2, dev2 = m2
            t = "%7.2f %7.2f " % (mean1, mean2)
            if mean1:
                mp = (mean2 - mean1) * 100.0 / mean1
                t += "%+7.2f%%" % mp
            else:
                t += "+(was 0)"
            t += "     %7.2f %7.2f " % (dev1, dev2)
            if dev1:
                dp = (dev2 - dev1) * 100.0 / dev1
                t += "%+7.2f%%" % dp
            else:
                t += "+(was 0)"
            return t

        def dump(p1s, p2s):
            alltags = ""
            for p1, p2 in zip(p1s, p2s):
                t = tag(p1, p2)
                print "    %5.3f  %5.3f  %s" % (p1, p2, t)
                alltags += t + " "
            print
            for t in "won", "tied", "lost":
                print "%-4s %2d times" % (t, alltags.count(t))
            print

        def dumpdev(meandev1, meandev2):
            for m1, m2 in zip(meandev1, meandev2):
                print mtag(m1, m2)

        (fp1, fn1, fptot1, fntot1, fpmean1, fnmean1,
         hamdev1, spamdev1, hamdevall1, spamdevall1) = suck(f1)

        (fp2, fn2, fptot2, fntot2, fpmean2, fnmean2,
         hamdev2, spamdev2, hamdevall2, spamdevall2) = suck(f2)

        print
        print "false positive percentages"
        dump(fp1, fp2)
        print "total unique fp went from", fptot1, "to", fptot2, tag(fptot1, fptot2)
        print "mean fp % went from", fpmean1, "to", fpmean2, tag(fpmean1, fpmean2)

        print
        print "false negative percentages"
        dump(fn1, fn2)
        print "total unique fn went from", fntot1, "to", fntot2, tag(fntot1, fntot2)
        print "mean fn % went from", fnmean1, "to", fnmean2, tag(fnmean1, fnmean2)

        print
        if len(hamdev1) == len(hamdev2) and len(spamdev1) == len(spamdev2):
            print "ham mean                     ham sdev"
            dumpdev(hamdev1, hamdev2)
            print
            print "ham mean and sdev for all runs"
            dumpdev([hamdevall1], [hamdevall2])


            print
            print "spam mean                    spam sdev"
            dumpdev(spamdev1, spamdev2)
            print
            print "spam mean and sdev for all runs"
            dumpdev([spamdevall1], [spamdevall2])

            print
            diff1 = spamdevall1[0] - hamdevall1[0]
            diff2 = spamdevall2[0] - hamdevall2[0]
            print "ham/spam mean difference: %2.2f %2.2f %+2.2f" % (diff1,
                                                                    diff2,
                                                                    diff2 - diff1)
        else:
            print "[info about ham & spam means & sdevs not available in both files]"

        sys.stdout.seek(0)
        sys.stderr.seek(0)
        out, err = sys.stdout, sys.stderr
        sys.stdout = cout
        sys.stderr = cerr
        return out, err


# The iterator yields a stream of Msg objects from the given
# 'directory'.  The directory is actually the actual directory
# and then an indication of the portion of it that we are after.
# (so that a single directory can be used, a la the caches, rather
# than a nicely split up into sets directory).
class CacheStream(msgs.MsgStream):
    def produce(self):
        # We only want some of the msgs.  Shuffle each directory list, but
        # in such a way that we'll get the same result each time this is
        # called on the same directory list.
        base_check = None
        for directory in self.directories:
            directory, portion = directory.split(os.pathsep)
            # All the directories in the list *must* be the same, and just
            # different sections, because this makes the code easier, and is
            # the desired usage, anyway.
            if base_check is None:
                base_check = directory
            assert directory == base_check

            set_num, nsets = portion.split('/')

            all = os.listdir(directory)
            random.seed(hash(max(all)) ^ msgs.SEED)
            random.shuffle(all)

            set_size = len(all) // int(nsets)
            set_num = int(set_num)
            set = all[set_num*set_size:((set_num+1)*set_size)-1]
            set.sort()
            for fname in set:
                yield msgs.Msg(directory, fname)

class HamCacheStream(CacheStream):
    def __init__(self, tag, directories, train=0):
        if train:
            CacheStream.__init__(self, tag, directories, msgs.HAMTRAIN)
        else:
            CacheStream.__init__(self, tag, directories, msgs.HAMTEST)

class SpamCacheStream(CacheStream):
    def __init__(self, tag, directories, train=0):
        if train:
            CacheStream.__init__(self, tag, directories, msgs.SPAMTRAIN)
        else:
            CacheStream.__init__(self, tag, directories, msgs.SPAMTEST)





More information about the Spambayes-checkins mailing list