I'm building a threaded file searcher that uses some of Fredrik Lundh's (<a href="http://effbot.org/zone/wide-finder.htm">http://effbot.org/zone/wide-finder.htm</a>) suggestions for parsing text very quickly in pure python, as I have about a 10GB log file to parse every day. A naiive approach would be to just parse the 1MB chunks, add the results into a list, and just traverse that list.<br>

<br>I want to take this idea a bit further. I want to process results as they're being found. A great way to implement this is to use the Queue class that python provides. My idea is to exploit the iterator protocol to have it block until a result is found, if any, and return the result until we're finished parsing the file then we can raise StopIteration. <br>

<br>My idea is sort of similar to a producer / consumer, but it follows something of this idiom:<br>  producer produces the file chunks<br>  consumer consumes the file chunks<br>     -> consumer parsers the file chunks and produces results<br>

  class waits on the production of the original consumer and processes it as they come.<br><br>I am having a bit of trouble with the concurrency, but I'm using this as an exercise to understand how concurrency works from a broader scale. I am not trying to get into a debate of whether this is really needed or a python-concurrency debate:)<br>

<br>Without further ado, my class is as follows:<br><br>class ThreadedFileSearcher(object):<br>    def __init__(self, filename, rx_pat, blocking_timeout = 10):<br>        <a href="http://self.name">self.name</a> = filename<br>

        self.pattern = rx_pat<br>        self.blocking_timeout = blocking_timeout<br><br>        #need to find a better way to do this with more threads that can return<br>        #stable results (aka chunks are in order)<br>

        self._thread_count = 1<br><br>        #the queues<br>        self._results = Queue.Queue()<br>        self._queue = Queue.Queue() <br><br>        #returns the get_chunk() implementation<br>        self._engine = LogParsingEngine(filename)<br>

        <br>        #start acquiring file offsets for the file<br>        #as daemon threads<br>        self._initialize_worker_threads(self._prime_queue)<br><br>        #experimental...should probably be some type of conditional variable<br>

        self._stop_processing = False<br>        <br>    def __iter__(self):<br>        #start the worker threads<br>        self._initialize_worker_threads(self._target_worker)<br>        return self.next()<br><br>    def _initialize_worker_threads(self, callback):<br>

        #should really use just one thread<br>        for i in xrange(self._thread_count):<br>            t = threading.Thread(target=callback)<br>            t.setDaemon(True)<br>            t.start()<br><br>    def _prime_queue(self):<br>

        """put code chunk offsets on the queue"""<br>        #get_chunks() just returns 1MB offsets in the file<br>        for chunk in self._engine.get_chunks():<br>            self._queue.put(chunk)<br>

<br>    def _target_worker(self):<br>        """code chunk to parse queue"""<br>        #loop infinitely<br>        while True:<br>            try:<br>                #get next chunk offset from the queue<br>

                start_pos, bytes_to_read = self._queue.get(<br>                                              timeout=self.blocking_timeout<br>                                           )<br>            except (TypeError, Queue.Empty):<br>

                #None was returned from the .get() <br>                #this will throw a TypeError as it tries to unpack None<br>                #or the Queue was empty<br>                self._stop_processing = True<br>

                #exit loop<br>                break<br><br>            #process the cunk here<br>            f = open(<a href="http://self.name">self.name</a>, 'r')<br>            f.seek(start_pos)<br>            #find all matching lines in the chunk<br>

            for chunk in self.pattern.findall(f.read(bytes_to_read)):<br>                #an non-overlapping matches of self.pattern<br>                #placed on the queue as a string<br>                self._results.put(chunk)<br>

            f.close() <br><br>            #done!<br>            self._queue.task_done()<br><br>    def next(self):<br>        while True:<br>            try:<br>                #wait for the results to be put on<br>                matchedlist = self._results.get(timeout=self.blocking_timeout)<br>

            except Queue.Empty:<br>                #if the worker thread finished<br>                if self._stop_processing:<br>                    raise StopIteration<br>            else:<br>                self._results.task_done()<br>

                yield matchedlist<br><br>To use the following class, I wanted to have some kind of interface like this:<br><br>regex = re.compile("-{3}Processing..-{3}") #---Processing..---<br>f = ThreadedFileSearcher("LogFile.log", regex)<br>

for line in f:<br>    #guaranteed to be a line that matches regex<br>    #process something...<br>    print line<br><br><br>I am looking for some suggestions, comments, and better ways to modify this. <br><br>One thing someone will realize when using this class is that the initial parsing will be incredibly quick, but if the blocking_timeout is set to 10, then there will be a 10second wait at the end to test if the worker threads should set the stop conditions. A glaring hole leading from this is if the blocking_timeout is set to something like 1second and by the time a user attempts to iterate over the results, the worker threads will prematurely stop processing.<br>

<br>Speaking of stop processing, should self._stop_processing be a conditional variable. Right now it's a boolean, and I think that's pretty hacky. I don't like the StopIteration stop condition, maybe someone has a better suggestion?<br>

<br>A future modification I'm looking for is to launch multiple threads that process different parts of the file (different chunks) and return their results, probably indexed by their chunk offset. Then I can iterate over that sequentially. I think that would be a trivial parallel optimization. <br>

<br>Thoughts? Comments?<br><br>Thanks very much,<br><br>Mahmoud Abdelkader<br><a href="mailto:mahmoud@linux.com">mahmoud@linux.com</a><br><a href="http://blog.mahmoudimus.com/">http://blog.mahmoudimus.com/</a><br>