Update of /cvsroot/python/python/nondist/sandbox/collections
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21444
Added Files:
pairing_heap.py pairing_heap_test.py
Log Message:
Add Dan Stutzbach's first pass at paired heaps
(after modifying his test suite to use unittests).
The pure python version is being used to work out API issues, develop
thorough tests, and to tease out implementation issues.
Ultimately, this will be added as collections.heap
--- NEW FILE: pairing_heap.py ---
#!/usr/bin/python
"""
Note: this implementation is not currently thread-safe.
The code in this file implements a pairing heap. By default it
operates as a min-heap, but it can also work as a max-heap via a flag
passed to the heap's __init__ routine.
The important papers on pairing heaps are these:
[1] Michael L. Fredman, Robert Sedgewick, Daniel D. Sleator, and
Robert E. Tarjan. The Pairing Heap: A New Form of Self-Adjusting
Heap. Algorithmica 1:111-129, 1986.
[2] John T. Stasko and Jeffrey Scott Vitter. Pairing heaps:
experiments and analysis. Communications of the ACM, Volume 30, Issue
3, Pages 234-249, 1987.
[3] Michael L. Fredman. On the efficiency of pairing heaps and
related data structures. Journal of the ACM, Volume 46, Issue 4,
Pages 473-501, 1999.
There are actually several varieties of pairing heaps. Five are
described in [1], and two more are described in [2]. This
implementation is the basic pairing heaps described in [1], also known
as the twopass variant. There isn't much published evidence that any
of the variants are significantly better than the others.
The twopass pairing heap works as follows.
A pairing heap is a tree that maintains the heap property: each node
has a lower value than all its children. The pairing heap tree need
not be binary nor balanced. The good amortized efficiency of the
pairing heap results from clever algorithms for .adjust_key() and
.delete().
The basic operation in the pairing heap is the comparison-link. Two
nodes are compared, and the larger is linked as a child of the
smaller. To represent the tree, each node maintains a pointer to the
leftmost child and to its right sibling. Each node also maintains a
pointer to its parent. When a comparison-link is performed, the new
child becomes the leftmost child.
[1] suggests a memory optimization where only the right-most sibling
stores a pointer to the parent, and each node maintains a single bit
indicating if it is that rightmost sibling. This theoretically saves
the size of a pointer in each node, but only if we can store a bit for
free. However, storing a bit takes just as much space as storing a
pointer, due to padding. So, we bite the bullet and store the
pointer. Also, [3] shows that the amortized cost of .adjust_key()
is at least Omega(log log n) if the parent pointer is *not* stored.
The amortized cost may be better using the parent pointer.
For .extract(), the topmost element is removed and set aside as
the return value. A left-to-right pass is made over its children,
comparison-linking them pairwise. For example, if there are 6
children labeled A, B, C, D, E, and F, A and B will be
comparison-linked, as will C and D, and E and F. Next, the rightmost
node is repeatedly comparison-linked with its sibling until there is
only one node left. This node is the new root.
For .delete(), first the node is cut from its parent. This results in
a tree rooted at the node being deleted. The .extract()
procedure is invoked on the node, then the resulting tree is
comparison-linked with the original tree.
See [1] for more details. It also includes figures which are helpful
for understanding.
"""
class WrongHeap(Exception):
    """Signals a .delete() or .adjust_key() call made on the wrong heap.

    The same error is reported when the node being deleted or adjusted
    has already been removed from the heap.
    """

    def __str__(self):
        # The text is fixed; any constructor arguments are not shown.
        return 'WrongHeap'
class Underflow(Exception):
    """Signals a .peek() or .extract() call made on an empty heap."""

    def __str__(self):
        # The text is fixed; any constructor arguments are not shown.
        return 'Underflow'
class IncompatibleHeaps(Exception):
    """Signals an attempt to meld heaps whose compare operations differ."""

    def __str__(self):
        # The text is fixed; any constructor arguments are not shown.
        return 'IncompatibleHeaps'
class InternalError(Exception):
    """Signals a broken internal structure; not expected in practice."""

    def __str__(self):
        # The text is fixed; any constructor arguments are not shown.
        return 'InternalError'
class WrongAdjustKeyDirection(Exception):
    """Signals an attempt to adjust a key in the wrong direction."""

    def __str__(self):
        # The text is fixed; any constructor arguments are not shown.
        return 'WrongAdjustKeyDirection'
