Multi thread reading a file
Mag Gam
magawake at gmail.com
Wed Jul 1 07:41:29 EDT 2009
Thanks for the response, Gabriel.
On Wed, Jul 1, 2009 at 12:54 AM, Gabriel
Genellina <gagsl-py2 at yahoo.com.ar> wrote:
> On Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <magawake at gmail.com> wrote:
>
>> I am very new to Python and I am in the process of converting a very
>> large compressed CSV file into another format. I was wondering if I
>> can do this with a multi-threaded approach.
>
> Does the format conversion involve a significant processing time? If not,
> the total time is dominated by the I/O time (reading and writing the file)
> so it's doubtful you gain anything from multiple threads.
The conversion does involve significant processing time for each line.
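A quick way to check where the time actually goes is to time the raw
read-and-parse loop by itself and compare it with a full run; a minimal
sketch, assuming gzip compression (the file name is made up):

import time, gzip, csv

# baseline: read and parse only, no per-line conversion;
# "data.csv.gz" is a made-up name and gzip is assumed
f = gzip.open("data.csv.gz", "rb")
start = time.time()
nrows = 0
for row in csv.reader(f):
    nrows += 1
f.close()
print("parsed %d rows in %.2f seconds" % (nrows, time.time() - start))

If this baseline already accounts for most of the wall time, the job is
I/O-bound and extra threads won't buy much.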
>
>> Here is the pseudocode I was thinking about:
>>
>> Let T = total number of lines in the file, for example 1000000 (1 million
>> lines)
>> Let B = total number of lines in a buffer, for example 10000 lines
>>
>>
>> Create a thread to read lines up to B (the buffer size)
>> Create another thread to read from B to B+B (so we have 2 threads
>> now; but since the file is zipped I have to wait until the first
>> thread is completed, unless someone knows of a clever technique)
>> Write the content of thread 1 into a numpy array
>> Write the content of thread 2 into a numpy array
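On the zipped-file point above: gzip data can only be decompressed front
to back, so a second thread cannot jump in at B+B; in practice a single
sequential reader has to walk the file and hand each row to the other
threads. A minimal sketch, assuming gzip compression and a made-up file
name (process would be something like in_queue.put from the skeleton
further down):

import gzip, csv

def read_rows(process):
    # a single reader walks the compressed stream front to back,
    # since gzip cannot be decompressed from the middle;
    # "data.csv.gz" is a made-up name
    f = gzip.open("data.csv.gz", "rb")
    try:
        for row in csv.reader(f):
            process(row)   # e.g. process=in_queue.put
    finally:
        f.close()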
>
> Can you process each line independently? Is the record order important? If
> not (or at least, if some local disordering is acceptable) you may use a few
> worker threads (doing the conversion), feed them through a Queue object, put
> the converted lines into another Queue, and let another thread write the
> results to the destination file.
Yes, each line can be processed independently. The original file is a time
series which I am placing into a NumPy array, so I don't think the record
order is important. The writing is actually done when I place a value into
a "dset" object.
Let me show you what I mean.
import csv

# bufsize, t (a bufsize-row buffer) and dset (the destination array)
# are defined elsewhere in my program
reader = csv.reader(open("100000.csv"))
for s, row in enumerate(reader):
    if s != 0 and s % bufsize == 0:
        # here is where I write the data to the data structure,
        # flushing the full buffer with a slice assignment (broadcasting)
        dset[s-bufsize:s] = t
    # 15 columns expected
    if len(row) != 15:
        break
    t[s % bufsize] = tuple(row)
# do this all the way at the end, for flushing the partial buffer
# (row s itself is still in t, hence the s+1)
if s % bufsize != 0:
    dset[(s//bufsize)*bufsize:s+1] = t[0:s % bufsize + 1]
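Following your queue suggestion, the writer thread could own this buffer
and do the dset writes itself, as a drop-in variant of the write_output
below. A rough sketch (dset, bufsize and the 15-column layout come from
my code above; the rest is made up, and rows arriving on out_queue are
assumed to be already converted to numbers):

import numpy

def write_output(out_queue, dset, bufsize, ncols=15):
    # drain converted rows from out_queue and write them into dset
    # in bufsize-row slices
    buf = numpy.empty((bufsize, ncols))
    base = 0   # position in dset where the current buffer starts
    n = 0      # number of rows currently held in buf
    while True:
        item = out_queue.get()
        if item is None:
            break
        buf[n] = item
        n += 1
        if n == bufsize:
            dset[base:base + bufsize] = buf
            base += bufsize
            n = 0
    if n:   # flush the last, partially filled buffer
        dset[base:base + n] = buf[:n]

Since this is the only thread touching dset, the slice assignments need no
locking, and rows land in dset in the order they leave the queue, which is
fine here since record order is not important.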
>
> import Queue, threading, csv
>
> def convert(in_queue, out_queue):
>     while True:
>         row = in_queue.get()
>         if row is None: break
>         # ... convert row
>         out_queue.put(converted_line)
>
> def write_output(out_queue):
>     while True:
>         line = out_queue.get()
>         if line is None: break
>         # ... write line to output file
>
> in_queue = Queue.Queue()
> out_queue = Queue.Queue()
> tlist = []
> for i in range(4):
>     t = threading.Thread(target=convert, args=(in_queue, out_queue))
>     t.start()
>     tlist.append(t)
> output_thread = threading.Thread(target=write_output, args=(out_queue,))
> output_thread.start()
>
> with open("...") as csvfile:
>     reader = csv.reader(csvfile, ...)
>     for row in reader:
>         in_queue.put(row)
>
> for t in tlist: in_queue.put(None)  # indicate end-of-work
> for t in tlist: t.join()            # wait until finished
> out_queue.put(None)
> output_thread.join()                # wait until finished
>
> --
> Gabriel Genellina