[Conferences] List settings - answer to private address

Carl Karsten carl at personnelware.com
Wed Apr 21 03:16:20 CEST 2010


On Tue, Apr 20, 2010 at 5:40 PM, Laura Creighton <lac at openend.se> wrote:
> In a message of Tue, 20 Apr 2010 16:29:37 EDT, Carl Karsten writes:
>
> <a python program>
>
> ....
>
> Way cool Carl. :-)

Thanks.  version 1 of that code was my first Python ever.  It was so
easy I completely missed how cool Python was.   I seriously thought
something like "huh, I guess this is like a better bash."   Then  2
years later I had reason to look at Python again, and only then did I
get it.   I kick myself often.

>
> Mind If I send your solutuion to others I find who don't want to give
> up their mailers?

Not at all.  here is the whole thing, complete with my honey pot and
white list anti-spam processing tangled up in it.


#!/usr/bin/python

import sys, email, email.Message, MySQLdb, re, string

def DebugOut( tcX ):
    if debugOutput:
        print tcX

# Define the WhereConstuct class
class WhereConstuct:

    def __init__(self):
        self.lWhereClauseExpressions = []
        self.lValues = []

    def addToWhereClause(self, tuValue, tcExpr ):
        tuValue = tuValue.strip()
        if tuValue:
            DebugOut( "#1 tuValue='" + tuValue + "'" )
            self.lValues = self.lValues + [tuValue]
            self.lWhereClauseExpressions =
self.lWhereClauseExpressions + [tcExpr+"=%s"]

    def getValues(self):
        return tuple( self.lValues )
    def getWhereClause(self, tcOperator ):
        return string.join( self.lWhereClauseExpressions, " " +
tcOperator + " " )


def getCursor():

    import connectInfo

    user = connectInfo.user
    host = connectInfo.host
    db = connectInfo.db
    password = connectInfo.password

    connection = MySQLdb.Connect(user=user, host=host, db=db, passwd=password)

    return connection.cursor()

def LogStat( tcDesc ):
    if not cursor.execute(
        "update Stats set nCount = nCount + 1 where cDesc = %s", tcDesc ):
        cursor.execute(
            "insert into Stats ( nCount, cDesc ) values ( 1, %s )", tcDesc )

def MarkMessage( tcMessage, tcSpamness ):
    DebugOut( tcSpamness )
    LogStat( tcSpamness )
    tcMessage.add_header("X-Spamness", tcSpamness )
    return True

def InWhiteList( txCursor, tcName, tcAddr ):

    oWhere = WhereConstuct()
    DebugOut( "tcName:'" + tcName + "'" )
    oWhere.addToWhereClause( tcName, "cFromName" )
    oWhere.addToWhereClause( tcAddr, "cFromAddr" )

    lcWhere = oWhere.getWhereClause("or")

    if lcWhere:
        llVars = oWhere.getValues()
        DebugOut( lcWhere )
        DebugOut( llVars )
        return txCursor.execute("update WhiteList "
          "set nCount = nCount +1"
                   " where " + lcWhere, llVars )

    return False

def HasCodeWord( txCursor, tcBody ):
    # Search the message for some tagline
    return False

def IsSpamSubject( txCursor, tcSubject, rcRule ):

    if not tcSubject:
        rcRule = "None"
        return True
    elif re.search( '.[\s\.]{10,}.', tcSubject ):
        rcRule = "10 spaces or dots"
        return True
    elif re.search( '[\x00-\x1f,\x7b-\xff]{10,}', tcSubject ):
        rcRule = "Odd Chars"
        return True
    elif re.search( '(&#[0-9]*;){3,}', tcSubject ):
        rcRule = "html"
        return True
    elif len( tcSubject ) > 20 and tcSubject.count(' ') > 2:

        txCursor.execute("select kSpamSubjects_pk "
                       "  from SpamSubjects "
                       " where cSubject = %s ", (tcSubject) )

        dataSet = cursor.fetchall()
        numberOfEntries = len(dataSet)

        if numberOfEntries > 0:

            kSpamSubjects = dataSet[0][0]
            cursor.execute("update SpamSubjects "
                "set nBlocks = nBlocks + 1"
                " where kSpamSubjects_pk = %s ",  (kSpamSubjects))
            rcRule = "Found in List"
            return True
    return False

