[Spambayes-checkins] spambayes/spambayes/test test_sb-server.py, NONE, 1.1

Tony Meyer anadelonbrin at users.sourceforge.net
Thu Sep 4 19:16:48 EDT 2003


Update of /cvsroot/spambayes/spambayes/spambayes/test
In directory sc8-pr-cvs1:/tmp/cvs-serv14316/spambayes/test

Added Files:
	test_sb-server.py 
Log Message:
Crap!  We can't use "sb-" as a prefix, because then we can't import the scripts.
I guess that all the importable code could be moved into modules, but that seems
like a huge hassle.  Let's use "sb_" as a prefix instead.

Apologies for cluttering the attic...sigh.

--- NEW FILE: test_sb-server.py ---
#! /usr/bin/env python

"""Test the pop3proxy is working correctly.

When using the -z command line option, carries out a test that the
POP3 proxy can be connected to, that incoming mail is classified,
that pipelining is removed from the CAPA[bility] query, and that the
web ui is present.

The -t option runs a fake POP3 server on port 8810.  This is the
same server that the -z option uses, and may be separately run for
other testing purposes.

Usage:

    pop3proxytest.py [options]

        options:
            -z      : Runs a self-test and exits.
            -t      : Runs a fake POP3 server on port 8110 (for testing).
            -h      : Displays this help message.
"""

# This module is part of the spambayes project, which is Copyright 2002
# The Python Software Foundation and is covered by the Python Software
# Foundation license.

__author__ = "Richie Hindle <richie at entrian.com>"
__credits__ = "All the Spambayes folk."

try:
    True, False
except NameError:
    # Maintain compatibility with Python 2.2
    True, False = 1, 0

# This code originally formed a part of pop3proxy.py.  If you are examining
# the history of this file, you may need to go back to there.

todo = """
Web training interface:

 o Functional tests.
"""

# One example of spam and one of ham - both are used to train, and are
# then classified.  Not a good test of the classifier, but a perfectly
# good test of the POP3 proxy.  The bodies of these came from the
# spambayes project, and Richie added the headers because the
# originals had no headers.

spam1 = """From: friend at public.com
Subject: Make money fast

Hello tim_chandler , Want to save money ?
Now is a good time to consider refinancing. Rates are low so you can cut
your current payments and save money.

http://64.251.22.101/interest/index%38%30%300%2E%68t%6D

Take off list on site [s5]
"""

good1 = """From: chris at example.com
Subject: ZPT and DTML

Jean Jordaan wrote:
> 'Fraid so ;>  It contains a vintage dtml-calendar tag.
>   http://www.zope.org/Members/teyc/CalendarTag
>
> Hmm I think I see what you mean: one needn't manually pass on the
> namespace to a ZPT?

Yeah, Page Templates are a bit more clever, sadly, DTML methods aren't :-(

Chris
"""

import asyncore
import socket
import operator
import re
import getopt

# a bit of a hack to help those without spambayes on their
# Python path - stolen from timtest.py
import sys
import os
sys.path.insert(-1, os.getcwd())
sys.path.insert(-1, os.path.dirname(os.getcwd()))

from spambayes import Dibbler
from spambayes import tokenizer
from spambayes.UserInterface import UserInterfaceServer
from spambayes.ProxyUI import ProxyUserInterface
from pop3proxy import BayesProxyListener
from pop3proxy import state, _recreateState
from spambayes.Options import options

# HEADER_EXAMPLE is the longest possible header - the length of this one
# is added to the size of each message.
HEADER_EXAMPLE = '%s: xxxxxxxxxxxxxxxxxxxx\r\n' % \
                 options["Hammie", "header_name"]

class TestListener(Dibbler.Listener):
    """Listener for TestPOP3Server.  Works on port 8110, to co-exist
    with real POP3 servers."""

    def __init__(self, socketMap=asyncore.socket_map):
        Dibbler.Listener.__init__(self, 8110, TestPOP3Server,
                                  (socketMap,), socketMap=socketMap)