class heap_node(object):
    """Tree node used internally to implement the pairing heap.

    Users receive references to nodes from the heap's .insert() and
    pass them back to .adjust_key() and .delete(), but the structure
    should be considered opaque.  The only public methods are .value()
    and .values().

    Users should not instantiate heap_node on their own.

    Each node stores its item, a link to its leftmost child, a link to
    its right sibling, and a link to its parent.
    """

    # The slot name must be '_parent' (singular): that is the attribute
    # every method below reads and writes.
    __slots__ = ('_item', '_child', '_sibling', '_parent')

    def __init__(self, item):
        self._item = item
        self._clean()

    def value(self):
        "Return the value associated with this node."
        return self._item

    def _clean(self):
        "Reset every tree link of this node to None."
        self._child = None
        self._sibling = None
        self._parent = None

    def _add_child(self, node):
        "Make node the new leftmost child of this node."
        node._parent = self
        node._sibling = self._child
        self._child = node

    def _cut(self):
        """Remove this node and its children from this node's parent.

        Raises InternalError if this node has no parent or cannot be
        found among its parent's children.
        """
        parent = self._parent
        if parent is None:
            raise InternalError
        if parent._child is self:
            parent._child = self._sibling
        else:
            # Walk the sibling list to find the node just left of self.
            prev = parent._child
            while prev is not None and prev._sibling is not self:
                prev = prev._sibling
            if prev is None:
                raise InternalError
            prev._sibling = self._sibling
        self._parent = None
        self._sibling = None

    def _extract(self, heap):
        """Link this node's children as a prelude to this node's removal.

        heap supplies the comparison-link operation through its
        ._link(node1, node2) method.  Returns the root of the combined
        tree, or None if this node has no children.  This node's own
        links are left untouched.
        """
        # Left-to-right pass: comparison-link the children two at a time.
        merged = []
        node = self._child
        while node is not None and node._sibling is not None:
            partner = node._sibling
            rest = partner._sibling
            node._parent = None
            node._sibling = None
            partner._parent = None
            partner._sibling = None
            merged.append(heap._link(node, partner))
            node = rest
        if node is not None:
            # Odd child out; it has no right sibling left to clear.
            merged.append(node)
        if not merged:
            return None
        # Right-to-left pass: fold every tree into the rightmost one.
        root = merged[-1]
        for i in range(len(merged) - 2, -1, -1):
            root = heap._link(root, merged[i])
        # The odd child may still point at the node being removed.
        root._parent = None
        return root

    def values(self):
        """Return an unsorted list of the values of this part of the tree.

        The list covers this node, its descendants, and its right
        siblings together with their descendants.  The walk uses an
        explicit stack so a long sibling chain cannot exhaust the
        interpreter's recursion limit.
        """
        found = []
        pending = [self]
        while pending:
            node = pending.pop()
            found.append(node._item)
            # Push the sibling first so the child subtree is emitted
            # before it (item, children, then siblings).
            if node._sibling is not None:
                pending.append(node._sibling)
            if node._child is not None:
                pending.append(node._child)
        return found