def IsSpamFrom(txCursor, tcName, tcAddr):

    txCursor.execute("select kBlackList_pk "
                    "  from BlackList "
                    " where cFromName=%s "
                    "   and cFromAddr=%s ", (tcName, tcAddr))

    dataSet = txCursor.fetchall()
    numberOfEntries = len(dataSet)

    if numberOfEntries > 0:
        lkBlackList = dataSet[0][0]
        txCursor.execute("update BlackList "
        "set nBlocks = nBlocks + 1"
        " where kBlackList_pk = %i ", (lkBlackList))
        return True
    else:
        return False

# begin main

# look for command line parameters
debugOutput = False
for arg in sys.argv:
    if arg == '-d':
        debugOutput = True
"""
for argNum in range(len(sys.argv)):
 # if argNum == 0:
  # ignore, as it is the name of this script
    if sys.argv[argNum] == '-d':
        debugOutput = True
"""

# Check to see if anything was piped in. If not, don't continue
if sys.stdin.isatty():
    DebugOut( "Expected a piped email message, got nothing." )
else:
    # Get stdin text (sys.stdin is a file object):
    stdin = sys.stdin.read()

    message = email.message_from_string(stdin)

    fromHeader = message["From"]
    lcSubject = message["Subject"]
    lcBody = message["Body"]

    if lcSubject:
        DebugOut( "lcSubject %s." % lcSubject )

    if fromHeader:
            DebugOut( "Message is from %s." % fromHeader )

    if lcBody:
            DebugOut( "Body %s" % lcBody )

    # Split the From header into separate name and address fields:
    # lcName, lcAddress = email.Message.utils.parseaddr(fromHeader)
    lcName, lcAddress = email.Message.Utils.parseaddr(fromHeader)
    DebugOut( " Name: %s\n Addr: %s" % (lcName, lcAddress) )

    cursor = getCursor()
    lcRule = "test"

    del message["X-Priority"]

    if InWhiteList(cursor, lcName, lcAddress):
        MarkMessage( message, "Ham - whitelist name>" + lcName + "<" )
        message["X-Priority"] = "2"

    elif HasCodeWord( cursor, lcBody ):
        MarkMessage( message, "Ham - CodeWord." )
    elif IsSpamSubject( cursor, lcSubject, lcRule ):
        MarkMessage( message, "Spam - Subject " + lcRule )
    elif IsSpamFrom( cursor, lcName, lcAddress ):
        MarkMessage( message, "Spam - blacklist." )
    else:
        MarkMessage( message, "Ham - Unknown." )

    message.add_header("X-MI-Name", lcName )
    message.add_header("X-MI-Addr", lcAddress )

    # Munge the Reply-To for lists that don't.
    if not message["Reply-To"]:

        # Look for an address here...
        # check X-BeenThere, List-Post, Cc, Return-Path
        xrt=message.get('X-BeenThere')
        if not xrt:
            # or burried in here:
            xrt=message.get('List-Post')
            if xrt:
                xrt = xrt.split(':')[1].rstrip('>')
            else:
                xrt=message.get('Cc')
                if not xrt:
                    # Return-Path: <owner-nanog at merit.edu>
                    xrt = message.get('Return-Path')

        message["X-xrt"] = xrt

        # if we have an address, see if the domain is special
        if xrt:
            specials=['python.org','lists.idyll.org',
                    'lists.ubuntu.com', 'ubuntu.org',
                    'lists.sourceforge.net',
                    'lists.mysql.com',
                    'redhat.com', ]

            if True in [special in xrt for special in specials]:
                message["Reply-To"] = xrt

    print message
    sys.exit(0)





-- 
Carl K


More information about the Conferences mailing list