#
# First attempt at a fuzzy compare of two addresses using a form of
# Edit Distance algorithm on tokens
# v0.5
# Andrew McLean, 23 January 2005
#
# The main routine editDistance takes two lists of tokens and returns a
# distance measure.
# Allowed edits are replace, insert, delete and concatenate a pair of tokens.
# The cost of these operations depends on the value of the tokens and their
# position within the sequence.
#
# The tokens consist of a tuple containing a string representation and its
# soundex encoding.
# The program assumes that some normalisation has already been carried out,
# for instance converting all text to lowercase.
#
# The routine has undergone limited testing, but it appeared to work quite
# well for my application, with a reasonably low level of false positives.
#
# I'm not convinced that I have got the logic quite right in the dynamic
# programming; dealing correctly with token pair concatenation is non-trivial.
#
# It would be neater to have an out of band flag for impossible/infinite cost.
# Could abstract this into a Cost class. But I am a bit concerned about
# efficiency. Could use a negative number for infinite cost and use a modified
# min function to reflect this. The approach I am using with a very big number
# for INFINITY will be fine for any sensible tokens relating to addresses.
#
# The code could probably do with more test cases.
#
# Also, if I was going to refactor the code I would either
# 1. Make this a bit more object oriented by introducing a Token class.
# 2. Not precompute the soundex encodings. It is probably sufficient to use
#    a memoized soundex routine.
#

# Standard library module imports
import os
import re
import sys

# Kludge! Makes the cookbook modules below importable from a sibling directory.
sys.path.append(os.path.abspath('../ZODB'))

# Paul Moore's Memoize class from Python cookbook
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52201
from memoize import Memoize

# Public domain soundex implementation, by Skip Montanaro, 21 December 2000
# http://manatee.mojam.com/~skip/python/soundex.py
import soundex

# Memoize soundex for speed
get_soundex = Memoize(soundex.get_soundex)

# List of numbers spelt out
numbers_spelt = ['one', 'two', 'three', 'four', 'five', 'six', 'seven',
                 'eight', 'nine', 'ten', 'eleven', 'twelve', 'thirteen',
                 'fourteen', 'fifteen', 'sixteen', 'seventeen', 'eighteen',
                 'nineteen', 'twenty']
# Maps '1' -> 'one', '2' -> 'two', ... '20' -> 'twenty'
digitsToTextMap = dict((str(i + 1), word)
                       for i, word in enumerate(numbers_spelt))

# Set up a dictionary mapping abbreviations to the full text version.
# Each abbreviation can map to a single string expansion or a list of
# possible expansions.
abbrev = {'cott': 'cottage',
          'rd': 'road',
          'fm': 'farm',
          'st': ['street', 'saint']}
# ... include number to text mapping
abbrev.update(digitsToTextMap)

# Regular expression to find tokens containing a number.
# ... could include numbers spelt in words (i.e. the pattern
# '\d|' + '|'.join(numbers_spelt)), but it causes more problems than it
# solves as lots of words include character sequences like "one".
contains_number = re.compile(r'\d')

# List of minor tokens
minorTokenList = ['the', 'at']

# Various weights
POSITION_WEIGHT = 1.2
REPLACEMENT_COST = 50
SOUNDEX_MATCH_COST = 0.95
PLURAL_COST = 0.25
ABBREV_COST = 0.25
INSERT_TOKEN_WITH_NUMBER = 5
INSERT_TOKEN_AFTER_NUMBER = 10
INSERT_TOKEN = 2
INSERT_MINOR_TOKEN = 0.5
CONCAT_COST = 0.2
INFINITY = 10000000


def containsNumber(token):
    """Return True if the string part of the token contains any digit.

    token is a (string, soundex code) tuple; only token[0] is examined.
    The pattern is applied with search() so that a digit anywhere in the
    string counts, not just a leading one.
    """
    return contains_number.search(token[0]) is not None

# Memoize it
containsNumber = Memoize(containsNumber)


