[Spambayes-checkins] spambayes pop3proxy.py,NONE,1.1

Richie Hindle richiehindle@users.sourceforge.net
Mon, 16 Sep 2002 00:57:22 -0700


Update of /cvsroot/spambayes/spambayes
In directory usw-pr-cvs1:/tmp/cvs-serv5459

Added Files:
	pop3proxy.py 
Log Message:
pop3proxy.py is a spam-classifying POP3 proxy, plus associated test code.


--- NEW FILE: pop3proxy.py ---
#!/usr/bin/env python

# pop3proxy is released under the terms of the following MIT-style license:
#
# Copyright (c) Entrian Solutions 2002
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

"""A POP3 proxy designed to work with classifier.py, to add an X-Bayes-Score
header to each incoming email.  The header gives a floating point number
between 0.00 and 1.00, to two decimal places.  You point pop3proxy at your
POP3 server, and configure your email client to collect mail from the proxy
and filter on the X-Bayes-Score header.  Usage:

    pop3proxy.py [options] <server> [<server port>]
        <server> is the name of your real POP3 server
        <port>   is the port number of your real POP3 server, which
                 defaults to 110.

        options (the same as hammie):   
            -p FILE : use the named data file
            -d      : the file is a DBM file rather than a pickle

    pop3proxy -t
        Runs a test POP3 server on port 8110; useful for testing.

    pop3proxy -h
        Displays this help message.

For safety, and to help debugging, the whole POP3 conversation is written
out to _pop3proxy.log for each run.
"""

import sys, re, operator, errno, getopt, cPickle, socket, asyncore, asynchat
import classifier, tokenizer, hammie
from classifier import GrahamBayes, WordInfo     # So we can unpickle these.

HEADER_FORMAT = 'X-Bayes-Score: %1.2f\r\n'
HEADER_EXAMPLE = 'X-Bayes-Score: 0.12\r\n'


class Listener( asyncore.dispatcher ):
    """Listens for incoming socket connections and spins off dispatchers
    created by a factory callable."""
    
    def __init__( self, port, factory, factoryArgs=(),
                  socketMap=asyncore.socket_map ):
        asyncore.dispatcher.__init__( self, map=socketMap )
        self.socketMap = socketMap
        self.factory = factory
        self.factoryArgs = factoryArgs
        s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
        s.setblocking( False )
        self.set_socket( s, socketMap )
        self.set_reuse_addr()
        self.bind( ( '', port ) )
        self.listen( 5 )

    def handle_accept( self ):
        clientSocket, clientAddress = self.accept()
        args = [ clientSocket ] + list( self.factoryArgs )
        if self.socketMap != asyncore.socket_map:
            self.factory( *args, **{ 'socketMap': self.socketMap } )
        else:
            self.factory( *args )


