[Tutor] using multiprocessing efficiently to process large data file

eryksun eryksun at gmail.com
Sun Sep 2 07:48:31 CEST 2012


On Sat, Sep 1, 2012 at 9:14 AM, Wayne Werner <wayne at waynewerner.com> wrote:
>
> with open('inputfile') as f:
>     for line1, line2, line3, line4 in zip(f,f,f,f):
>         # do your processing here

Use itertools.izip_longest (zip_longest in 3.x) for this. Items in the
final batch are set to fillvalue (defaults to None) if the iterator
has reached the end of the file.

Below I've included a template that uses a multiprocessing.Pool, but
only if there are cores available. On a single-core system it falls
back to using itertools.imap (use built-in map in 3.x).

    from multiprocessing import Pool, cpu_count
    from itertools import izip_longest, imap

    FILE_IN = '...'
    FILE_OUT = '...'

    NLINES = 1000000 # estimate this for a good chunk_size
    BATCH_SIZE = 8

    def func(batch):
        """ test func """
        import os, time
        time.sleep(0.001)
        return "%d: %s\n" % (os.getpid(), repr(batch))

    if __name__ == '__main__': # <-- required for Windows

        file_in, file_out = open(FILE_IN), open(FILE_OUT, 'w')
        nworkers = cpu_count() - 1

        with file_in, file_out:
            batches = izip_longest(* [file_in] * BATCH_SIZE)
            if nworkers > 0:
                pool = Pool(nworkers)
                chunk_size = NLINES // BATCH_SIZE // nworkers
                result = pool.imap(func, batches, chunk_size)
            else:
                result = imap(func, batches)
            file_out.writelines(result)


More information about the Tutor mailing list