[Tutor] using multiprocessing efficiently to process large data file
eryksun
eryksun at gmail.com
Sun Sep 2 07:48:31 CEST 2012
On Sat, Sep 1, 2012 at 9:14 AM, Wayne Werner <wayne at waynewerner.com> wrote:
>
> with open('inputfile') as f:
>     for line1, line2, line3, line4 in zip(f,f,f,f):
>         # do your processing here
Use itertools.izip_longest (zip_longest in 3.x) for this. Any missing
items in the final batch are set to fillvalue (which defaults to None)
once the iterator reaches the end of the file.
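For example, here's a small toy snippet (illustrative only, with a list
standing in for the file object) showing how the padding works:

from itertools import izip_longest

lines = ['line%d\n' % i for i in range(10)]  # stand-in for a file
it = iter(lines)

# zip(it, it, it, it) would silently drop the last 2 lines;
# izip_longest pads the final batch with the fillvalue instead.
for batch in izip_longest(*[it] * 4):
    print(batch)
# ('line0\n', 'line1\n', 'line2\n', 'line3\n')
# ('line4\n', 'line5\n', 'line6\n', 'line7\n')
# ('line8\n', 'line9\n', None, None)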
Below I've included a template that uses a multiprocessing.Pool, but
only if there's more than one core available. On a single-core system
it falls back to itertools.imap (the built-in map in 3.x).
from multiprocessing import Pool, cpu_count
from itertools import izip_longest, imap

FILE_IN = '...'
FILE_OUT = '...'

NLINES = 1000000  # estimate this for a good chunk_size
BATCH_SIZE = 8

def func(batch):
    """ test func """
    import os, time
    time.sleep(0.001)
    return "%d: %s\n" % (os.getpid(), repr(batch))

if __name__ == '__main__':  # <-- required for Windows

    file_in, file_out = open(FILE_IN), open(FILE_OUT, 'w')
    nworkers = cpu_count() - 1  # leave one core for the main process

    with file_in, file_out:
        # group the input file into tuples of BATCH_SIZE lines;
        # the final tuple is padded with None
        batches = izip_longest(*[file_in] * BATCH_SIZE)
        if nworkers > 0:
            pool = Pool(nworkers)
            # number of batches handed to each worker per task
            chunk_size = NLINES // BATCH_SIZE // nworkers
            result = pool.imap(func, batches, chunk_size)
        else:
            # single core: process lazily in the main process
            result = imap(func, batches)
        file_out.writelines(result)
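For reference, a rough sketch of the 3.x equivalents of those imports
(untested here):

from multiprocessing import Pool, cpu_count
from itertools import zip_longest   # izip_longest was renamed in 3.x
# itertools.imap is gone; the built-in map is already lazy in 3.x,
# so "result = map(func, batches)" replaces the imap fallback.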