class pairing_heap:
    """Implements a min-heap using the pairing-heap algorithm.

    Constructor arguments:
    values  -- optional iterable of items to insert immediately.
    cmpfunc -- optional two-argument comparison returning a negative,
               zero or positive number; default is natural ordering.
    key     -- optional attribute name; when given, items are compared
               by getattr(item, key) rather than by the items themselves.
    reverse -- when true the comparison result is negated, which turns
               the heap into a max-heap.
    """

    def __init__(self, values=None, cmpfunc=None, key=None, reverse=False):
        self._cmpfunc = cmpfunc
        self._key = key
        self._reverse = reverse
        self._root = None
        self._len = 0
        if values is not None:
            for item in values:
                self.insert(item)

    def empty(self):
        "True if there is nothing in the heap."
        return self._root is None

    def peek(self):
        """Return, but do not remove, the top of the heap.

        Raises Underflow if the heap is empty.
        """
        if self._root is None:
            raise Underflow
        return self._root._item

    def __len__(self):
        "Return the number of items in the heap."
        return self._len

    def _cmp(self, item1, item2):
        """Compare two items using the heap's comparison functions.

        Returns a negative, zero or positive number, already negated
        when the heap was created with reverse=True.
        """
        if self._key:
            item1 = getattr(item1, self._key)
            item2 = getattr(item2, self._key)
        if self._cmpfunc:
            c = self._cmpfunc(item1, item2)
        else:
            # Three-way comparison that does not rely on the cmp()
            # builtin, which newer Python versions no longer provide.
            c = (item1 > item2) - (item1 < item2)
        if self._reverse:
            c = -c
        return c

    def _link(self, node1, node2):
        """Perform the comparison-link operation.

        The node that sorts later becomes the leftmost child of the
        other; on a tie node2 goes under node1.  Either argument may be
        None, in which case the other one is returned unchanged.
        """
        if node1 is None:
            return node2
        if node2 is None:
            return node1
        if self._cmp(node1._item, node2._item) > 0:
            node2._add_child(node1)
            return node2
        node1._add_child(node2)
        return node1

    def insert(self, item):
        """Insert an item into the heap.

        Returns the node holding the item; keep it to call
        .adjust_key() or .delete() later.
        """
        node = heap_node(item)
        self._root = self._link(node, self._root)
        self._len += 1
        return node

    def meld(self, other):
        """Merge another heap into this one.

        other will be an empty heap after completion.  Melding a heap
        with itself is a no-op.

        Raises TypeError if other is not a pairing_heap and
        IncompatibleHeaps if the two heaps compare items differently.
        """
        if not isinstance(other, pairing_heap):
            raise TypeError
        if other is self:
            # Linking the root to itself would create a cycle and the
            # cleanup below would then discard every item.
            return
        # Check that these heaps are compatible
        if (self._cmpfunc != other._cmpfunc
                or self._reverse != other._reverse
                or self._key != other._key):
            raise IncompatibleHeaps
        self._root = self._link(other._root, self._root)
        self._len += other._len
        # Destroy other
        other._len = 0
        other._root = None

    def adjust_key(self, node, item):
        """Adjust the value of node to item.

        This must be a decrease for a min-heap or an increase for a
        max-heap; otherwise WrongAdjustKeyDirection is raised.
        If node is not part of this heap, both heaps may become gibberish.
        """
        self._check_heap_node(node)
        if self._cmp(node._item, item) < 0:
            raise WrongAdjustKeyDirection
        node._item = item
        if self._root is node:
            # The root has no parent to violate the heap order with.
            return
        # Cut this node out of the tree
        node._cut()
        # Link it with the root
        self._root = self._link(node, self._root)

    def delete(self, node):
        """Delete a node from the middle of the heap and return its item.

        If node is not part of this heap, both heaps may become gibberish.
        """
        self._check_heap_node(node)
        if self._root is node:
            # Deleting the root is an ordinary extract; pass its item on.
            return self.extract()
        node._cut()
        self._root = self._link(node._extract(self), self._root)
        self._len -= 1
        node._clean()
        return node._item

    def extract(self, n=0):
        """Extract the top value of the heap.

        With n > 0, return a list of the n top values instead, in heap
        order.  Raises Underflow if the heap runs out of items.
        """
        if n > 0:
            return [self.extract() for i in range(n)]
        if self._root is None:
            raise Underflow
        old_root = self._root
        self._root = old_root._extract(self)
        self._len -= 1
        old_root._clean()
        return old_root._item

    def extract_all(self):
        "Empty the heap into a sorted list of all the values"
        return [self.extract() for i in range(self._len)]

    def values(self):
        "Return an unsorted list of all the values in the heap"
        if self._root is None:
            return []
        return self._root.values()

    def _check_heap_node(self, node):
        """Raise an error if node will cause problems.

        This function is a simple check to catch certain user errors.
        It will raise TypeError if node isn't a heap_node object at
        all.  It *may* raise WrongHeap if node isn't part of this heap.
        It will return if node is part of this heap.

        However, no guarantees are made about the return value if node
        is a heap_node, but part of a different heap.
        """
        if not isinstance(node, heap_node):
            raise TypeError
        if self._root is node:
            return
        if node._parent is None:
            # A non-root node without a parent is not linked anywhere.
            raise WrongHeap
