collecting results in threading app
george.sakkis at gmail.com
Fri Apr 4 19:54:51 CEST 2008
On Apr 4, 11:27 am, Gerardo Herzig <gher... at fmed.uba.ar> wrote:
> John Nagle wrote:
> >Gerardo Herzig wrote:
> >>Hi all. Newbie at threads here. I'm missing some point but can't
> >>figure out which one.
> >>This little piece of code executes a 'select count(*)' over every table
> >>in a database, one thread per table:
> >>class TableCounter(threading.Thread):
> >>    def __init__(self, conn, table):
> >>        self.connection = connection.Connection(host=conn.host,
> >>            port=conn.port, user=conn.user, password='', base=conn.base)
> >>        threading.Thread.__init__(self)
> >>        self.table = table
> >>    def run(self):
> >>        result = self.connection.doQuery("select count(*) from %s" %
> >>            self.table)
> >>        print result
> >>        return result
> >>class DataChecker(metadata.Database):
> >>    def countAll(self):
> >>        for table in self.tables:
> >>            t = TableCounter(self.connection, table.name)
> >>            t.start()
> >>        return
> >>It works fine, in the sense that every run() method prints the correct
> >>result. But I would like to store the result of t.start() in, say, a
> >>list. The thing is, t.start() returns None, so what am I missing here?
> >>Is the design wrong?
> > 1. What interface to MySQL are you using? That's not MySQLdb.
> > 2. If SELECT COUNT(*) is slow, check your table definitions.
> > For MyISAM, it's a fixed-time operation, and even for InnoDB,
> > it shouldn't take that long if you have an INDEX.
> > 3. Threads don't return "results" as such; they're not functions.
> >As for the code, you need something like this:
> >class TableCounter(threading.Thread):
> >    def __init__(self, conn, table):
> >        self.result = None
> >        ...
> >    def run(self):
> >        self.result = self.connection.doQuery("select count(*) from %s" %
> >            self.table)
> >
> >def countAll(self):
> >    mythreads = []  # list of TableCounter objects
> >    # Start all threads
> >    for table in self.tables:
> >        t = TableCounter(self.connection, table.name)
> >        mythreads.append(t)  # list of counter threads
> >        t.start()
> >    # Wait for all threads to finish
> >    totalcount = 0
> >    for mythread in mythreads:         # for all threads
> >        mythread.join()                # wait for thread to finish
> >        totalcount += mythread.result  # add to result
> >    print "Total size of all tables is:", totalcount
> > John Nagle
> Thanks John, that certainly works. Following George's suggestion, I
> will take a look at the Queue module.
> One question about:
> for mythread in mythreads:  # for all threads
>     mythread.join()         # wait for thread to finish
> Will that code wait for the first count(*) to finish before continuing
> to the next one? Because if that is so, it would be some kind of
> 'use threads, but execute one at a time'. I mean, if mythreads is a
> very long list, will all the others be kept waiting?
No, all will be executed in parallel; only the main thread will be
waiting for the first thread to finish. So if only the first job is
long, as soon as it finishes and join()s, all the others will already
have finished and their join() will be instantaneous.
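To make the start-all-then-join pattern concrete, here is a minimal self-contained Python 3 sketch; a list length stands in for the count(*) query and all names are illustrative:

```python
import threading

class CountWorker(threading.Thread):
    """Stores its result on the instance, since a thread's run()
    return value is discarded."""
    def __init__(self, rows):
        super().__init__()
        self.rows = rows
        self.result = None

    def run(self):
        # stand-in for the "select count(*)" query
        self.result = len(self.rows)

def count_all(tables):
    workers = [CountWorker(rows) for rows in tables]
    for w in workers:   # start all threads first: they run concurrently
        w.start()
    total = 0
    for w in workers:   # then join each one and collect its result
        w.join()
        total += w.result
    return total

print(count_all([[1, 2], [3, 4, 5], []]))  # prints 5
```

All workers run in parallel; the joins only block the main thread, and any worker that finished earlier joins instantly.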
> Is there an approach in which I can 'sum' after *any* thread finishes?
> Could a Queue help me there?
Yes, you can push each result to a queue and have the main thread wait
in a loop, doing a queue.get() each time. After each get() you can do
whatever you want with the results so far (partial sum, update a
progress bar, etc.).
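A sketch of that queue-based approach in Python 3 (where queue.Queue replaces the old Queue module; list lengths again stand in for the queries):

```python
import queue
import threading

def count_table(name, rows, results):
    # push (table, count) onto the shared queue as soon as we are done
    results.put((name, len(rows)))

def sum_counts(tables):
    results = queue.Queue()
    for name, rows in tables.items():
        threading.Thread(target=count_table,
                         args=(name, rows, results)).start()
    total = 0
    for _ in range(len(tables)):
        name, count = results.get()  # wakes as soon as *any* table is done
        total += count               # partial sum is available right here
    return total

print(sum_counts({"a": [1, 2], "b": [3], "c": [4, 5, 6]}))  # prints 6
```

The main thread processes results in completion order, not submission order, which is exactly what lets it act after any thread finishes.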
You can take a look at papyros, a small package I wrote for hiding
the details behind a simple Pythonic API. Using papyros, your example
would look something like this:
from papyros import Job
from papyros.multithreaded import MultiThreadedMaster

# a papyros.Job subclass for each type of task you want to run
class CountJob(Job):
    def __call__(self, connection, table_name):
        return connection.doQuery("select count(*) from %s" % table_name)

sum_count = 0
# create a pool of 4 threads
master = MultiThreadedMaster(4)
# issue all the jobs (submission call reconstructed; the original
# line was lost in the archive)
for table in self.tables:
    master.addJob(CountJob(self.connection, table.name))
# get each processed job as soon as it finishes
for job in iter(master.popProcessedJob, None):
    # the job arguments are available as job.args
    table_name = job.args[1]
    try:  # try to get the result
        count = job.result
    except Exception, ex:
        # some exception was raised when executing this job
        print '* Exception raised for table %s: %s' % (table_name, ex)
    else:
        # job finished successfully
        sum_count += count
        print 'Table %s: count=%d (running total=%d)' % (
            table_name, count, sum_count)
As you can see, any exception raised in a thread is stored and
reraised on the main thread when you attempt to get the result. You
can also specify a timeout in popProcessedJob() so that the main
thread doesn't wait forever in case a job hangs.
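Without papyros, the same store-and-reraise behaviour can be sketched by hand in Python 3; the worker catches the exception and ships it back, and the main thread decides what to do with it (all names here are illustrative):

```python
import queue
import threading

def run_job(fn, args, results):
    # never let a worker die silently: ship the result *or* the exception
    try:
        results.put((args, fn(*args), None))
    except Exception as exc:
        results.put((args, None, exc))

results = queue.Queue()
jobs = [(len, ([1, 2, 3],)),   # succeeds with 3
        (len, (42,))]          # raises TypeError: len() of an int
for fn, args in jobs:
    threading.Thread(target=run_job, args=(fn, args, results)).start()

for _ in jobs:
    args, result, exc = results.get()
    if exc is not None:
        print("job%r failed: %s" % (args, exc))  # or re-raise: raise exc
    else:
        print("job%r -> %r" % (args, result))
```

Failures surface on the main thread instead of vanishing inside the worker, which is the behaviour described above.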
Last but not least, the same API is implemented both for threads and
processes (using Pyro) so it's not restricted by the GIL in case the
jobs are CPU-intensive.