class POP3ProxyBase( asynchat.async_chat ):
    """An async dispatcher that understands POP3 and proxies to a POP3
    server, calling `self.onTransaction( request, response )` for each
    transaction. Responses are not un-byte-stuffed before reaching
    self.onTransaction() (they probably should be for a totally generic
    POP3ProxyBase class, but BayesProxy doesn't need it and it would mean
    re-stuffing them afterwards).  self.onTransaction() should return the
    response to pass back to the email client - the response can be the
    verbatim response or a processed version of it.  The special command
    'KILL' kills it (passing a 'QUIT' command to the server)."""
    
    def __init__( self, clientSocket, serverName, serverPort ):
        asynchat.async_chat.__init__( self, clientSocket )
        self.request = ''
        self.isClosing = False
        self.set_terminator( '\r\n' )
        serverSocket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
        serverSocket.connect( ( serverName, serverPort ) )
        self.serverFile = serverSocket.makefile()
        self.push( self.serverFile.readline() )
    
    def handle_connect( self ):
        """Suppress the asyncore "unhandled connect event" warning."""
        pass
    
    def onTransaction( self, command, args, response ):
        """Overide this.  Takes the raw request and the response, and
        returns the (possibly processed) response to pass back to the
        email client."""
        raise NotImplementedError
    
    def isMultiline( self, command, args ):
        """Returns True if the given request should get a multiline response
        (assuming the response is positive)."""
        if command in [ 'USER', 'PASS', 'APOP', 'QUIT',
                        'STAT', 'DELE', 'NOOP', 'RSET', 'KILL' ]:
            return False
        elif command in [ 'RETR', 'TOP' ]:
            return True
        elif command in [ 'LIST', 'UIDL' ]:
            return len( args ) == 0
        else:
            # Assume that unknown commands will get an error response.
            return False
    
    def readResponse( self, command, args ):
        """Reads the POP3 server's response.  Also sets self.isClosing to
        True if the server closes the socket, which tells found_terminator()
        to close when the response has been sent."""
        isMulti = self.isMultiline( command, args )
        responseLines = []
        isFirstLine = True
        while True:
            line = self.serverFile.readline()
            if not line:
                # The socket has been closed by the server, probably by QUIT.
                self.isClosing = True
                break
            elif not isMulti or ( isFirstLine and line.startswith( '-ERR' ) ):
                # A single-line response.
                responseLines.append( line )
                break
            elif line == '.\r\n':
                # The termination line.
                responseLines.append( line )
                break
            else:
                # A normal line - append it to the response and carry on.
                responseLines.append( line )
            
            isFirstLine = False
        
        return ''.join( responseLines )
    
    def collect_incoming_data( self, data ):
        """Asynchat override."""
        self.request = self.request + data
    
    def found_terminator( self ):
        """Asynchat override."""
        # Send the request to the server and read the reply.
        # XXX When the response is huge, the email client can time out.
        # It should read as much as it can from the server, then if the
        # response is still coming after say 30 seconds, it should classify
        # the message based on that and send back the headers and the body
        # so far.  Then it should become a simple one-packet-at-a-time proxy
        # for the rest of the response.
        if self.request.strip().upper() == 'KILL':
            self.serverFile.write( 'QUIT\r\n' )
            self.serverFile.flush()
            self.send( "+OK, dying.\r\n" )
            self.shutdown( 2 )
            self.close()
            raise SystemExit
        self.serverFile.write( self.request + '\r\n' )
        self.serverFile.flush()
        if self.request.strip() == '':
            # Someone just hit the Enter key.
            command, args = ( '', '' )
        else:
            splitCommand = self.request.strip().split( None, 1 )
            command = splitCommand[ 0 ].upper()
            args = splitCommand[ 1: ]
        rawResponse = self.readResponse( command, args )
        
        # Pass the request/reply to the subclass and send back its response.
        cookedResponse = self.onTransaction( command, args, rawResponse )
        self.push( cookedResponse )
        self.request = ''
        
        # If readResponse() decided that the server had closed its socket,
        # close this one when the response has been sent.
        if self.isClosing:
            self.close_when_done()
    
    def handle_error( self ):
        """Let SystemExit cause an exit."""
        type, v, t = sys.exc_info()
        if type == SystemExit:
            raise
        else:
            asynchat.async_chat.handle_error( self )
   

class BayesProxyListener( Listener ):
    """Listens for incoming email client connections and spins off
    BayesProxy objects to serve them."""
    
    def __init__( self, serverName, serverPort, proxyPort, bayes ):
        proxyArgs = ( serverName, serverPort, bayes )
        Listener.__init__( self, proxyPort, BayesProxy, proxyArgs )