--- NEW FILE: pairing_heap_test.py ---
#!/usr/bin/python
from pairing_heap import *
import random
import unittest
# Number of random values generated for each test's fixture data.
datasize = 256
class Holder:
    """Wrapper that exposes a value through its 'xyzzy' attribute.

    The tests use it to exercise the heap's key= option.
    """

    def __init__(self, item):
        # 'xyzzy' is the attribute name the tests pass as key=.
        self.xyzzy = item
def check_invariant(heap, node=None):
    """Return True if the heap order holds throughout a subtree.

    heap supplies the comparison through heap._cmp(); node is the root
    of the subtree to verify and defaults to heap._root.  An empty heap
    trivially satisfies the invariant.

    Every parent/child pair below node is checked, at every depth.  A
    child equal to its parent is accepted, because the comparison-link
    puts one of two equal items underneath the other.
    """
    if node is None:
        node = heap._root
    if node is None:
        return True
    # Explicit stack instead of recursion so that deep trees are safe.
    pending = [node]
    while pending:
        parent = pending.pop()
        child = parent._child
        while child is not None:
            if heap._cmp(parent._item, child._item) > 0:
                return False
            pending.append(child)
            child = child._sibling
    return True
# Uncomment to speed things up when doing code-coverage analysis
##def check_invariant(heap, node=None):
## return True
class TestPairingHeap(unittest.TestCase):
    """Unit tests for pairing_heap, run against random float data."""

    # NOTE: the test methods carry '#' comments rather than docstrings
    # so that the verbose runner keeps printing the method names.

    def setUp(self):
        # Fresh random fixture for every test, plus its sorted copy.
        self.data = [random.random() for i in xrange(datasize)]
        self.data_sorted = sorted(self.data)

    def test_random(self):
        # Push random numbers and pop them off, verifying all's OK.
        heap = pairing_heap()
        self.assert_(check_invariant(heap))
        for item in self.data:
            heap.insert(item)
            self.assert_(check_invariant(heap))
        results = []
        while not heap.empty():
            item = heap.extract()
            self.assert_(check_invariant(heap))
            results.append(item)
        self.assertEqual(self.data_sorted, results)

    def test_naive_nbest(self):
        # Keep the heap at ten items; the ten largest must remain.
        heap = pairing_heap()
        for item in self.data:
            heap.insert(item)
            if len(heap) > 10:
                heap.extract()
        result = heap.extract_all()
        self.assertEqual(result, self.data_sorted[-10:])

    def test_underflow(self):
        heap = pairing_heap()
        self.assertRaises(Underflow, heap.extract)
        self.assertRaises(Underflow, heap.peek)

    def test_return_unsorted(self):
        heap = pairing_heap()
        for item in self.data:
            heap.insert(item)
        self.assertEqual(set(heap.values()), set(self.data))

    def test_implicit_creation_and_peek(self):
        heap = pairing_heap(self.data)
        self.assert_(check_invariant(heap))
        self.assertEqual(heap.peek(), self.data_sorted[0])
        self.assert_(check_invariant(heap))
        result = heap.extract_all()
        self.assertEqual(result, self.data_sorted)

    def test_extract_with_arg(self):
        heap = pairing_heap(self.data)
        low10 = heap.extract(10)
        self.assert_(check_invariant(heap))
        self.assertEqual(low10, self.data_sorted[:10])

    def test_deleting_from_the_middle(self):
        heap = pairing_heap()
        nodes = [heap.insert(item) for item in self.data]
        data2 = self.data[:]
        count = 0
        # Indices run downwards so earlier deletions do not shift the
        # positions that are still to be visited.
        for i in xrange(datasize-1, -1, -5):
            heap.delete(nodes[i])
            del nodes[i]
            self.assert_(check_invariant(heap))
            count += 1
            self.assertEqual(datasize-count, len(heap))
            del data2[i]
        result = heap.extract_all()
        data2_sorted = data2[:]
        data2_sorted.sort()
        self.assertEqual(data2_sorted, result)

    def test_deleting_everything_in_random_order(self):
        heap = pairing_heap()
        nodes = [heap.insert(item) for item in self.data]
        for i in xrange(datasize):
            t = random.randrange(len(heap))
            heap.delete(nodes[t])
            del nodes[t]
            self.assert_(check_invariant(heap))
        self.assert_(heap.empty())

    def test_adjust_key(self):
        heap = pairing_heap()
        nodes = [heap.insert(item) for item in self.data]
        # New keys never exceed the old ones, as a min-heap requires.
        data2 = []
        for i in xrange(datasize):
            data2.append(min(self.data[i], random.random()))
        idx = range(datasize)
        random.shuffle(idx)
        for i in idx:
            heap.adjust_key(nodes[i], data2[i])
            self.assert_(check_invariant(heap))
        self.assertEqual(sorted(data2), heap.extract_all())

    def test_build_tree_by_melding(self):
        heap = pairing_heap()
        i = 0
        while i < datasize:
            # Meld in small heaps of one to four items at a time.
            j = max(random.randrange(min(datasize - i, 5)), 1)
            heap2 = pairing_heap(self.data[i:i+j])
            i += j
            heap.meld(heap2)
            self.assert_(check_invariant(heap))
        self.assertEqual(self.data_sorted, heap.extract_all())

    def test_cmpfunc(self):
        reverse = lambda x, y: -cmp(x,y)
        heap = pairing_heap(self.data, cmpfunc=reverse)
        self.assertEqual(sorted(self.data, reverse=True), heap.extract_all())

    def test_reverse(self):
        heap = pairing_heap(self.data, reverse=True)
        self.assertEqual(sorted(self.data, reverse=True), heap.extract_all())

    def test_cmpfunc_with_reverse(self):
        # A reversing cmpfunc combined with reverse=True cancels out.
        reverse = lambda x, y: -cmp(x,y)
        heap = pairing_heap(self.data, cmpfunc=reverse, reverse=True)
        self.assertEqual(self.data_sorted, heap.extract_all())

    def test_key(self):
        holders = [Holder(item) for item in self.data]
        heap = pairing_heap(holders, key='xyzzy')
        results = heap.extract_all()
        self.assertEqual(self.data_sorted,[item.xyzzy for item in results])

    def test_extract_values_from_nodes(self):
        heap = pairing_heap()
        nodes = [heap.insert(datum) for datum in self.data]
        for i in xrange(len(nodes)):
            self.assertEqual(nodes[i].value(), self.data[i])
            self.assert_(check_invariant(heap))

    def test_meld_with_incompatible_heap(self):
        heap1 = pairing_heap()
        reverse = lambda x, y: -cmp(x,y)
        self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(cmpfunc = reverse))
        self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(reverse = True))
        self.assertRaises(IncompatibleHeaps, heap1.meld, pairing_heap(key = 'xyzzy'))

    def test_adj_key_wrong_direction(self):
        heap = pairing_heap ()
        nodes = [heap.insert(datum) for datum in range(5)]
        self.assertRaises(WrongAdjustKeyDirection, heap.adjust_key, nodes[4], 100)

    def test_adjust_key_on_existing_root(self):
        heap = pairing_heap ()
        data = self.data
        nodes = [heap.insert(datum) for datum in data]
        # Collect the index (or indices) of the smallest datum.  The
        # list starts with index 0 so it is defined even when the very
        # first datum is already the minimum.
        low = 0
        low_list = [0]
        for i in xrange(1, len(data)):
            if data[i] == data[low]:
                low_list.append (i)
            elif data[i] < data[low]:
                low = i
                low_list = [i]
        for i in low_list:
            heap.adjust_key(nodes[i], data[i] - random.random())
            self.assert_(check_invariant(heap))

    def test_bogus_node_parameter(self):
        heap = pairing_heap ()
        nodes = [heap.insert(datum) for datum in self.data]
        self.assertRaises(TypeError, heap.adjust_key, "muhahahah!", 5)
        self.assert_(check_invariant(heap))
        self.assertRaises(TypeError, heap.delete, "muhahahah!")

    def test_semibogus_node_parameter(self):
        holders = [Holder(item) for item in self.data]
        heap = pairing_heap(key='xyzzy')
        nodes = [heap.insert(datum) for datum in holders]
        for i in xrange(len(holders)):
            holders[i].node = nodes[i]
        # The extracted node is no longer in the heap, so deleting it
        # must be rejected.
        top = heap.extract ()
        self.assertRaises(WrongHeap, heap.delete, top.node)
        self.assert_(check_invariant(heap))
if __name__ == '__main__':
    # Gather every test method of TestPairingHeap into one suite and
    # run it with per-test (verbose) reporting.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    suite.addTest(loader.loadTestsFromTestCase(TestPairingHeap))
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)