def replaceCost(token1, token2, pos, allowSoundex=True):
    """Cost of replacing token1 with token2 (or vice versa) at a specific
    normalised position within the sequence.

    token1, token2 -- (string, soundex code) tuples
    pos            -- normalised position; the cost of an unmatched
                      replacement is increased by POSITION_WEIGHT * (1 - pos)
    allowSoundex   -- when False the soundex comparison is skipped

    Returns 0 for identical strings, PLURAL_COST when the longer string is
    the shorter one plus a trailing 's', ABBREV_COST when the shorter string
    is a known abbreviation of the longer one, SOUNDEX_MATCH_COST for a
    permitted soundex match, and otherwise the position-weighted
    REPLACEMENT_COST.
    """
    # Make sure token1 is shortest
    m, n = len(token1[0]), len(token2[0])
    if m > n:
        token1, token2 = token2, token1
        m, n = n, m
    short, long_ = token1[0], token2[0]

    # Look for exact matches
    if short == long_:
        return 0

    # Look for plurals
    if (n - m == 1) and (long_ == short + 's'):
        return PLURAL_COST

    # Look for abbreviations (only the shorter token is tried as the
    # abbreviation)
    expansion = abbrev.get(short)
    if isinstance(expansion, list):
        if long_ in expansion:
            return ABBREV_COST
    elif isinstance(expansion, str):
        if long_ == expansion:
            return ABBREV_COST

    # Look for soundex matches of tokens that don't contain numbers
    # and which are of similar lengths.
    # We don't allow soundex matching of concatenated tokens, otherwise
    # you get problems. Consider two sequences [T1,T2,T3,T4] and [T1,T3,T4]:
    # T1 and T2 are concatenated in seq1 and soundex match with T1 in seq2,
    # with a lower cost than T2 being deleted from seq1.
    if allowSoundex and \
       (not containsNumber(token1)) and \
       (not containsNumber(token2)) and \
       (n - m < 3) and \
       (token1[1] == token2[1]):
        return SOUNDEX_MATCH_COST

    # No match seen
    return REPLACEMENT_COST + POSITION_WEIGHT * (1 - pos)


def insCost(tokenList, indx, pos):
    """The cost of inserting a specific token at a specific normalised
    position along the sequence.

    tokenList -- list of (string, soundex code) tuples
    indx      -- index of the token being inserted
    pos       -- normalised position used for the position weighting

    Tokens containing a number, and tokens directly following one, are the
    most expensive to insert. Minor tokens (see minorTokenList) have a flat
    cost with no position weighting.
    """
    position_penalty = POSITION_WEIGHT * (1 - pos)
    if containsNumber(tokenList[indx]):
        return INSERT_TOKEN_WITH_NUMBER + position_penalty
    elif indx > 0 and containsNumber(tokenList[indx - 1]):
        return INSERT_TOKEN_AFTER_NUMBER + position_penalty
    elif tokenList[indx][0] in minorTokenList:
        return INSERT_MINOR_TOKEN
    else:
        return INSERT_TOKEN + position_penalty


def delCost(tokenList, indx, pos):
    """The cost of deleting a specific token at a specific normalised
    position along the sequence.

    This is exactly the same cost as inserting a token.
    """
    return insCost(tokenList, indx, pos)


def minmin(L):
    """Overall minimum of a list of lists"""
    return min(min(x) for x in L)


def concat(token1, token2):
    """Concatenate two tokens consisting of (string, soundex code) tuples,
    calculating the soundex code of the result of concatenating the strings.

    Uses the memoized soundex routine get_soundex.
    """
    tok = token1[0] + token2[0]
    return (tok, get_soundex(tok))


