# modified from http://code.activestate.com/recipes/498245-lru-and-lfu-cache-decorators/
"""Memoizing decorators with least-recently-used and least-frequently-used eviction.

Both decorators require the arguments of the cached function to be hashable.
The cache object is exposed on the decorated function as ``f.cache``.
"""

import collections
import functools
from heapq import nsmallest
from operator import itemgetter

try:
    from itertools import ifilterfalse  # Python 2
except ImportError:
    # Python 3 renamed the function; keep the historical name importable.
    from itertools import filterfalse as ifilterfalse


class Counter(dict):
    'Mapping where default values are zero'

    def __missing__(self, key):
        # Reading a missing key yields 0 without inserting the key.
        return 0


class LRUCache(object):
    '''
    Least-recently-used cache decorator.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in .hits and .misses.
    Clear the cache with .clear().
    Cache object can be accessed as f.cache, where f is the decorated
    function.
    http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
    '''

    def __init__(self, maxsize=100):
        self.maxsize = maxsize
        self.maxqueue = maxsize * 10
        self.store = dict()                 # mapping of args to results
        self.queue = collections.deque()    # order that keys have been used
        self.refcount = Counter()           # times each key is in the queue
        self.hits = self.misses = 0

    def decorate(self, user_function, len=len, iter=iter, tuple=tuple,
                 sorted=sorted, KeyError=KeyError):
        '''Return a caching wrapper around user_function.

        The extra keyword parameters only bind builtins to local names
        (a lookup optimization); callers are not expected to pass them.
        The wrapper exposes this cache object as its .cache attribute.
        '''
        sentinel = object()     # marker for looping around the queue
        kwd_mark = object()     # separate positional and keyword args

        # lookup optimizations (ugly but fast)
        store = self.store
        queue = self.queue
        refcount = self.refcount
        queue_append, queue_popleft = queue.append, queue.popleft
        queue_appendleft, queue_pop = queue.appendleft, queue.pop

        @functools.wraps(user_function)
        def wrapper(*args, **kwds):
            # cache key records both positional and keyword args
            key = args
            if kwds:
                key += (kwd_mark,) + tuple(sorted(kwds.items()))

            # get cache entry or compute if not found
            try:
                result = store[key]
            except KeyError:
                result = user_function(*args, **kwds)
                store[key] = result
                self.misses += 1
            else:
                self.hits += 1

            # Record recent use only once the value is in the store.  Doing
            # it before the call would leave a queued key with no stored
            # value when user_function raises or recurses into the cache,
            # and a later purge would fail with KeyError on that key.
            queue_append(key)
            refcount[key] += 1

            # purge least recently used cache entry
            if len(store) > self.maxsize:
                oldest = queue_popleft()
                refcount[oldest] -= 1
                # Skip queue entries whose key was used again more recently.
                while refcount[oldest]:
                    oldest = queue_popleft()
                    refcount[oldest] -= 1
                del store[oldest], refcount[oldest]

            # periodically compact the queue by eliminating duplicate keys
            # while preserving order of most recent access
            if len(queue) > self.maxqueue:
                refcount.clear()
                queue_appendleft(sentinel)
                # Pop from the right (newest first) until the sentinel comes
                # back around; keep only the first occurrence of each key.
                for unique in ifilterfalse(refcount.__contains__,
                                           iter(queue_pop, sentinel)):
                    queue_appendleft(unique)
                    refcount[unique] = 1

            return result

        wrapper.cache = self
        return wrapper

    def clear(self):
        '''Drop all cached results and reset the hit/miss statistics.'''
        self.store.clear()
        self.queue.clear()
        self.refcount.clear()
        self.hits = self.misses = 0

    __call__ = decorate


class LFUCache(object):
    '''Least-frequently-used cache decorator.

    Arguments to the cached function must be hashable.
    Cache performance statistics stored in .hits and .misses.
    Clear the cache with .clear().
    Cache object can be accessed as f.cache, where f is the decorated
    function.
    http://en.wikipedia.org/wiki/Least_Frequently_Used
    '''

    def __init__(self, maxsize=100):
        self.maxsize = maxsize
        self.store = dict()             # mapping of args to results
        self.use_count = Counter()      # times each key has been accessed
        self.hits = self.misses = 0

    def decorate(self, user_function):
        '''Return a caching wrapper around user_function.

        The wrapper exposes this cache object as its .cache attribute.
        '''
        kwd_mark = object()     # separate positional and keyword args

        # lookup optimizations (ugly but fast)
        store = self.store
        use_count = self.use_count

        @functools.wraps(user_function)
        def wrapper(*args, **kwds):
            key = args
            if kwds:
                key += (kwd_mark,) + tuple(sorted(kwds.items()))

            # get cache entry or compute if not found
            try:
                result = store[key]
            except KeyError:
                result = user_function(*args, **kwds)
                store[key] = result
                self.misses += 1
            else:
                self.hits += 1

            # Count the access only after the value is stored so that
            # use_count never holds a key that is missing from store (which
            # would make the purge below fail with KeyError).
            use_count[key] += 1

            # purge least frequently used cache entries
            if len(store) > self.maxsize:
                # Evict about 10% of capacity, but always at least one entry;
                # otherwise caches with maxsize < 10 would never shrink.
                evict = max(1, self.maxsize // 10)
                for stale, _ in nsmallest(evict, use_count.items(),
                                          key=itemgetter(1)):
                    del store[stale], use_count[stale]

            return result

        wrapper.cache = self
        return wrapper

    __call__ = decorate

    def clear(self):
        '''Drop all cached results and reset the hit/miss statistics.'''
        self.store.clear()
        self.use_count.clear()
        self.hits = self.misses = 0


def _demo():
    '''Exercise both caches with random arguments and print hits, misses.'''
    from random import choice

    domain = range(5)
    for cache_class in (LRUCache, LFUCache):

        @cache_class(maxsize=20)
        def f(x, y):
            return 3 * x + y

        for _ in range(1000):
            f(choice(domain), choice(domain))

        print(f.cache.hits, f.cache.misses)


if __name__ == '__main__':
    _demo()