class BayesProxy( POP3ProxyBase ):
    """Proxies between an email client and a POP3 server, inserting
    X-Bayes-Score headers.  It acts on the following POP3 commands:
    
     o STAT:
        o Adds the size of all the X-Bayes-Score headers to the maildrop
          size.
    
     o LIST:
        o With no message number: adds the size of an X-Bayes-Score header
          to the message size for each message in the scan listing.
        o With a message number: adds the size of an X-Bayes-Score header
          to the message size.
    
     o RETR:
        o Adds the X-Bayes-Score header based on the raw headers and body
          of the message.
    
     o TOP:
        o Adds the X-Bayes-Score header based on the raw headers and as much
          of the body as the TOP command retrieves.  This can mean that the
          header might have a different value for different calls to TOP, or
          for calls to TOP vs. calls to RETR.  I'm assuming that the email
          client will either not make multiple calls, or will cope with the
          headers being different.
    """

    def __init__( self, clientSocket, serverName, serverPort, bayes ):
        # Open the log file *before* calling __init__ for the base class,
        # 'cos that might call send or recv.
        self.bayes = bayes
        self.logFile = open( '_pop3proxy.log', 'wb' )
        POP3ProxyBase.__init__( self, clientSocket, serverName, serverPort )
        self.handlers = { 'STAT': self.onStat, 'LIST': self.onList,
                          'RETR': self.onRetr, 'TOP': self.onTop }
    
    def send( self, data ):
        """Logs the data to the log file."""
        self.logFile.write( data )
        self.logFile.flush()
        return POP3ProxyBase.send( self, data )
    
    def recv( self, size ):
        """Logs the data to the log file."""
        data = POP3ProxyBase.recv( self, size )
        self.logFile.write( data )
        self.logFile.flush()
        return data
    
    def onTransaction( self, command, args, response ):
        """Takes the raw request and response, and returns the (possibly
        processed) response to pass back to the email client."""
        handler = self.handlers.get( command, self.onUnknown )
        return handler( command, args, response )

    def onStat( self, command, args, response ):
        """Adds the size of all the X-Bayes-Score headers to the maildrop
        size."""
        match = re.search( r'^\+OK\s+(\d+)\s+(\d+)(.*)\r\n', response )
        if match:
            count = int( match.group( 1 ) )
            size = int( match.group( 2 ) ) + len( HEADER_EXAMPLE ) * count
            return '+OK %d %d%s\r\n' % ( count, size, match.group( 3 ) )
        else:
            return response
    
    def onList( self, command, args, response ):
        """Adds the size of an X-Bayes-Score header to the message
        size(s)."""
        if response.count( '\r\n' ) > 1:
            # Multiline: all lines but the first contain a message size.
            lines = response.split( '\r\n' )
            outputLines = [ lines[ 0 ] ]
            for line in lines[ 1: ]:
                match = re.search( '^(\d+)\s+(\d+)', line )
                if match:
                    number = int( match.group( 1 ) )
                    size = int( match.group( 2 ) ) + len( HEADER_EXAMPLE )
                    line = "%d %d" % ( number, size )
                outputLines.append( line )
            return '\r\n'.join( outputLines )
        else:
            # Single line.
            match = re.search( '^\+OK\s+(\d+)(.*)\r\n', response )
            if match:
                size = int( match.group( 1 ) ) + len( HEADER_EXAMPLE )
                return "+OK %d%s\r\n" % ( size, match.group( 2 ) )
            else:
                return response
    
    def onRetr( self, command, args, response ):
        """Adds the X-Bayes-Score header based on the raw headers and body
        of the message."""
        # Use '\n\r?\n' to detect the end of the headers in case of broken
        # emails that don't use the proper line separators.
        if re.search( r'\n\r?\n', response ):
            # Break off the first line, which will be '+OK'.
            ok, message = response.split( '\n', 1 )
            
            # Now find the spam probability and add the header.
            prob = self.bayes.spamprob( tokenizer.tokenize( message ) )
            headers, body = re.split( r'\n\r?\n', response, 1 )
            headers = headers + '\r\n' + HEADER_FORMAT % prob + '\r\n'
            return headers + body
        else:
            # Must be an error response.
            return response

    def onTop( self, command, args, response ):
        """Adds the X-Bayes-Score header based on the raw headers and as
        much of the body as the TOP command retrieves."""
        # Easy (but see the caveat in BayesProxy.__doc__).
        return self.onRetr( command, args, response )

    def onUnknown( self, command, args, response ):
        """Default handler - just returns the server's response verbatim."""
        return response


def createBayes( pickleName=None, useDB=False ):
    """Create a GrahamBayes object to score the emails."""
    bayes = None
    if useDB:
        bayes = hammie.PersistentGrahamBayes( pickleName )
    elif pickleName:
        try:
            fp = open( pickleName, 'rb' )
        except IOError, e:
            if e.errno <> errno.ENOENT:
                raise
        else:
            print "Loading database...",
            bayes = cPickle.load( fp )
            fp.close()
            print "Done."
    
    if bayes is None:
        bayes = GrahamBayes()
    
    return bayes
    

