I'm building a threaded file searcher that uses some of Fredrik Lundh's suggestions (http://effbot.org/zone/wide-finder.htm) for parsing text very quickly in pure Python, as I have a roughly 10GB log file to parse every day. A naive approach would be to parse the file in 1MB chunks, append the results to a list, and then traverse that list.
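For context, the chunking step boils down to something like the sketch below. This isn't my actual LogParsingEngine code, just the shape of the idea, assuming each chunk is extended to the next newline so no line straddles two chunks:

import os

def get_chunks(filename, chunk_size=1024 * 1024):
    """Yield (start_pos, bytes_to_read) pairs covering the file in
    ~1MB chunks, each extended to the next newline boundary."""
    size = os.path.getsize(filename)
    f = open(filename, 'rb')
    try:
        start = 0
        while start < size:
            # jump roughly one chunk ahead, then finish the current line
            f.seek(start + chunk_size)
            f.readline()
            end = min(f.tell(), size)
            yield start, end - start
            start = end
    finally:
        f.close()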
<br>I want to take this idea a bit further. I want to process results as they're being found. A great way to implement this is to use the Queue class that python provides. My idea is to exploit the iterator protocol to have it block until a result is found, if any, and return the result until we're finished parsing the file then we can raise StopIteration. <br>
<br>My idea is sort of similar to a producer / consumer, but it follows something of this idiom:<br> producer produces the file chunks<br> consumer consumes the file chunks<br> -> consumer parsers the file chunks and produces results<br>
class waits on the production of the original consumer and processes it as they come.<br><br>I am having a bit of trouble with the concurrency, but I'm using this as an exercise to understand how concurrency works from a broader scale. I am not trying to get into a debate of whether this is really needed or a python-concurrency debate:)<br>
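In miniature, with plain integers standing in for file chunks, the pipeline looks like this (a toy sketch, not the real class):

import threading
import Queue

chunks, results = Queue.Queue(), Queue.Queue()

def producer():
    for i in xrange(5):        # stand-ins for file chunk offsets
        chunks.put(i)
    chunks.put(None)           # tell the consumer we're done

def consumer():
    while True:
        chunk = chunks.get()
        if chunk is None:
            results.put(None)  # propagate the end marker downstream
            break
        results.put(chunk * 2) # "parse" the chunk into a result

for target in (producer, consumer):
    t = threading.Thread(target=target)
    t.setDaemon(True)
    t.start()

while True:                    # the final stage: process results as they come
    result = results.get()
    if result is None:
        break
    print result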
<br>Without further ado, my class is as follows:<br><br>class ThreadedFileSearcher(object):<br> def __init__(self, filename, rx_pat, blocking_timeout = 10):<br> <a href="http://self.name">self.name</a> = filename<br>
self.pattern = rx_pat<br> self.blocking_timeout = blocking_timeout<br><br> #need to find a better way to do this with more threads that can return<br> #stable results (aka chunks are in order)<br>
self._thread_count = 1<br><br> #the queues<br> self._results = Queue.Queue()<br> self._queue = Queue.Queue() <br><br> #returns the get_chunk() implementation<br> self._engine = LogParsingEngine(filename)<br>
<br> #start acquiring file offsets for the file<br> #as daemon threads<br> self._initialize_worker_threads(self._prime_queue)<br><br> #experimental...should probably be some type of conditional variable<br>
self._stop_processing = False<br> <br> def __iter__(self):<br> #start the worker threads<br> self._initialize_worker_threads(self._target_worker)<br> return self.next()<br><br> def _initialize_worker_threads(self, callback):<br>
#should really use just one thread<br> for i in xrange(self._thread_count):<br> t = threading.Thread(target=callback)<br> t.setDaemon(True)<br> t.start()<br><br> def _prime_queue(self):<br>
"""put code chunk offsets on the queue"""<br> #get_chunks() just returns 1MB offsets in the file<br> for chunk in self._engine.get_chunks():<br> self._queue.put(chunk)<br>
<br> def _target_worker(self):<br> """code chunk to parse queue"""<br> #loop infinitely<br> while True:<br> try:<br> #get next chunk offset from the queue<br>
start_pos, bytes_to_read = self._queue.get(<br> timeout=self.blocking_timeout<br> )<br> except (TypeError, Queue.Empty):<br>
#None was returned from the .get() <br> #this will throw a TypeError as it tries to unpack None<br> #or the Queue was empty<br> self._stop_processing = True<br>
#exit loop<br> break<br><br> #process the cunk here<br> f = open(<a href="http://self.name">self.name</a>, 'r')<br> f.seek(start_pos)<br> #find all matching lines in the chunk<br>
for chunk in self.pattern.findall(f.read(bytes_to_read)):<br> #an non-overlapping matches of self.pattern<br> #placed on the queue as a string<br> self._results.put(chunk)<br>
f.close() <br><br> #done!<br> self._queue.task_done()<br><br> def next(self):<br> while True:<br> try:<br> #wait for the results to be put on<br> matchedlist = self._results.get(timeout=self.blocking_timeout)<br>
except Queue.Empty:<br> #if the worker thread finished<br> if self._stop_processing:<br> raise StopIteration<br> else:<br> self._results.task_done()<br>
yield matchedlist<br><br>To use the following class, I wanted to have some kind of interface like this:<br><br>regex = re.compile("-{3}Processing..-{3}") #---Processing..---<br>f = ThreadedFileSearcher("LogFile.log", regex)<br>
for line in f:
    # guaranteed to be a line that matches the regex
    # process something...
    print line

I am looking for suggestions, comments, and better ways to modify this.

One thing anyone using this class will notice is that the initial parsing is incredibly quick, but if blocking_timeout is set to 10, there will be a 10-second wait at the end while the worker threads decide whether to set the stop condition. A glaring hole follows from this: if blocking_timeout is set to something like 1 second, the worker threads may prematurely stop processing before a user even starts iterating over the results.
<br>Speaking of stop processing, should self._stop_processing be a conditional variable. Right now it's a boolean, and I think that's pretty hacky. I don't like the StopIteration stop condition, maybe someone has a better suggestion?<br>
<br>A future modification I'm looking for is to launch multiple threads that process different parts of the file (different chunks) and return their results, probably indexed by their chunk offset. Then I can iterate over that sequentially. I think that would be a trivial parallel optimization. <br>
<br>Thoughts? Comments?<br><br>Thanks very much,<br><br>Mahmoud Abdelkader<br><a href="mailto:mahmoud@linux.com">mahmoud@linux.com</a><br><a href="http://blog.mahmoudimus.com/">http://blog.mahmoudimus.com/</a><br>