def editDistance(seq1, seq2):
    """Edit distance of two lists of (string, soundex code) tuples using a
    dynamic programming technique.

    The edits allowed are inserting, deleting and replacing tokens and
    concatenating a pair of tokens. Concatenating runs of more than two
    tokens is not allowed. The weights vary according to the values of the
    tokens and their position within the sequence.

    Returns the smallest cost found for transforming seq1 into seq2
    (0 when both sequences are empty).
    """
    # Uses some ideas 'borrowed' from Alex Martelli
    # http://wdiff.progiciels-bpi.ca/showfile.html?name=ideas/courriel/fuzzy-diffs&index=4

    # Length of the token lists
    m = len(seq1)
    n = len(seq2)

    # Length of the shortest (used for normalising position within the
    # sequence). It is a float so that the divisions below are true
    # divisions; it is only used as a divisor when both m and n are non-zero.
    minmn = float(min(m, n))

    # Initialise the nested list of optimal distances.
    # It is indexed as d[i][j][k][l] and represents the best distance
    # between seq1[:i] and seq2[:j] where:
    #   k=1 if seq1[i-2] and seq1[i-1] are concatenated to achieve this value
    #       and k=0 if they are not
    #   l=1 if seq2[j-2] and seq2[j-1] are concatenated to achieve this value
    #       and l=0 if they are not
    d = [[[[0, 0] for k in range(2)] for j in range(n + 1)]
         for i in range(m + 1)]

    # Initialisation of the cases where one of the sequences is of length
    # zero: these costs simply involve the cost of deleting or inserting.
    # Concatenation is not relevant and is assigned infinite cost.
    for i in range(m):
        d[i + 1][0][0][0] = d[i][0][0][0] + delCost(seq1, i, 0)
    for i in range(m + 1):
        d[i][0][1][0] = INFINITY
        d[i][0][0][1] = INFINITY
        d[i][0][1][1] = INFINITY
    for j in range(n):
        d[0][j + 1][0][0] = d[0][j][0][0] + insCost(seq2, j, 0)
    for j in range(n + 1):
        d[0][j][1][0] = INFINITY
        d[0][j][0][1] = INFINITY
        d[0][j][1][1] = INFINITY

    # Now do the real work populating the cost matrix d
    for i in range(m):
        for j in range(n):
            # Work out the cheapest values for d[i+1][j+1][*][*]

            # These two costs do not depend on the concatenation state,
            # so work them out once per cell.
            ins_cost = insCost(seq2, j, j / minmn)
            del_cost = delCost(seq1, i, i / minmn)

            # No concatenation [0][0]
            replace = min(d[i][j][0][0], d[i][j][0][1],
                          d[i][j][1][0], d[i][j][1][1]) \
                + replaceCost(seq1[i], seq2[j], min(i, j) / minmn)
            # The [1][0] predecessor is left out of the insert and the
            # [0][1] predecessor out of the delete: those transitions are
            # handled by the concatenation states below.
            insert = min(d[i + 1][j][0][0], d[i + 1][j][0][1],
                         d[i + 1][j][1][1]) + ins_cost
            delete = min(d[i][j + 1][0][0], d[i][j + 1][1][0],
                         d[i][j + 1][1][1]) + del_cost
            d[i + 1][j + 1][0][0] = min(replace, insert, delete)

            # Concatenate seq1 only [1][0]
            if i == 0:
                # Concatenation not possible so it gets INFINITE cost
                best = INFINITY
            else:
                # Only the non-concatenated predecessor state is used here
                replace = d[i - 1][j][0][0] + \
                    replaceCost(concat(seq1[i - 1], seq1[i]), seq2[j],
                                min(i - 1, j) / minmn,
                                allowSoundex=False) + CONCAT_COST
                insert = d[i + 1][j][1][0] + ins_cost
                best = min(replace, insert)
            d[i + 1][j + 1][1][0] = best

            # Concatenate seq2 only [0][1]
            if j == 0:
                # Concatenation not possible so it gets INFINITE cost
                best = INFINITY
            else:
                replace = d[i][j - 1][0][0] + \
                    replaceCost(seq1[i], concat(seq2[j - 1], seq2[j]),
                                min(i, j - 1) / minmn,
                                allowSoundex=False) + CONCAT_COST
                delete = d[i][j + 1][0][1] + del_cost
                best = min(replace, delete)
            d[i + 1][j + 1][0][1] = best

            # Concatenate both seq1 and seq2 [1][1]
            if (i == 0) or (j == 0):
                # Concatenation not possible so it gets INFINITE cost
                replace = INFINITY
            else:
                replace = d[i - 1][j - 1][0][0] + \
                    replaceCost(concat(seq1[i - 1], seq1[i]),
                                concat(seq2[j - 1], seq2[j]),
                                min(i - 1, j - 1) / minmn,
                                allowSoundex=False) + CONCAT_COST * 2
            d[i + 1][j + 1][1][1] = replace

    # The answer is the cheapest of the four concatenation states for the
    # complete sequences
    return minmin(d[m][n])


##############################################################################
# The rest of the file relates solely to test cases

def addSoundex(seq):
    """Take a list of strings and return a list of (string, soundex code)
    tuples"""
    return [(a, get_soundex(a)) for a in seq]


def stringifyTokens(tokens):
    """Return a list of the string part of a list of tokens."""
    return [t[0] for t in tokens]


def editDistanceTest(a, b, d):
    """Test editDistance on the string lists a and b against the expected
    distance d.

    Both editDistance(a, b) and editDistance(b, a) are checked, since the
    weights given to the operations mean the two should be equal. A message
    is printed for each direction that does not give d. Returns True only
    if both directions pass.
    """
    a, b = addSoundex(a), addSoundex(b)

    # The expected values are built from float arithmetic in a different
    # order from the algorithm, so allow for rounding error.
    tolerance = 1e-9

    passed = True
    for first, second in ((a, b), (b, a)):
        result = editDistance(first, second)
        if abs(result - d) > tolerance:
            passed = False
            print('%s, %s, result: %f, expected: %f' %
                  (stringifyTokens(first), stringifyTokens(second),
                   result, d))

    # Set return code
    return passed