def main( serverName, serverPort, proxyPort, pickleName, useDB ):
    """Runs the proxy forever or until a 'KILL' command is received or
    someone hits Ctrl+Break."""
    bayes = createBayes( pickleName, useDB )
    BayesProxyListener( serverName, serverPort, proxyPort, bayes )
    asyncore.loop()



# ===================================================================
# Test code.
# ===================================================================

# One example of spam and one of ham - both are used to train, and are then
# classified.  Not a good test of the classifier, but a perfectly good test
# of the POP3 proxy.  The bodies of these came from the spambayes project,
# and I added the headers myself because the originals had no headers.

spam1 = """From: friend@public.com
Subject: Make money fast

Hello tim_chandler , Want to save money ?
Now is a good time to consider refinancing. Rates are low so you can cut
your current payments and save money.

http://64.251.22.101/interest/index%38%30%300%2E%68t%6D

Take off list on site [s5]
"""

good1 = """From: chris@example.com
Subject: ZPT and DTML

Jean Jordaan wrote:
> 'Fraid so ;>  It contains a vintage dtml-calendar tag.
>   http://www.zope.org/Members/teyc/CalendarTag
>
> Hmm I think I see what you mean: one needn't manually pass on the
> namespace to a ZPT?

Yeah, Page Templates are a bit more clever, sadly, DTML methods aren't :-(

Chris
"""

class TestListener( Listener ):
    """Listener for TestPOP3Server.  Works on port 8110, to co-exist with
    real POP3 servers."""
    
    def __init__( self, socketMap=asyncore.socket_map ):
        Listener.__init__( self, 8110, TestPOP3Server, socketMap=socketMap )


class TestPOP3Server( asynchat.async_chat ):
    """Minimal POP3 server, for testing purposes.  Doesn't support TOP or
    UIDL.  USER, PASS, APOP, DELE and RSET simply return "+OK" without doing
    anything.  Also understands the 'KILL' command, to kill it.  The mail
    content is the example messages in classifier.py."""
   
    def __init__( self, clientSocket, socketMap=asyncore.socket_map ):
        # Grumble: asynchat.__init__ doesn't take a 'map' argument, hence
        # the two-stage construction.
        asynchat.async_chat.__init__( self )
        asynchat.async_chat.set_socket( self, clientSocket, socketMap )
        self.maildrop = [ spam1, good1 ]
        self.set_terminator( '\r\n' )
        self.okCommands = [ 'USER', 'PASS', 'APOP', 'NOOP',
                            'DELE', 'RSET', 'QUIT', 'KILL' ]
        self.handlers = { 'STAT': self.onStat,
                          'LIST': self.onList,
                          'RETR': self.onRetr }
        self.push( "+OK ready\r\n" )
        self.request = ''
   
    def handle_connect( self ):
        """Suppress the asyncore "unhandled connect event" warning."""
        pass
    
    def collect_incoming_data( self, data ):
        """Asynchat override."""
        self.request = self.request + data
   
    def found_terminator( self ):
        """Asynchat override."""
        if ' ' in self.request:
            command, args = self.request.split( None, 1 )
        else:
            command, args = self.request, ''
        command = command.upper()
        if command in self.okCommands:
            self.push( "+OK (we hope)\r\n" )
            if command == 'QUIT':
                self.close_when_done()
            if command == 'KILL':
                raise SystemExit
        else:
            handler = self.handlers.get( command, self.onUnknown )
            self.push( handler( command, args ) )
        self.request = ''
   
    def handle_error( self ):
        """Let SystemExit cause an exit."""
        type, v, t = sys.exc_info()
        if type == SystemExit:
            raise
        else:
            asynchat.async_chat.handle_error( self )
   
    def onStat( self, command, args ):
        maildropSize = reduce( operator.add, map( len, self.maildrop ) )
        maildropSize += len( self.maildrop ) * len( HEADER_EXAMPLE )
        return "+OK %d %d\r\n" % ( len( self.maildrop ), maildropSize )
   
    def onList( self, command, args ):
        if args:
            number = int( args )
            if 0 < number <= len( self.maildrop ):
                return "+OK %d\r\n" % len( self.maildrop[ number - 1 ] )
            else:
                return "-ERR no such message\r\n"
        else:
            returnLines = [ "+OK" ]
            for messageIndex in range( len( self.maildrop ) ):
                size = len( self.maildrop[ messageIndex ] )
                returnLines.append( "%d %d" % ( messageIndex + 1, size ) )
            returnLines.append( "." )
            return '\r\n'.join( returnLines ) + '\r\n'
   
    def onRetr( self, command, args ):
        number = int( args )
        if 0 < number <= len( self.maildrop ):
            message = self.maildrop[ number - 1 ]
            return "+OK\r\n%s\r\n.\r\n" % message
        else:
            return "-ERR no such message\r\n"

    def onUnknown( self, command, args ):
        return "-ERR Unknown command: '%s'\r\n" % command


