Need advice on multithreading problem
Gerhard Häring
gerhard at bigfoot.de
Tue Jun 4 09:07:50 EDT 2002
This morning, I tried to use multithreading in Python seriously for the
first time, so I'd welcome any comments on the design of my proof of concept,
which I've pasted in full below. I hope it's not too long for Usenet, but I
didn't want to throw away my comments and docstrings.
The problem is this: I have a C library with a function that has the following
interface:
def sqlite_exec(conn, sql, callback, arg1):
The only important parameter here is 'callback', a callback function that
you pass to sqlite_exec. sqlite_exec calls it once for each result row it
produces. If the callback function returns a value other than zero,
sqlite_exec stops processing.
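To make this callback contract concrete, here is a small Python 3 sketch. `exec_rows` is a hypothetical stand-in for sqlite_exec (its 0/1 return values are an assumption, not SQLite's real status codes), and the callback aborts the loop after a fixed number of rows by returning a nonzero value.

```python
def exec_rows(rows, callback, arg):
    """Hypothetical stand-in for a callback-driven C API: calls
    callback(arg, row) for each row and stops as soon as the callback
    returns a nonzero value."""
    for row in rows:
        if callback(arg, row) != 0:
            return 1  # aborted by the callback (assumed status code)
    return 0          # ran to completion (assumed status code)


def make_limited_collector(limit):
    """Return (callback, collected): the callback appends each row to
    `collected` and asks the caller to stop once `limit` rows are stored."""
    collected = []

    def callback(arg, row):
        collected.append(row)
        return 1 if len(collected) >= limit else 0

    return callback, collected


callback, collected = make_limited_collector(3)
status = exec_rows([["a", "b", i] for i in range(15)], callback, None)
print(status, collected)  # 1 [['a', 'b', 0], ['a', 'b', 1], ['a', 'b', 2]]
```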
On the Python side, I wanted to turn this callback-driven C library call into
a generator. I did this by spawning a new thread for each sqlite_exec call and
passing the result rows back to the caller through a Queue.Queue.
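The thread-plus-queue idea can be sketched generically in Python 3 (where the module is `queue` rather than `Queue`). This is only a sketch under assumptions: `callback_to_generator`, `fake_exec` and `_DONE` are hypothetical names, `exec_func(callback)` is assumed to follow the sqlite_exec contract above, and a `threading.Event` plays the role of a stop flag. The generator's `finally` clause handles cancellation: it keeps draining the queue until the single end marker arrives, so a producer blocked in `put()` can always finish.

```python
import queue
import threading

_DONE = object()  # sentinel: the producer thread has finished


def callback_to_generator(exec_func, maxsize=10):
    """Adapt a callback-driven function into a generator of rows.

    exec_func(callback) is assumed to call callback(row) once per row and
    to stop early when the callback returns nonzero. At most `maxsize`
    rows are buffered between the producer thread and the consumer.
    """
    q = queue.Queue(maxsize)
    stop = threading.Event()

    def callback(row):
        if stop.is_set():
            return 1      # consumer is gone: ask exec_func to abort
        q.put(row)        # blocks while the buffer is full
        return 0

    def run():
        try:
            exec_func(callback)
        finally:
            q.put(_DONE)  # exactly one end marker, even if exec_func raises

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    finished = False
    try:
        while True:
            item = q.get()
            if item is _DONE:
                finished = True
                return
            yield item
    finally:
        # Runs on exhaustion and on close()/garbage collection of the
        # generator. Drain until the end marker arrives, so a producer
        # blocked in q.put() can wake up, see the stop flag and exit.
        stop.set()
        while not finished:
            finished = q.get() is _DONE
        worker.join()


def fake_exec(callback):
    # Hypothetical stand-in for sqlite_exec with 15 result rows.
    for i in range(15):
        if callback(["a", "b", i]) != 0:
            break


rows = callback_to_generator(fake_exec)
print(next(rows))  # ['a', 'b', 0]
rows.close()       # stops the producer thread and waits for it
```

Because the thread is only started on the first `next()`, a generator that is never iterated never spawns a thread.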
The proof of concept *seems* to work, but there may still be ugly bugs hidden
in it. Any comments are welcome.
Gerhard
--- begin code ---
"""
This is a proof-of-concept for the implementation of a new alternative
Cursor class that will exploit Python 2.2 generators and the SQLite
sqlite_exec function with a custom callback.
The point of this exercise is to avoid keeping the entire result set in memory
and instead hand rows to the caller one at a time, on demand, using
generators.
Of course it will be slower than the current implementation. If nothing else,
it's a nice programming exercise for me to learn how to handle multithreading
reliably.
"""
from __future__ import generators
from Queue import Queue, Empty, Full
import threading
# Sentinel marking the end of a queue. When the consumer reads this object
# from the queue, it stops reading. A unique object() is used so that no real
# result row can ever be identical to it.
END_OF_QUEUE = object()

# SQLite's sqlite_exec function looks similar to this one:
def sqlite_exec(conn, sql, callback, arg1):
    # Let's assume that SQLite would return 15 results
    for i in range(15):
        # According to the docs, if the callback returns something other
        # than 0, the query will be aborted.
        if callback(arg1, ["a", "b", i]) != 0:
            break

class Producer(threading.Thread):
    """A separate producer thread that wraps sqlite_exec, provides the
    callback function for sqlite_exec and fills a queue with the result rows
    for the consumer to read from."""

    def __init__(self, queue, conn, sql, *args, **kwargs):
        # queue, conn and sql are our own arguments; everything else is
        # passed on unchanged to threading.Thread.
        threading.Thread.__init__(self, *args, **kwargs)
        self.stopflag = 0
        self.queue = queue
        self.conn = conn
        self.sql = sql

    def run(self):
        def callback(arg1, items):
            """Callback function for sqlite_exec."""
            if self.stopflag:
                # The consumer has gone away: abort the query. Don't put
                # END_OF_QUEUE here; run() puts it exactly once below.
                return 1
            self.queue.put(items, 1)
            return 0
        sqlite_exec(self.conn, self.sql, callback, None)
        self.queue.put(END_OF_QUEUE, 1)

class Cursor:
    def __init__(self):
        self.conn = None
        self.queue = None
        self.producer = None
        self.__invalidate()

    def __invalidate(self):
        """Invalidate is called when a new query is processed with execute or
        executemany, when the cursor is closed, or when the cursor object goes
        out of scope."""
        self.current_sql = None
        if self.producer is not None:
            self.producer.stopflag = 1
            # Empty the queue so that a producer blocked in queue.put() can
            # continue, see the stop flag and insert its END_OF_QUEUE without
            # blocking forever.
            try:
                while 1:
                    self.queue.get(0)
            except Empty:
                pass
            # Wait for the producer thread to finish, so that no thread is
            # left behind when the queue is replaced.
            self.producer.join()
            self.producer = None

    def execute(self, sql, *parms):
        # The query isn't actually processed at this point. It will only be
        # processed in the fetchone/fetchmany/fetchall methods, using
        # __execute.
        self.__invalidate()
        self.current_sql = sql

    def __execute(self):
        """Build a queue and start the producer thread."""
        # Keep at most 10 result rows in memory at any given point in time:
        self.queue = Queue(10)
        self.producer = Producer(conn=self.conn, sql=self.current_sql,
                                 queue=self.queue)
        self.producer.start()
        self.current_sql = None
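
    def __next_row(self):
        """Return the next row, or END_OF_QUEUE once the result set is
        exhausted. The marker is put back into the queue, so that any later
        fetch call sees it too instead of blocking forever."""
        item = self.queue.get(1)
        if item is END_OF_QUEUE:
            self.queue.put(END_OF_QUEUE, 1)
        return item

    def fetchall(self):
        if self.current_sql is not None:
            self.__execute()
        while 1:
            item = self.__next_row()
            if item is END_OF_QUEUE:
                return
            yield item

    def fetchmany(self, howmany=15):
        if self.current_sql is not None:
            self.__execute()
        for i in range(howmany):
            item = self.__next_row()
            if item is END_OF_QUEUE:
                return
            yield item

    def fetchone(self):
        if self.current_sql is not None:
            self.__execute()
        item = self.__next_row()
        if item is END_OF_QUEUE:
            return None
        return item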

    def close(self):
        self.__invalidate()

    def __del__(self):
        self.__invalidate()

c = Cursor()
c.execute("select * from foo")
print "--- fetchone() ---"
print c.fetchone()
print "--- fetchmany(5) ---"
for res in c.fetchmany(5):
    print res
print "--- fetchall() ---"
for res in c.fetchall():
    print res
c.close()
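One possible alternative for the fetch methods, sketched in Python 3: keep a single row iterator per query and let fetchone/fetchmany/fetchall all draw from it, which avoids having to re-signal the end-of-queue marker to each method. `IterCursor` is a hypothetical name, and `rows` is assumed to be any iterator of rows, such as a generator fed by a producer thread. For comparison, DB-API 2.0 (PEP 249) specifies that fetchmany() and fetchall() return sequences of rows and that fetchone() returns None when no more data is available; the sketch follows that, and leaves lazy streaming to plain iteration over the cursor.

```python
from itertools import islice


class IterCursor:
    """Hypothetical cursor sharing ONE row iterator between fetchone,
    fetchmany and fetchall. Once the rows are exhausted, every later
    fetch returns immediately with None or an empty list."""

    def __init__(self, rows):
        # Assumption: `rows` yields one row per item, e.g. a generator
        # fed by a producer thread.
        self._rows = iter(rows)

    def fetchone(self):
        # None means "no more rows", like the original fetchone().
        return next(self._rows, None)

    def fetchmany(self, size=15):
        # Buffers at most `size` rows in the returned list.
        return list(islice(self._rows, size))

    def fetchall(self):
        # Materializes everything that is left; iterate instead to stream.
        return list(self._rows)

    def __iter__(self):
        return self._rows

    def close(self):
        # Generators have close(); for a thread-backed generator this is
        # what stops the producer early.
        close = getattr(self._rows, "close", None)
        if close is not None:
            close()


cur = IterCursor(["a", "b", i] for i in range(15))
print(cur.fetchone())        # ['a', 'b', 0]
print(len(cur.fetchmany(5)))  # 5
for row in cur:              # streams the remaining 9 rows lazily
    pass
print(cur.fetchone())        # None
cur.close()
```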