Multi thread reading a file
Gabriel Genellina
gagsl-py2 at yahoo.com.ar
Wed Jul 1 00:54:45 EDT 2009
En Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <magawake at gmail.com> escribió:
> I am very new to python and I am in the process of loading a very
> large compressed csv file into another format. I was wondering if I
> can do this in a multi thread approach.
Does the format conversion involve a significant processing time? If not,
the total time is dominated by the I/O time (reading and writing the file)
so it's doubtful you gain anything from multiple threads.
> Here is the pseudo code I was thinking about:
>
> Let T = Total number of lines in a file, Example 1000000 (1 million
> files)
> Let B = Total number of lines in a buffer, for example 10000 lines
>
>
> Create a thread to read until buffer
> Create another thread to read buffer+buffer ( So we have 2 threads
> now. But since the file is zipped I have to wait until the first
> thread is completed. Unless someone knows of a clever technique.
> Write the content of thread 1 into a numpy array
> Write the content of thread 2 into a numpy array
Can you process each line independently? Is the record order important? If
not (or at least, some local dis-ordering is acceptable) you may use a few
worker threads (doing the conversion), feed them thru a Queue object, put
the converted lines into another Queue, and let another thread write the
results onto the destination file.
import Queue, threading, csv
def convert(in_queue, out_queue):
while True:
row = in_queue.get()
if row is None: break
# ... convert row
out_queue.put(converted_line)
def write_output(out_queue):
while True:
line = out_queue.get()
if line is None: break
# ... write line to output file
in_queue = Queue.Queue()
out_queue = Queue.Queue()
tlist = []
for i in range(4):
t = threading.Thread(target=convert, args=(in_queue, out_queue))
t.start()
tlist.append(t)
output_thread = threading.Thread(target=write_output, args=(out_queue,))
output_thread.start()
with open("...") as csvfile:
reader = csv.reader(csvfile, ...)
for row in reader:
in_queue.put(row)
for t in tlist: in_queue.put(None) # indicate end-of-work
for t in tlist: t.join() # wait until finished
out_queue.put(None)
output_thread.join() # wait until finished
--
Gabriel Genellina
More information about the Python-list
mailing list