[Tutor] Starbucks does not use two-phase commit

Todd Maynard python-tutor at toddmaynard.com
Sat Jan 21 19:57:43 CET 2006


Danny,

I want to thank you for ruining my plans for a relaxing Saturday morning.  As 
a thread newbie I killed several hours playing around with your code.

One thing I noticed is that sometimes the program would hang, which I figured 
was the Queue code blocking in the Ticket claim function. I used exception 
handling to deal with that situation cleanly. 

I then decided that it wasn't very nice of Starbucks to close after accepting 
my order without giving me my Latte, so I changed that part of the code to:

         def schedule(self,job):
            """Schedules a job and returns a "ticket" the user can later use 
to get the result."""
            if self.acceptNew == True:
                  outputQueue=Queue()
                  self.queue.put((job,outputQueue))
                  return Ticket(outputQueue)
            else:
               print "Server not accepting any new requests."
               return None

         def scheduleShutdown(self):
            """Add a job that shuts the system down."""
            print "Telling server to shut down"
            self.queue.put((Server._QUIT_NICELY,None))

         def _jobLoop(self):
            """Continue looping through tasks."""
            while True:
               print "Looping ... "
               (nextJob, outputQueue) = self.queue.get()
               if nextJob is server._QUIT_NOW:
                  return
               if nextJob is server._QUIT_NICELY:
                    self.acceptNew = False
                    self.queue.put((Server._QUIT_NOW,None))
               else:
                  returnValue=self._doJob(nextJob)
                  outputQueue.put(returnValue)


I am 99.44% sure that this is thread safe, reasoning being:
	 setting the acceptNew to False and adding the QUIT_NOW happens in the same 
thread so it is impossible for another job to get scheduled after the 
QUIT_NOW - so no thread will end up hanging...

However, I would sleep a little better if you could reassure me that I am 
right, and would sleep even better if you could give me a method to test 
this.  This kinda stuff looks tricky to test with standard unittest 
methodology....

Thanks again for the enligntenment all you guys bring to this awesome 
language.

My bastardized code is below for reference.

--Todd Maynard < python-tutor at toddmaynard.com >


[It is] best to confuse only one issue at a time.
		-- K&R


**************************************


from threading import Thread
from Queue import Queue,Empty

class Ticket(object):
   """A small token we can use to claim our result."""
   def __init__(self, q):
      self.q = q
      self.result = None
      self.done=False

   def claim(self):
      if not self.done:
         try:
            self.result=self.q.get(True,5)
            self.done=True
         except Empty:
            print "We lost the server!"
            self.result=None
      return self.result


class Server(object):
         _QUIT_NOW=['Quit!']
         _QUIT_NICELY=['Quit Nicely']

         def __init__(self):
            """A queue will contain 2-tuples of (job, outputQueue) 
elements."""
            self.queue=Queue()
            self.acceptNew=True

         def startServer(self):
            """Brings the server online."""
            Thread(target=self._jobLoop).start()

         def schedule(self,job):
            """Schedules a job and returns a "ticket" the user can later use 
to get the result."""
            if self.acceptNew == True:
                  outputQueue=Queue()
                  self.queue.put((job,outputQueue))
                  return Ticket(outputQueue)
            else:
               print "Server not accepting any new requests."
               return None

         def scheduleShutdown(self):
            """Add a job that shuts the system down."""
            print "Telling server to shut down"
            self.queue.put((Server._QUIT_NICELY,None))

         def _jobLoop(self):
            """Continue looping through tasks."""
            while True:
               print "Looping ... "
               (nextJob, outputQueue) = self.queue.get()
               if nextJob is server._QUIT_NOW:
                  return
               if nextJob is server._QUIT_NICELY:
                    self.acceptNew = False
                    self.queue.put((Server._QUIT_NOW,None))
               else:
                  returnValue=self._doJob(nextJob)
                  outputQueue.put(returnValue)

         def _doJob(self,job):
            print "I'm doing " , job
            return job + job #Something to show that we are doing something

def separateCaller(server):
   for i in range(1000,1004+1):
      print " -- separateCaller asks %d" % i
      ticket = server.schedule(str(i))
      if ticket:
         print " -- separateCaller got %s" % ticket.claim()
      else:
         print " -- separateCaller couldn't get a ticket." 

if __name__=="__main__":
   server = Server()
   server.startServer()
   Thread(target=separateCaller, args=(server,)).start()

   result1=server.schedule("1")
   

   result2=server.schedule("2")
   

   result3=server.schedule("3")

   if result3:
      print "result3: %s" % result3.claim()
   else:
      print "result3: Couldn't get a ticket"

   if result2:
      print "result2: %s" % result2.claim()
   else:
      print "result2: Couldn't get a ticket"

   if result1:
      print "result1: %s" % result1.claim()
   else:
      print "result1: Couldn't get a ticket"
      
   server.scheduleShutdown()
   



More information about the Tutor mailing list