Multi thread reading a file

Gabriel Genellina gagsl-py2 at yahoo.com.ar
Wed Jul 1 00:54:45 EDT 2009


On Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <magawake at gmail.com> wrote:

> I am very new to python and I am in the process of loading a very
> large compressed csv file into another format.  I was wondering if I
> can do this in a multi thread approach.

Does the format conversion involve significant processing time? If not,
the total time is dominated by the I/O time (reading and writing the file),
so it's doubtful you'll gain anything from using multiple threads.
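
A quick way to check is to time a sample: read (and decompress) a bunch of
rows, then time the conversion of those same rows. A rough sketch, assuming
the file is gzip-compressed (the file name and sample size below are only
placeholders):

import csv, gzip, time

N = 100000                               # sample size, adjust as needed
rows = []
start = time.time()
f = gzip.open("input.csv.gz", "rb")      # hypothetical name; gzip assumed
try:
    reader = csv.reader(f)
    for i, row in enumerate(reader):
        rows.append(row)
        if i >= N:
            break
finally:
    f.close()
read_time = time.time() - start

start = time.time()
for row in rows:
    pass                                 # ... run the actual conversion of one row here
convert_time = time.time() - start

print "read+decompress: %.2fs   convert: %.2fs" % (read_time, convert_time)

If reading dominates, extra threads won't buy you much; if the conversion
dominates, an approach like the one sketched below can help.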

> Here is the pseudo code I was thinking about:
>
> Let T = Total number of lines in a file, for example 1000000 (1 million
> lines)
> Let B = Total number of lines in a buffer, for example 10000 lines
>
>
> Create a thread to read until buffer
> Create another thread to read buffer+buffer  (so we have 2 threads
> now; but since the file is zipped I have to wait until the first
> thread is completed, unless someone knows of a clever technique.)
> Write the content of thread 1 into a numpy array
> Write the content of thread 2 into a numpy array

Can you process each line independently? Is the record order important? If
not (or at least, if some local reordering is acceptable), you may use a few
worker threads to do the conversion: feed them through a Queue object, put
the converted lines into another Queue, and let another thread write the
results to the destination file.

import Queue, threading, csv

def convert(in_queue, out_queue):
    while True:
        row = in_queue.get()
        if row is None: break            # sentinel: no more work
        # ... convert row here; the join below is only a placeholder
        converted_line = ",".join(row)
        out_queue.put(converted_line)

def write_output(out_queue):
    while True:
        line = out_queue.get()
        if line is None: break           # sentinel: all workers are done
        # ... write line to the output file

in_queue = Queue.Queue()
out_queue = Queue.Queue()
tlist = []
for i in range(4):
    t = threading.Thread(target=convert, args=(in_queue, out_queue))
    t.start()
    tlist.append(t)
output_thread = threading.Thread(target=write_output, args=(out_queue,))
output_thread.start()

with open("...") as csvfile:
    reader = csv.reader(csvfile, ...)
    for row in reader:
        in_queue.put(row)

for t in tlist: in_queue.put(None)   # one sentinel per worker: end of work
for t in tlist: t.join()             # wait until all workers have finished
out_queue.put(None)                  # tell the writer thread to stop
output_thread.join()                 # wait until the output is fully written
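
Since your file is compressed, you can decompress on the fly while feeding
the queue: just replace the reading loop above with something like this (a
sketch assuming gzip; the file name is a placeholder):

import gzip

csvfile = gzip.open("input.csv.gz", "rb")   # hypothetical name
try:
    reader = csv.reader(csvfile)
    for row in reader:
        in_queue.put(row)
finally:
    csvfile.close()

If the file is actually a .zip archive, the zipfile module gives you a
similar file-like object to pass to csv.reader. Either way the decompression
happens in the reading thread, so the workers only see plain rows.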

-- 
Gabriel Genellina