def test():
    """Runs a self-test using TestPOP3Server, a minimal POP3 server that
    serves the example emails above."""
    # Run a proxy and a test server in separate threads with separate
    # asyncore environments.
    import threading
    testServerReady = threading.Event()
    def runTestServer():
        testSocketMap = {}
        TestListener( socketMap=testSocketMap )
        testServerReady.set()
        asyncore.loop( map=testSocketMap )
    
    def runProxy():
        bayes = createBayes()
        BayesProxyListener( 'localhost', 8110, 8111, bayes )
        bayes.learn( tokenizer.tokenize( spam1 ), True )
        bayes.learn( tokenizer.tokenize( good1 ), False )
        asyncore.loop()

    threading.Thread( target=runTestServer ).start()
    testServerReady.wait()
    threading.Thread( target=runProxy ).start()
    
    # Connect to the proxy.
    proxy = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
    proxy.connect( ( 'localhost', 8111 ) )
    assert proxy.recv( 100 ) == "+OK ready\r\n"
    
    # Stat the mailbox to get the number of messages.
    proxy.send( "stat\r\n" )
    response = proxy.recv( 100 )
    count, totalSize = map( int, response.split()[ 1:3 ] )
    print "%d messages in test mailbox" % count
    assert count == 2
    
    # Loop through the messages ensuring that they have X-Bayes-Score
    # headers.
    for i in range( 1, count+1 ):
        response = ""
        proxy.send( "retr %d\r\n" % i )
        while response.find( '\n.\r\n' ) == -1:
            response = response + proxy.recv( 1000 )
        headerOffset = response.find( 'X-Bayes-Score' )
        assert headerOffset != -1
        headerEnd = headerOffset + len( HEADER_EXAMPLE )
        header = response[ headerOffset:headerEnd ].strip()
        print "Message %d: %s" % ( i, header )
    
    # Kill the proxy and the test server.
    proxy.sendall( "kill\r\n" )
    server = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
    server.connect( ( 'localhost', 8110 ) )
    server.sendall( "kill\r\n" )


# ===================================================================
# __main__ driver.
# ===================================================================

if __name__ == '__main__':
    # Read the arguments.
    try:
        opts, args = getopt.getopt( sys.argv[ 1: ], 'htdp:' )
    except getopt.error, msg:
        print >>sys.stderr, str( msg ) + '\n\n' + __doc__
        sys.exit()

    pickleName = hammie.DEFAULTDB
    useDB = False
    runTestServer = False
    for opt, arg in opts:
        if opt == '-h':
            print >>sys.stderr, __doc__
            sys.exit()
        elif opt == '-t':
            runTestServer = True
        elif opt == '-d':
            useDB = True
        elif opt == '-p':
            pickleName = arg
    
    # Do whatever we've been asked to do...
    if not opts and not args:
        print "Running a self-test (use 'pop3proxy -h' for help)"
        test()
        print "Self-test passed."   # ...else it would have asserted.
    
    elif runTestServer:
        print "Running a test POP3 server on port 8110..."
        TestListener()
        asyncore.loop()
    
    elif len( args ) == 1:
        # Named POP3 server, default port.
        main( args[ 0 ], 110, 110, pickleName, useDB )
    
    elif len( args ) == 2:
        # Named POP3 server, named port.
        main( args[ 0 ], int( args[ 1 ] ), 110, pickleName, useDB )
    
    else:
        print >>sys.stderr, __doc__