On 6-Jul-07, at 6:45 AM, Yaakov Nemoy wrote:
I can do the other three parts, but I am wondering, how do I write a deterministic test unit for my patch? How is it done with the threading model in python in general?
I don't know how it is done in general, but for reference, here are some of the unittests for my read/write lock class: def testReadCount(self): wrlock = ReadWriteLock() read, write = wrlock.reader, wrlock.writer self.assertEqual(wrlock.readerCount, 0) read.acquire() self.assertEqual(wrlock.readerCount, 1) read.acquire() self.assertEqual(wrlock.readerCount, 2) read.release() self.assertEqual(wrlock.readerCount, 1) read.release() self.assertEqual(wrlock.readerCount, 0) def testContention(self): wrlock = ReadWriteLock() read, write = wrlock.reader, wrlock.writer class Writer(Thread): gotit = False def run(self): write.acquire() self.gotit = True write.release() writer = Writer() self.assertEqual(wrlock.readerCount, 0) read.acquire() self.assertEqual(wrlock.readerCount, 1) writer.start() self.assertFalse(writer.gotit) read.acquire() self.assertEqual(wrlock.readerCount, 2) self.assertFalse(writer.gotit) read.release() self.assertEqual(wrlock.readerCount, 1) self.assertFalse(writer.gotit) read.release() self.assertEqual(wrlock.readerCount, 0) time.sleep(.1) self.assertTrue(writer.gotit) def testWRAcquire(self): wrlock = ReadWriteLock() read, write = wrlock.reader, wrlock.writer self.assertEqual(wrlock.readerCount, 0) write.acquire() write.acquire() write.release() write.release() read.acquire() self.assertEqual(wrlock.readerCount, 1) read.acquire() self.assertEqual(wrlock.readerCount, 2) read.release() self.assertEqual(wrlock.readerCount, 1) read.release() self.assertEqual(wrlock.readerCount, 0) write.acquire() write.release() def testOwnAcquire(self): wrlock = ReadWriteLock() read, write = wrlock.reader, wrlock.writer class Writer(Thread): gotit = False def run(self): write.acquire() self.gotit = True write.release() writer = Writer() self.assertEqual(wrlock.readerCount, 0) read.acquire() self.assertEqual(wrlock.readerCount, 1) writer.start() self.assertFalse(writer.gotit) # can acquire the write lock if only # this thread has the read lock write.acquire() write.release() read.acquire() self.assertEqual(wrlock.readerCount, 2) self.assertFalse(writer.gotit) read.release() self.assertEqual(wrlock.readerCount, 1) self.assertFalse(writer.gotit) read.release() self.assertEqual(wrlock.readerCount, 0) time.sleep(.1) self.assertTrue(writer.gotit) def testDeadlock(self): wrlock = ReadWriteLock() read, write = wrlock.reader, wrlock.writer errors = [] # a situation which can readily deadlock if care isn't taken class LockThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.q = Queue.Queue() def run(self): while True: task, lock, delay = self.q.get() if not task: break time.sleep(delay) if task == 'acquire': for delay in waittime(maxTime=5.0): if lock.acquire(False): break time.sleep(delay) else: errors.append("Couldn't acquire %s" % str (lock)) else: lock.release() thrd = LockThread() thrd.start() thrd.q.put(('acquire', read, 0)) time.sleep(.2) read.acquire() thrd.q.put(('acquire', write, 0)) thrd.q.put(('release', write, .5)) thrd.q.put(('release', read, 0)) write.acquire() time.sleep(0.0) write.release() read.release() # end thrd.q.put((None, None, None)) thrd.join() self.assertFalse(errors, "Errors: %s" % errors)