class TestPOP3Server(Dibbler.BrighterAsyncChat):
    """Minimal POP3 server, for testing purposes.  Doesn't support
    UIDL.  USER, PASS, APOP, DELE and RSET simply return "+OK"
    without doing anything.  Also understands the 'KILL' command, to
    kill it.  The mail content is the example messages above.
    """

    def __init__(self, clientSocket, socketMap):
        # Grumble: asynchat.__init__ doesn't take a 'map' argument,
        # hence the two-stage construction.
        Dibbler.BrighterAsyncChat.__init__(self)
        Dibbler.BrighterAsyncChat.set_socket(self, clientSocket, socketMap)
        self.maildrop = [spam1, good1]
        self.set_terminator('\r\n')
        self.okCommands = ['USER', 'PASS', 'APOP', 'NOOP',
                           'DELE', 'RSET', 'QUIT', 'KILL']
        self.handlers = {'CAPA': self.onCapa,
                         'STAT': self.onStat,
                         'LIST': self.onList,
                         'RETR': self.onRetr,
                         'TOP': self.onTop}
        self.push("+OK ready\r\n")
        self.request = ''

    def collect_incoming_data(self, data):
        """Asynchat override."""
        self.request = self.request + data

    def found_terminator(self):
        """Asynchat override."""
        if ' ' in self.request:
            command, args = self.request.split(None, 1)
        else:
            command, args = self.request, ''
        command = command.upper()
        if command in self.okCommands:
            self.push("+OK (we hope)\r\n")
            if command == 'QUIT':
                self.close_when_done()
            if command == 'KILL':
                self.socket.shutdown(2)
                self.close()
                raise SystemExit
        else:
            handler = self.handlers.get(command, self.onUnknown)
            self.push(handler(command, args))   # Or push_slowly for testing
        self.request = ''

    def push_slowly(self, response):
        """Useful for testing."""
        for c in response:
            self.push(c)
            time.sleep(0.02)

    def onCapa(self, command, args):
        """POP3 CAPA command.  This lies about supporting pipelining for
        test purposes - the POP3 proxy *doesn't* support pipelining, and
        we test that it correctly filters out that capability from the
        proxied capability list."""
        lines = ["+OK Capability list follows",
                 "PIPELINING",
                 "TOP",
                 ".",
                 ""]
        return '\r\n'.join(lines)

    def onStat(self, command, args):
        """POP3 STAT command."""
        maildropSize = reduce(operator.add, map(len, self.maildrop))
        maildropSize += len(self.maildrop) * len(HEADER_EXAMPLE)
        return "+OK %d %d\r\n" % (len(self.maildrop), maildropSize)

    def onList(self, command, args):
        """POP3 LIST command, with optional message number argument."""
        if args:
            try:
                number = int(args)
            except ValueError:
                number = -1
            if 0 < number <= len(self.maildrop):
                return "+OK %d\r\n" % len(self.maildrop[number-1])
            else:
                return "-ERR no such message\r\n"
        else:
            returnLines = ["+OK"]
            for messageIndex in range(len(self.maildrop)):
                size = len(self.maildrop[messageIndex])
                returnLines.append("%d %d" % (messageIndex + 1, size))
            returnLines.append(".")
            return '\r\n'.join(returnLines) + '\r\n'

    def _getMessage(self, number, maxLines):
        """Implements the POP3 RETR and TOP commands."""
        if 0 < number <= len(self.maildrop):
            message = self.maildrop[number-1]
            headers, body = message.split('\n\n', 1)
            bodyLines = body.split('\n')[:maxLines]
            message = headers + '\r\n\r\n' + '\n'.join(bodyLines)
            return "+OK\r\n%s\r\n.\r\n" % message
        else:
            return "-ERR no such message\r\n"

    def onRetr(self, command, args):
        """POP3 RETR command."""
        try:
            number = int(args)
        except ValueError:
            number = -1
        return self._getMessage(number, 12345)

    def onTop(self, command, args):
        """POP3 RETR command."""
        try:
            number, lines = map(int, args.split())
        except ValueError:
            number, lines = -1, -1
        return self._getMessage(number, lines)

    def onUnknown(self, command, args):
        """Unknown POP3 command."""
        return "-ERR Unknown command: %s\r\n" % repr(command)


