Re: [Twisted-Python] Synchronous Code Fishbowl

Glyph wrote:
Well of course, Glyph's "interesting module" comment was just enough of a table scrap to get me running, tail wagging furiously. The result (unit testing in progress) is a full-fledged SynchronousTasks object that runs a priority queue of synchronous tasks with niceness scheduling. See * http://foss.eepatents.com/sAsync/browser/trunk/sasync/syncbridge.py * http://foss.eepatents.com/sAsync/browser/trunk/test/syncbridge.py Once fully tested, would SynchronousTasks be considered as an addition to twisted.internet.threads? Best regards, Ed

Ed Suominen wrote:
Just scanned through that module and the idea looks useful to me too. In fact, AFAICT, it's basically the same technique I used for laxdb, <http://twistedmatrix.com/trac/browser/sandbox/mg/laxdb.py>. One small comment, and it's totally untested ... I don't think an already empty SynchronousQueue can be shutdown because nothing will wake the processing loop in _workOnTasks. - Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.

Ed Suominen wrote:
I was going to post again last night about how the PriorityQueue.get() would never block once something had been put() into it, but I see you've fixed that bug by clearing the event semaphore. Unfortunately, the code now has a race condition. If the SynchronousQueue._workOnTasks thread is pre-empted in PriorityQueue.get() between "if self.empty():" and "self.event.clear()", and another thread calls PriorityQueue.put() then the event semaphore set during put() will be cleared when get() continues. OK, that was horrible to write so here's a picture instead ;-) ... Thread 1 Thread 2 # Calls get() self.event.wait() result = heapq.heappop(self.list) if self.empty(): <------------------ Thread 2 preempts Thread 1 ----------------------> # Calls put() heapq.heappush(self.list, item) self.event.set() <---------------------- Thread 1 continues --------------------------> self.event.clear() PriorityQueue should probably be using a Condition to protect access to the heapq list *and* wait for something to be posted to it. See <http://docs.python.org/lib/condition-objects.html>. Cheers, Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.

Matt Goodall wrote:
I notice you've updated syncbridge to use a Condition now. Looks better to me, although there's another bug and an improvement suggestion. First up is that it should be "while self.empty(): self.cv.wait()". Whenever something is put on the queue notify() is always called to wake a consumer thread. However, there may already be a consumer thread tearing around a loop taking items from the queue and, critically, never waiting until the queue is empty. By the time the newly woken consumer thread actually calls heappop to take an item the existing consumer thread may have emptied the queue. Ths improvement suggestion is to put the conditional's release() in a finally block to ensure it actually happens. (Why isn't the Condition example in the documentation written expecting exceptions?) Anyway, this all makes get() and put() look something like: def get(self): self.cv.acquire() try: while self.empty(): self.cv.wait() return heapq.heappop(self.list) finally: self.cv.release() def put(self, item): self.cv.acquire() try: heapq.heappush(self.list, item) self.cv.notify() finally: self.cv.release() After that, I think the queue implementation is ok. And some people insist that threading is easy ;-). - Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.

On May 31, 2006, at 7:10 AM, Matt Goodall wrote:
After that, I think the queue implementation is ok.
And some people insist that threading is easy ;-).
Alternatively you could take advantage of python's built in thread- safe Queue.Queue. Simply extend it and overwrite the private methods (_put, _get, etc.) -- Philip Jenvey

Philip Jenvey wrote:
Good idea. The usual comment about not overriding _ method applies but it looks safe in this case because Queue explicitly documents the _ methods that are safe to override. - Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.

Ed Suominen wrote:
Just scanned through that module and the idea looks useful to me too. In fact, AFAICT, it's basically the same technique I used for laxdb, <http://twistedmatrix.com/trac/browser/sandbox/mg/laxdb.py>. One small comment, and it's totally untested ... I don't think an already empty SynchronousQueue can be shutdown because nothing will wake the processing loop in _workOnTasks. - Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.

Ed Suominen wrote:
I was going to post again last night about how the PriorityQueue.get() would never block once something had been put() into it, but I see you've fixed that bug by clearing the event semaphore. Unfortunately, the code now has a race condition. If the SynchronousQueue._workOnTasks thread is pre-empted in PriorityQueue.get() between "if self.empty():" and "self.event.clear()", and another thread calls PriorityQueue.put() then the event semaphore set during put() will be cleared when get() continues. OK, that was horrible to write so here's a picture instead ;-) ... Thread 1 Thread 2 # Calls get() self.event.wait() result = heapq.heappop(self.list) if self.empty(): <------------------ Thread 2 preempts Thread 1 ----------------------> # Calls put() heapq.heappush(self.list, item) self.event.set() <---------------------- Thread 1 continues --------------------------> self.event.clear() PriorityQueue should probably be using a Condition to protect access to the heapq list *and* wait for something to be posted to it. See <http://docs.python.org/lib/condition-objects.html>. Cheers, Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.

Matt Goodall wrote:
I notice you've updated syncbridge to use a Condition now. Looks better to me, although there's another bug and an improvement suggestion. First up is that it should be "while self.empty(): self.cv.wait()". Whenever something is put on the queue notify() is always called to wake a consumer thread. However, there may already be a consumer thread tearing around a loop taking items from the queue and, critically, never waiting until the queue is empty. By the time the newly woken consumer thread actually calls heappop to take an item the existing consumer thread may have emptied the queue. Ths improvement suggestion is to put the conditional's release() in a finally block to ensure it actually happens. (Why isn't the Condition example in the documentation written expecting exceptions?) Anyway, this all makes get() and put() look something like: def get(self): self.cv.acquire() try: while self.empty(): self.cv.wait() return heapq.heappop(self.list) finally: self.cv.release() def put(self, item): self.cv.acquire() try: heapq.heappush(self.list, item) self.cv.notify() finally: self.cv.release() After that, I think the queue implementation is ok. And some people insist that threading is easy ;-). - Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.

On May 31, 2006, at 7:10 AM, Matt Goodall wrote:
After that, I think the queue implementation is ok.
And some people insist that threading is easy ;-).
Alternatively you could take advantage of python's built in thread- safe Queue.Queue. Simply extend it and overwrite the private methods (_put, _get, etc.) -- Philip Jenvey

Philip Jenvey wrote:
Good idea. The usual comment about not overriding _ method applies but it looks safe in this case because Queue explicitly documents the _ methods that are safe to override. - Matt -- __ / \__ Matt Goodall, Pollenation Internet Ltd \__/ \ w: http://www.pollenation.net __/ \__/ e: matt@pollenation.net / \__/ \ t: +44 (0)113 2252500 \__/ \__/ / \ Any views expressed are my own and do not necessarily \__/ reflect the views of my employer.
participants (3)
-
Ed Suominen
-
Matt Goodall
-
Philip Jenvey