Re: [Twisted-Python] Synchronous Code Fishbowl

Glyph wrote:
syncbridge looks like an interesting module. I've often wanted to do something similar, to have a "one right way" for integrating with non-async-clean legacy code which also doesn't necessarily support threaded concurrency either.
However, I notice that nothing sets up the 'shutdown' method to be called on reactor shutdown automatically; this is a very tricky area, since mismanaged thread-pool shutdown can lock a process up hard as it's exiting.
Also, have you considered just using a twisted.python.threadpool of maximum size 1, rather than callInThread? The main reason I didn't document that as the aforementioned "one right way" was because of the associated shutdown issues. The major advantage of callInThread is that the reactor's own threadpool is definitely initialized and shut down at predictable points.
Well of course, Glyph's "interesting module" comment was just enough of a table scrap to get me running, tail wagging furiously. The result (unit testing in progress) is a full-fledged SynchronousTasks object that runs a priority queue of synchronous tasks with niceness scheduling. See

* http://foss.eepatents.com/sAsync/browser/trunk/sasync/syncbridge.py
* http://foss.eepatents.com/sAsync/browser/trunk/test/syncbridge.py

Once fully tested, would SynchronousTasks be considered as an addition to twisted.internet.threads?

Best regards, Ed
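[Editor's note: the sketch below is not Ed's actual syncbridge code, which lives at the URLs above; it is a stdlib-only illustration of the idea being described -- a single worker thread draining a priority queue of synchronous tasks, where lower "niceness" runs first. All names (`SyncTaskRunner`, `call`, `shutdown`) are hypothetical.]

```python
import heapq
import itertools
import threading

class SyncTaskRunner:
    """Toy single-thread task runner: tasks with lower niceness run first."""

    def __init__(self):
        self._cv = threading.Condition()
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps FIFO order among equals
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def call(self, func, *args, niceness=0):
        """Queue func for the worker thread; returns (done_event, result_list)."""
        done = threading.Event()
        result = []
        with self._cv:
            heapq.heappush(
                self._heap,
                (niceness, next(self._counter), func, args, done, result))
            self._cv.notify()
        return done, result

    def shutdown(self):
        """Drain any queued tasks, then stop the worker thread."""
        with self._cv:
            self._running = False
            self._cv.notify()
        self._thread.join()

    def _loop(self):
        while True:
            with self._cv:
                while self._running and not self._heap:
                    self._cv.wait()
                if not self._running and not self._heap:
                    return
                _, _, func, args, done, result = heapq.heappop(self._heap)
            result.append(func(*args))  # run the task outside the lock
            done.set()

runner = SyncTaskRunner()
done, result = runner.call(lambda a, b: a + b, 2, 3, niceness=0)
done.wait()
runner.shutdown()
# result[0] now holds 5
```

The worker checks the heap under the lock before waiting, so a notify() can never be missed between submission and sleep.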

Ed Suominen wrote:
Glyph wrote:
syncbridge looks like an interesting module. I've often wanted to do something similar, to have a "one right way" for integrating with non-async-clean legacy code which also doesn't necessarily support threaded concurrency either. [...]
Well of course, Glyph's "interesting module" comment was just enough of a table scrap to get me running, tail wagging furiously. The result (unit testing in progress) is a full-fledged SynchronousTasks object that runs a priority queue of synchronous tasks with niceness scheduling. See
* http://foss.eepatents.com/sAsync/browser/trunk/sasync/syncbridge.py
* http://foss.eepatents.com/sAsync/browser/trunk/test/syncbridge.py
Once fully tested, would SynchronousTasks be considered as an addition to twisted.internet.threads?
Just scanned through that module and the idea looks useful to me too. In fact, AFAICT, it's basically the same technique I used for laxdb, <http://twistedmatrix.com/trac/browser/sandbox/mg/laxdb.py>.

One small comment, and it's totally untested ... I don't think an already empty SynchronousQueue can be shut down, because nothing will wake the processing loop in _workOnTasks.

- Matt

--
Matt Goodall, Pollenation Internet Ltd
w: http://www.pollenation.net
e: matt@pollenation.net
t: +44 (0)113 2252500

Any views expressed are my own and do not necessarily reflect the views of my employer.
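[Editor's note: one conventional fix for the wake-up problem Matt describes is a shutdown sentinel -- a special item pushed onto the queue whose only job is to wake a blocked consumer. The sketch below is a stdlib illustration of that pattern, not syncbridge's actual shutdown code; all names are hypothetical.]

```python
import queue
import threading

_SHUTDOWN = object()  # sentinel: its identity, not its value, signals shutdown

def worker(q):
    while True:
        task = q.get()          # blocks indefinitely on an empty queue...
        if task is _SHUTDOWN:   # ...so shutdown must wake it with a sentinel
            break
        task()

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()

results = []
q.put(lambda: results.append("ran"))
q.put(_SHUTDOWN)   # wakes the loop even if it is idle and blocked in get()
t.join()
```

Because the sentinel goes through the same queue as real work, it also guarantees that everything queued before it is processed first.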

Ed Suominen wrote:
Well of course, Glyph's "interesting module" comment was just enough of a table scrap to get me running, tail wagging furiously. The result (unit testing in progress) is a full-fledged SynchronousTasks object that runs a priority queue of synchronous tasks with niceness scheduling. See
* http://foss.eepatents.com/sAsync/browser/trunk/sasync/syncbridge.py
* http://foss.eepatents.com/sAsync/browser/trunk/test/syncbridge.py
I was going to post again last night about how the PriorityQueue.get() would never block once something had been put() into it, but I see you've fixed that bug by clearing the event semaphore. Unfortunately, the code now has a race condition.

If the SynchronousQueue._workOnTasks thread is pre-empted in PriorityQueue.get() between "if self.empty():" and "self.event.clear()", and another thread calls PriorityQueue.put(), then the event semaphore set during put() will be cleared when get() continues.

OK, that was horrible to write so here's a picture instead ;-) ...

    Thread 1                               Thread 2

    # Calls get()
    self.event.wait()
    result = heapq.heappop(self.list)
    if self.empty():

    <------------------ Thread 2 preempts Thread 1 ---------------------->

                                           # Calls put()
                                           heapq.heappush(self.list, item)
                                           self.event.set()

    <---------------------- Thread 1 continues -------------------------->

    self.event.clear()

PriorityQueue should probably be using a Condition to protect access to the heapq list *and* wait for something to be posted to it. See <http://docs.python.org/lib/condition-objects.html>.

Cheers, Matt
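[Editor's note: the lost-wakeup interleaving above can be reproduced deterministically by running get()'s steps by hand from a single thread. The class below is a reconstruction of the Event-based pattern under discussion, not the actual syncbridge source.]

```python
import heapq
import threading

class EventPriorityQueue:
    """Reconstruction of the Event-based queue being criticized: a
    missed-wakeup race lurks between empty() and event.clear() in get()."""

    def __init__(self):
        self.list = []
        self.event = threading.Event()

    def empty(self):
        return not self.list

    def put(self, item):
        heapq.heappush(self.list, item)
        self.event.set()

    def get(self):
        self.event.wait()
        result = heapq.heappop(self.list)
        if self.empty():
            # RACE: if another thread's put() runs here, its set() is
            # wiped out by the clear() below and the wakeup is lost.
            self.event.clear()
        return result

# Force the diagram's interleaving by executing get()'s steps manually:
q = EventPriorityQueue()
q.put(1)
q.event.wait()                    # get() begins...
result = heapq.heappop(q.list)    # ...pops the only item...
assert q.empty()                  # ...and reaches "if self.empty():"
q.put(2)                          # Thread 2 preempts: push + event.set()
q.event.clear()                   # Thread 1 resumes: the set() is lost
# The queue holds item 2, yet the event is cleared, so the next
# get() would block forever on event.wait().
```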

Matt Goodall wrote:
Ed Suominen wrote:
Well of course, Glyph's "interesting module" comment was just enough of a table scrap to get me running, tail wagging furiously. The result (unit testing in progress) is a full-fledged SynchronousTasks object that runs a priority queue of synchronous tasks with niceness scheduling. See
* http://foss.eepatents.com/sAsync/browser/trunk/sasync/syncbridge.py
* http://foss.eepatents.com/sAsync/browser/trunk/test/syncbridge.py
I was going to post again last night about how the PriorityQueue.get() would never block once something had been put() into it, but I see you've fixed that bug by clearing the event semaphore. Unfortunately, the code now has a race condition.
If the SynchronousQueue._workOnTasks thread is pre-empted in PriorityQueue.get() between "if self.empty():" and "self.event.clear()", and another thread calls PriorityQueue.put() then the event semaphore set during put() will be cleared when get() continues.
OK, that was horrible to write so here's a picture instead ;-) ...
    Thread 1                               Thread 2

    # Calls get()
    self.event.wait()
    result = heapq.heappop(self.list)
    if self.empty():

    <------------------ Thread 2 preempts Thread 1 ---------------------->

                                           # Calls put()
                                           heapq.heappush(self.list, item)
                                           self.event.set()

    <---------------------- Thread 1 continues -------------------------->

    self.event.clear()
PriorityQueue should probably be using a Condition to protect access to the heapq list *and* wait for something to be posted to it. See <http://docs.python.org/lib/condition-objects.html>.
I notice you've updated syncbridge to use a Condition now. Looks better to me, although there's another bug and an improvement suggestion.

First up is that it should be "while self.empty(): self.cv.wait()". Whenever something is put on the queue, notify() is always called to wake a consumer thread. However, there may already be a consumer thread tearing around a loop taking items from the queue and, critically, never waiting until the queue is empty. By the time the newly woken consumer thread actually calls heappop to take an item, the existing consumer thread may have emptied the queue.

The improvement suggestion is to put the Condition's release() in a finally block to ensure it actually happens. (Why isn't the Condition example in the documentation written expecting exceptions?)

Anyway, this all makes get() and put() look something like:

    def get(self):
        self.cv.acquire()
        try:
            while self.empty():
                self.cv.wait()
            return heapq.heappop(self.list)
        finally:
            self.cv.release()

    def put(self, item):
        self.cv.acquire()
        try:
            heapq.heappush(self.list, item)
            self.cv.notify()
        finally:
            self.cv.release()

After that, I think the queue implementation is ok. And some people insist that threading is easy ;-).

- Matt
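[Editor's note: assembled into a self-contained class for reference -- the get()/put() bodies follow Matt's shape above, while the surrounding __init__ and empty() are assumed from the thread's naming and are not the actual syncbridge source.]

```python
import heapq
import threading

class PriorityQueue:
    """Condition-protected heap: get() blocks while empty, put() wakes one waiter."""

    def __init__(self):
        self.list = []
        self.cv = threading.Condition()

    def empty(self):
        return not self.list

    def get(self):
        self.cv.acquire()
        try:
            while self.empty():   # re-check: another consumer may empty the
                self.cv.wait()    # queue between notify() and our wakeup
            return heapq.heappop(self.list)
        finally:
            self.cv.release()

    def put(self, item):
        self.cv.acquire()
        try:
            heapq.heappush(self.list, item)
            self.cv.notify()
        finally:
            self.cv.release()

# Quick cross-thread check: items come back in priority order.
q = PriorityQueue()
for item in (5, 1, 3):
    q.put(item)
results = []
t = threading.Thread(target=lambda: results.extend(q.get() for _ in range(3)))
t.start()
t.join()
# results is [1, 3, 5]
```

The `while` (rather than `if`) around cv.wait() is exactly the fix Matt describes: a woken consumer must re-verify the queue is non-empty before popping.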

On May 31, 2006, at 7:10 AM, Matt Goodall wrote:
After that, I think the queue implementation is ok.
And some people insist that threading is easy ;-).
Alternatively you could take advantage of Python's built-in thread-safe Queue.Queue. Simply extend it and override the private methods (_put, _get, etc.)

-- Philip Jenvey

Philip Jenvey wrote:
On May 31, 2006, at 7:10 AM, Matt Goodall wrote:
After that, I think the queue implementation is ok.
And some people insist that threading is easy ;-).
Alternatively you could take advantage of Python's built-in thread-safe Queue.Queue. Simply extend it and override the private methods (_put, _get, etc.)
Good idea. The usual comment about not overriding _ methods applies, but it looks safe in this case because Queue explicitly documents the _ methods that are safe to override.

- Matt
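[Editor's note: a minimal sketch of Philip's suggestion. The stdlib Queue's documented hooks are _init, _put, and _get; the inherited public put()/get() supply all the locking. In modern Python the module is spelled `queue`, and later versions ship a `queue.PriorityQueue` built in exactly this way.]

```python
import heapq
import queue   # "Queue" in Python 2, "queue" in Python 3

class HeapQueue(queue.Queue):
    """Priority queue on top of the stdlib's thread-safe Queue, overriding
    only the documented _init/_put/_get hooks; locking is inherited."""

    def _init(self, maxsize):
        self.queue = []               # replace the default deque with a heap

    def _put(self, item):
        heapq.heappush(self.queue, item)

    def _get(self):
        return heapq.heappop(self.queue)

q = HeapQueue()
for item in (4, 2, 9):
    q.put(item)
# q.get() -> 2, then 4, then 9
```

All the Event/Condition subtleties from earlier in the thread disappear, because Queue's own mutex and not_empty condition handle blocking and wakeups.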
participants (3)
- Ed Suominen
- Matt Goodall
- Philip Jenvey