def test():
    """Runs a self-test using TestPOP3Server, a minimal POP3 server
    that serves the example emails above.
    """
    # Run a proxy and a test server in separate threads with separate
    # asyncore environments.
    import threading
    state.isTest = True
    testServerReady = threading.Event()
    def runTestServer():
        testSocketMap = {}
        TestListener(socketMap=testSocketMap)
        testServerReady.set()
        asyncore.loop(map=testSocketMap)

    proxyReady = threading.Event()
    def runUIAndProxy():
        httpServer = UserInterfaceServer(8881)
        proxyUI = ProxyUserInterface(state, _recreateState)
        httpServer.register(proxyUI)
        BayesProxyListener('localhost', 8110, ('', 8111))
        state.bayes.learn(tokenizer.tokenize(spam1), True)
        state.bayes.learn(tokenizer.tokenize(good1), False)
        proxyReady.set()
        Dibbler.run()

    threading.Thread(target=runTestServer).start()
    testServerReady.wait()
    threading.Thread(target=runUIAndProxy).start()
    proxyReady.wait()

    # Connect to the proxy and the test server.
    proxy = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    proxy.connect(('localhost', 8111))
    response = proxy.recv(100)
    assert response == "+OK ready\r\n"
    pop3Server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    pop3Server.connect(('localhost', 8110))
    response = pop3Server.recv(100)
    assert response == "+OK ready\r\n"

    # Verify that the test server claims to support pipelining.
    pop3Server.send("capa\r\n")
    response = pop3Server.recv(1000)
    assert response.find("PIPELINING") >= 0

    # Ask for the capabilities via the proxy, and verify that the proxy
    # is filtering out the PIPELINING capability.
    proxy.send("capa\r\n")
    response = proxy.recv(1000)
    assert response.find("PIPELINING") == -1

    # Stat the mailbox to get the number of messages.
    proxy.send("stat\r\n")
    response = proxy.recv(100)
    count, totalSize = map(int, response.split()[1:3])
    assert count == 2

    # Loop through the messages ensuring that they have judgement
    # headers.
    for i in range(1, count+1):
        response = ""
        proxy.send("retr %d\r\n" % i)
        while response.find('\n.\r\n') == -1:
            response = response + proxy.recv(1000)
        assert response.find(options["Hammie", "header_name"]) >= 0

    # Smoke-test the HTML UI.
    httpServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    httpServer.connect(('localhost', 8881))
    httpServer.sendall("get / HTTP/1.0\r\n\r\n")
    response = ''
    while 1:
        packet = httpServer.recv(1000)
        if not packet: break
        response += packet
    assert re.search(r"(?s)<html>.*Spambayes proxy.*</html>", response)

    # Kill the proxy and the test server.
    proxy.sendall("kill\r\n")
    proxy.recv(100)
    pop3Server.sendall("kill\r\n")
    pop3Server.recv(100)

def run():
    # Read the arguments.
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'htz')
    except getopt.error, msg:
        print >>sys.stderr, str(msg) + '\n\n' + __doc__
        sys.exit()

    runSelfTest = False
    for opt, arg in opts:
        if opt == '-h':
            print >>sys.stderr, __doc__
            sys.exit()
        elif opt == '-t':
            state.isTest = True
            state.runTestServer = True
        elif opt == '-z':
            state.isTest = True
            runSelfTest = True

    state.createWorkers()

    if runSelfTest:
        print "\nRunning self-test...\n"
        state.buildServerStrings()
        test()
        print "Self-test passed."   # ...else it would have asserted.

    elif state.runTestServer:
        print "Running a test POP3 server on port 8110..."
        TestListener()
        asyncore.loop()

    else:
        print >>sys.stderr, __doc__

if __name__ == '__main__':
    run()





More information about the Spambayes-checkins mailing list