if __name__ == '__main__':

    # A list of test cases: (first token list, second token list,
    # expected distance)
    testList = [
        (['apple', 'bridport'], ['apple', 'bridport'], 0),
        (['bridport'], ['apple', 'bridport'], INSERT_TOKEN + POSITION_WEIGHT),
        (['apple'], ['apple', 'bridport'], INSERT_TOKEN + POSITION_WEIGHT * (1 - 1/1.)),
        (['apple', 'cerne'], ['apple', 'bridport'], 2 * (INSERT_TOKEN + POSITION_WEIGHT * (1 - 1/2.))),
        (['pear', 'apple', 'cerne'], ['pear', 'apple', 'bridport'], 2 * (INSERT_TOKEN + POSITION_WEIGHT * (1 - 2/3.))),
        (['pear', 'apple', 'cat', 'portesham'], ['pear', 'apple', 'bridport', 'portesham'], 2 * (INSERT_TOKEN + POSITION_WEIGHT * (1 - 2/4.))),
        (['boxer', 'codeword'], ['apple', 'boxer', 'codeword'], INSERT_TOKEN + POSITION_WEIGHT * (1 - 0/2.)),
        (['apple', 'codeword'], ['apple', 'boxer', 'codeword'], INSERT_TOKEN + POSITION_WEIGHT * (1 - 1/2.)),
        (['apple', 'code', 'word'], ['apple', 'boxer', 'codeword'], INSERT_TOKEN + POSITION_WEIGHT * (1 - 1/3.) + CONCAT_COST),
        (['steepleford', 'acacia', 'dorchester'], ['steepleford', 'acacia', 'road', 'dorchester'], INSERT_TOKEN + POSITION_WEIGHT * (1-2/3.) ),
        (['acacia', 'dorchester'], ['acacia', 'road', 'dorchester'], INSERT_TOKEN + POSITION_WEIGHT * (1 - 1/2.) ),
        (['st', 'michael'], ['saint', 'michaels'], ABBREV_COST + PLURAL_COST),
        # No soundex matches on concatenation
        (['whytecliff', 'bridport'], ['white', 'cliff', 'bridport'], INSERT_TOKEN * 3 + POSITION_WEIGHT * ( (1-0/2.) + (1-0/2.) + (1-1/2.))),
        (['apple', 'bridport', 'one'], ['apple', 'bridport', '1'], ABBREV_COST),
        (['1', 'two', 'cott'], ['one', '2', 'cottage'], ABBREV_COST*3),
        (['apple', 'bridport'], ['apple', 'bridport', 'the'], INSERT_MINOR_TOKEN),
        (['apple', 'bridport'], ['apple', 'the', 'bridport'], INSERT_MINOR_TOKEN),
        (['apple', 'bridport'], ['the', 'apple', 'bridport'], INSERT_MINOR_TOKEN),
        (['apple', 'one', 'bridport'], ['apple', '1', 'bridport'], ABBREV_COST),
        (['apple', 'codeword'], ['apple', 'code', 'word'], CONCAT_COST),
        (['farmhouse', 'bridport'], ['farm', 'house', 'bridport'], CONCAT_COST),
        (['apple', 'codeword', 'fred'], ['apple', 'code', 'word', 'fred'], CONCAT_COST),
        (['apple', 'codeword'], ['apple', 'code', 'word'], CONCAT_COST),
        (['a', 'pplication'], ['app', 'lication'], CONCAT_COST*2),
        (['a', 'pplication', 'fred'], ['app', 'lication', 'fred'], CONCAT_COST*2),
        (['george', 'a', 'pplication', 'fred'], ['george', 'app', 'lication', 'fred'], CONCAT_COST*2),
        (['5', 'Bridport', 'Road'], ['5', 'Woodlands', 'Bridport', 'Road'], INSERT_TOKEN_AFTER_NUMBER + POSITION_WEIGHT * (1 - 1/3.)),
        (['5', 'Bridport', 'Road'], ['Firlake', '5', 'Bridport', 'Road'], INSERT_TOKEN + POSITION_WEIGHT * (1 - 0/3.)),
        (['apple'], ['apple'], 0)]

    # Count of failed tests
    nFailed = 0
    for first, second, expected in testList:
        if not editDistanceTest(first, second, expected):
            nFailed += 1

    # Print a summary of the test results
    if nFailed:
        print("Failed %d tests out of %d" % (nFailed, len(testList)))
    else:
        print("Passed all %d tests" % len(testList))