[Python-Dev] Fwd: [ python-Patches-1744382 ] Read Write lock

Mike Klaas mike.klaas at gmail.com
Fri Jul 6 19:47:16 CEST 2007


On 6-Jul-07, at 6:45 AM, Yaakov Nemoy wrote:

>
> I can do the other three parts, but I am wondering, how do I write a
> deterministic test unit for my patch?  How is it done with the
> threading model in python in general?

I don't know how it is done in general, but for reference, here are
some of the unit tests for my read/write lock class:

     def testReadCount(self):
         # readerCount must track read acquisitions and releases made by
         # this thread, one step at a time.
         lock = ReadWriteLock()
         reader, writer = lock.reader, lock.writer

         # A fresh lock has no readers.
         self.assertEqual(lock.readerCount, 0)

         # Every acquire raises the count by one ...
         for expected in (1, 2):
             reader.acquire()
             self.assertEqual(lock.readerCount, expected)

         # ... and every release lowers it again, back down to zero.
         for expected in (1, 0):
             reader.release()
             self.assertEqual(lock.readerCount, expected)

     def testContention(self):
         # A second thread asking for the write lock must not get it while
         # this thread still holds the read lock (once or twice); it must get
         # it after every read hold has been released.
         wrlock = ReadWriteLock()
         read, write = wrlock.reader, wrlock.writer

         class Writer(Thread):
             # Flipped to True by run() once the write lock has been acquired.
             gotit = False
             def run(self):
                 write.acquire()
                 self.gotit = True
                 write.release()
         writer = Writer()

         self.assertEqual(wrlock.readerCount, 0)
         read.acquire()
         self.assertEqual(wrlock.readerCount, 1)
         writer.start()
         # NOTE: the assertFalse checks below can pass vacuously if the writer
         # thread has not been scheduled yet, but they can never fail
         # spuriously.
         self.assertFalse(writer.gotit)

         read.acquire()
         self.assertEqual(wrlock.readerCount, 2)
         self.assertFalse(writer.gotit)

         read.release()
         self.assertEqual(wrlock.readerCount, 1)
         self.assertFalse(writer.gotit)

         read.release()
         self.assertEqual(wrlock.readerCount, 0)
         # Wait for the writer thread to finish rather than sleeping for a
         # fixed interval: join() returns as soon as run() completes, so the
         # test is fast when all is well and tolerant of a slow machine.  The
         # timeout keeps a broken lock from hanging the test run here.
         writer.join(5.0)
         self.assertTrue(writer.gotit)

     def testWRAcquire(self):
         # Single-threaded walk through the lock: two nested write holds,
         # then two nested read holds (checking readerCount at each step),
         # then one more write hold.
         lock = ReadWriteLock()
         reader, writer = lock.reader, lock.writer

         self.assertEqual(lock.readerCount, 0)

         # Take the write lock twice from this thread, then drop both holds.
         for _ in range(2):
             writer.acquire()
         for _ in range(2):
             writer.release()

         # Read holds stack up and unwind, with the count following along.
         for expected in (1, 2):
             reader.acquire()
             self.assertEqual(lock.readerCount, expected)
         for expected in (1, 0):
             reader.release()
             self.assertEqual(lock.readerCount, expected)

         # The write lock is obtainable again once all read holds are gone.
         writer.acquire()
         writer.release()

     def testOwnAcquire(self):
         # Like testContention, but this thread additionally takes and drops
         # the write lock while it is the only holder of the read lock.  The
         # competing writer thread must still be kept out until every read
         # hold has been released.
         wrlock = ReadWriteLock()
         read, write = wrlock.reader, wrlock.writer

         class Writer(Thread):
             # Flipped to True by run() once the write lock has been acquired.
             gotit = False
             def run(self):
                 write.acquire()
                 self.gotit = True
                 write.release()
         writer = Writer()

         self.assertEqual(wrlock.readerCount, 0)
         read.acquire()
         self.assertEqual(wrlock.readerCount, 1)
         writer.start()
         # NOTE: the assertFalse checks below can pass vacuously if the writer
         # thread has not been scheduled yet, but they can never fail
         # spuriously.
         self.assertFalse(writer.gotit)

         # can acquire the write lock if only
         # this thread has the read lock
         write.acquire()
         write.release()

         read.acquire()
         self.assertEqual(wrlock.readerCount, 2)
         self.assertFalse(writer.gotit)

         read.release()
         self.assertEqual(wrlock.readerCount, 1)
         self.assertFalse(writer.gotit)

         read.release()
         self.assertEqual(wrlock.readerCount, 0)
         # Wait for the writer thread to finish rather than sleeping for a
         # fixed interval: join() returns as soon as run() completes, so the
         # test is fast when all is well and tolerant of a slow machine.  The
         # timeout keeps a broken lock from hanging the test run here.
         writer.join(5.0)
         self.assertTrue(writer.gotit)


     def testDeadlock(self):
         # Interleaves read/write acquisitions between this thread and a
         # helper thread in an order that can readily deadlock if the lock
         # implementation isn't careful.  The helper records every acquire
         # that never succeeds; the test passes when nothing was recorded.
         rwlock = ReadWriteLock()
         reader, writer = rwlock.reader, rwlock.writer

         failures = []

         class LockThread(threading.Thread):
             # Worker that executes (task, lock, delay) tuples taken from its
             # own queue, self.q.
             def __init__(self):
                 threading.Thread.__init__(self)
                 self.q = Queue.Queue()

             def run(self):
                 while True:
                     task, lock, delay = self.q.get()
                     if not task:
                         # A falsy task is the shutdown signal.
                         return
                     time.sleep(delay)
                     if task != 'acquire':
                         lock.release()
                         continue
                     # Poll with non-blocking acquires so that a deadlock
                     # shows up as a recorded failure instead of a hang.
                     # NOTE(review): waittime is defined elsewhere; presumably
                     # it yields sleep intervals totalling at most maxTime
                     # seconds -- confirm.
                     for pause in waittime(maxTime=5.0):
                         if lock.acquire(False):
                             break
                         time.sleep(pause)
                     else:
                         failures.append("Couldn't acquire %s" % str(lock))

         worker = LockThread()
         worker.start()

         # Ask the helper to take the read lock, and allow it some time to
         # do so before this thread takes a read hold of its own.
         worker.q.put(('acquire', reader, 0))
         time.sleep(.2)
         reader.acquire()
         # Queue up the helper's remaining steps: acquire the write lock,
         # release it after half a second, then release its read hold.
         worker.q.put(('acquire', writer, 0))
         worker.q.put(('release', writer, .5))
         worker.q.put(('release', reader, 0))
         # Meanwhile this thread also goes after the write lock.
         writer.acquire()
         time.sleep(0.0)
         writer.release()
         reader.release()

         # Tell the helper to shut down and wait for it to exit.
         worker.q.put((None, None, None))
         worker.join()

         self.assertFalse(failures, "Errors: %s" % failures)



More information about the Python-Dev mailing list