[Tutor] multiprocessing question

Albert-Jan Roskam fomcl at yahoo.com
Thu Nov 27 22:01:46 CET 2014





----- Original Message -----
> From: Cameron Simpson <cs at zip.com.au>
> To: Python Mailing List <tutor at python.org>
> Cc: 
> Sent: Monday, November 24, 2014 11:16 PM
> Subject: Re: [Tutor] multiprocessing question
> 
> On 24Nov2014 12:56, Albert-Jan Roskam <fomcl at yahoo.com> wrote:
>> > From: Cameron Simpson <cs at zip.com.au>
>>> On 23Nov2014 22:30, Albert-Jan Roskam <fomcl at yahoo.com.dmarc.invalid> wrote:
>>>> I created some code to get records from a potentially giant .csv file. This implements a __getitem__ method that gets records from a memory-mapped csv file. In order for this to work, I need to build a lookup table that maps line numbers to line starts/ends. This works, BUT building the lookup table could be time-consuming (and it freezes up the app). [...]
>>> 
>>> First up, multiprocessing is not what you want. You want threading for this.
>>>
>>> The reason is that your row index makes an in-memory index. If you do this in a subprocess (mp.Process) then the in-memory index is in a different process, and not accessible.
>> 
>> Hi Cameron, thanks for helping me. I read this page before I decided to go for multiprocessing: http://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python. I never *really* understood why CPython (with the GIL) could have threading anyway. I am confused: I thought the idea of multiprocessing.Manager was to share information.
> 
> Regarding the GIL, it will prevent the raw python interpreter from using more 
> than one CPU: no two python opcodes run concurrently. However, any calls to C 
> libraries or the OS which may block release the GIL (broadly speaking). So 
> while the OS is off reading data from a hard drive or opening a network 
> connection or something, the Python interpreter is free to run opcodes for 
> other python threads. It is timesharing at the python opcode level. And if the 
> OS or a C library is off doing work with the GIL released then you get true 
> multithreading.
> 
> Most real code is not compute bound at the Python level, most of the time.  
> Whenever you block for I/O or delegate work to a library or another process, 
> your current Python Thread is stalled, allowing other Threads to run.
> 
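A quick sketch to check my understanding (my own toy example, not from the thread): time.sleep() is one of those blocking calls that releases the GIL, so two sleeping threads should finish in about one second total, not two.

import threading
import time

def work():
    time.sleep(1)  # blocking call: the GIL is released while sleeping

start = time.time()
threads = [threading.Thread(target=work) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("elapsed: %.2f seconds" % (time.time() - start))  # ~1.0, not ~2.0
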
> For myself, I use threads when algorithms naturally fall into parallel expression, or for situations like yours where some lengthy process must run but I want the main body of code to commence work before it finishes. As it happens, one of my common use cases for the latter is reading a CSV file :-)
> 
> Anywhere you want to do things in parallel, ideally I/O bound, a Thread is a 
> reasonable thing to consider. It lets you write the separate task in a nice 
> linear fashion.
> 
> With a Thread (coding errors aside) you know where you stand: the data 
> structures it works on are the very same ones used by the main program. (Of 
> course, therein lie the hazards as well.)
> 
> With multiprocessing the subprocess works on distinct data sets and (from my 
> reading) any shared data is managed by proxy objects that communicate between 
> the processes. That gets you data isolation for the subprocess, but also higher 
> latency in data access between the processes and of course the task of 
> arranging those proxy objects.
> 
> For your task I would go with a Thread.


I made a comparison between multiprocessing and threading. In the code below (it's also here: http://pastebin.com/BmbgHtVL), multiprocessing is more than 100 (yes: one hundred) times slower than threading! That is I-must-be-doing-something-wrong-ishly slow. Any idea whether I am doing something wrong? I can't believe the difference is so big.


 
from __future__ import print_function
import threading
import time
import multiprocessing as mp

try:
    xrange
except NameError:
    xrange = range  # Python 3

class ThreadOrProcess(object):

    def __init__(self, data, use_threading=True):
        self.data = data
        self.use_threading = use_threading

        if self.use_threading:
            # in-process: the worker thread fills this ordinary dict directly
            self.lookup = dict()
            self.thread = threading.Thread(target=self.threaded_create_lookup,
                                           name="lookup maker thread")
            self.thread.start()
            #self.check_progress()
            self.thread.join()

        else:
            # cross-process: every item assignment to the Manager dict is a
            # round-trip to the manager server process
            self.lookup = mp.Manager().dict()
            self.process = mp.Process(target=self.mp_create_lookup, 
                                      name="lookup maker process")
            self.process.start()
            #self.check_progress()
            self.process.join()

    def check_progress(self):
        before, after = float("nan"), float("nan")
        while before != after:
            before = len(self.lookup)
            print("%s: %d items" % (time.asctime(), before)) 
            time.sleep(0.01)
            after = len(self.lookup)

    def threaded_create_lookup(self):
        lino, record_start = 0, 0
        for line in self.data:
            if not line:
                break
            self.lookup[lino] = record_start
            lino += 1
            record_start += len(line)
        return self.lookup

    def mp_create_lookup(self):
        lino, record_start = 0, 0
        for line in self.data:
            if not line:
                break
            self.lookup[lino] = record_start
            lino += 1 
            record_start += len(line)
        return self.lookup

if __name__ == "__main__":
    import os
    import csv
    import string

    # ---- create test data ("wb" suits Python 2's csv module;
    # on Python 3 use open("testfile.csv", "w", newline=""))
    if not os.path.exists("testfile.csv"):
        with open("testfile.csv", "wb") as f:
            writer = csv.writer(f)
            record = list(string.ascii_letters)
            for i in xrange(10 ** 5):
                writer.writerow([i] + record)

    # ---- threading
    start = time.time()
    f = open("testfile.csv", "r")
    threader = ThreadOrProcess(f, True)
    print(len(dict(threader.lookup)))
    print("threading approach took: %3.3f seconds" % (time.time() - start))

    print("***********************************************")   
    # ---- multiprocessing
    start = time.time()
    f.seek(0) 
    processer = ThreadOrProcess(f, False)
    print(len(dict(processer.lookup)))
    print("mp approach took: %3.3f seconds" % (time.time() - start))



Here is the profile report when I just use the multiprocessing part:
         402381 function calls (402378 primitive calls) in 13.726 seconds

   Ordered by: cumulative time
   List reduced from 286 to 100 due to restriction <100>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.316    0.316   13.726   13.726 check_time.py:1(<module>)
        1    0.000    0.000    7.000    7.000 check_time.py:13(__init__)
        1    0.000    0.000    6.978    6.978 /usr/lib/python2.7/multiprocessing/process.py:139(join)
        2    0.000    0.000    6.978    3.489 /usr/lib/python2.7/multiprocessing/forking.py:130(poll)
        1    0.000    0.000    6.978    6.978 /usr/lib/python2.7/multiprocessing/forking.py:146(wait)
        2    6.978    3.489    6.978    3.489 {posix.waitpid}
   100000    0.163    0.000    6.380    0.000 <string>:1(__getitem__)
   100001    0.566    0.000    6.243    0.000 /usr/lib/python2.7/multiprocessing/managers.py:746(_callmethod)
   100006    4.490    0.000    4.490    0.000 {method 'recv' of '_multiprocessing.Connection' objects}
   100005    1.194    0.000    1.194    0.000 {method 'send' of '_multiprocessing.Connection' objects}
        1    0.000    0.000    0.026    0.026 <string>:1(keys)
        1    0.000    0.000    0.017    0.017 /usr/lib/python2.7/multiprocessing/__init__.py:90(Manager)
        1    0.000    0.000    0.008    0.008 /usr/lib/python2.7/multiprocessing/managers.py:504(start)
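
The numbers seem to say where the time goes: each self.lookup[lino] = record_start on the Manager dict is a separate round-trip to the manager process (100001 _callmethod calls, 100006 recv's). If that is right, a variant like this sketch (hypothetical, not my pastebin code) should avoid the per-record traffic by building a plain dict inside the worker and shipping it back in one message:

import multiprocessing as mp

def build_lookup(path, result_queue):
    # ordinary dict, local to the worker: no proxy traffic per record
    lookup, lino, record_start = {}, 0, 0
    with open(path, "r") as f:
        for line in f:
            lookup[lino] = record_start
            lino += 1
            record_start += len(line)
    result_queue.put(lookup)  # one big transfer instead of 100000 small ones

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=build_lookup, args=("testfile.csv", q))
    p.start()
    lookup = q.get()  # fetch before join(); a full pipe can deadlock join()
    p.join()
    print(len(lookup))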




>>> when needed. But how do I know when I should do this if I don't yet know the total number of records?" Make __getitem__ _block_ until self.lookup_done is True. At that point you should know how many records there are.
>>>
>>> Regarding blocking, you want a Condition object or a Lock (a Lock is simpler, and Condition is more general). Using a Lock, you would create the Lock and .acquire it. In create_lookup(), release() the Lock at the end.


I experimented a bit with both. But don't locks, conditions and so on only become really relevant when, unlike in my code, more than one thread (besides the main program, of course!) is used?


>>> In __getitem__ (or any other function dependent on completion of create_lookup), .acquire() and then .release() the Lock. That will cause it to block until the index scan is finished.
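If I read that right, the pattern would look something like this (my sketch of the Lock idea; the class name is made up, create_lookup as in my pastebin code):

import threading

class IndexedFile(object):
    def __init__(self, data):
        self.data = data
        self.lookup = {}
        self.scan_lock = threading.Lock()
        self.scan_lock.acquire()  # hold the lock until the scan is done
        threading.Thread(target=self.create_lookup).start()

    def create_lookup(self):
        lino, record_start = 0, 0
        for line in self.data:
            self.lookup[lino] = record_start
            lino += 1
            record_start += len(line)
        self.scan_lock.release()  # signal: index complete

    def __getitem__(self, lino):
        # blocks until create_lookup() has released the lock
        self.scan_lock.acquire()
        self.scan_lock.release()
        # returning the byte offset; fetching the actual record is omitted
        return self.lookup[lino]
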
>> 
>> So __getitem__ cannot be called while it is being created? But wouldn't that defeat the purpose? My PyQt program around it initially shows the first 25 records. On many occasions that's all that's needed.
> 
> That depends on the CSV and how you're using it. If __getitem__ is just "give me row number N", then all it really needs to do is check against the current count of rows read. Keep such a counter, updated by the scanning/indexing thread. If the requested row number is less than the counter, fetch it and return it. Otherwise block/wait until the counter becomes big enough. (Or throw some exception if the calling code can cope with the notion of "data not ready yet".)
> 
> If you want __getitem__ to block, you will need to arrange a way to do that. Stupid programs busy wait:
> 
>   while counter < index_value:
>     pass
> 
> Horrendous; it causes the CPU to max out _and_ gets in the way of other work, slowing everything down. The simple approach is a poll:
> 
>   while counter < index_value:
>     sleep(0.1)
> 
> This polls 10 times a second. Tuning the sleep time is a subjective call: too frequent will consume resources, too infrequent will make __getitem__ too slow to respond when the counter finally catches up.
> 
> A more elaborate but truly blocking scheme is to have some kind of request queue, where __getitem__ makes (for example) a Condition variable and queues a request for "when the counter reaches this number". When the indexer reaches that number (or finishes indexing) it wakes up the condition and __getitem__ gets on with its task. This requires extra code in your indexer to (a) keep a PriorityQueue of requests and (b) to check for the lowest one when it increments its record count. When the record count reaches the lowest request, wake up every request of that count, and then record the next request (if any) as the next "wake up" number. That is a sketch: there are complications, such as when a new request comes in lower than the current "lowest" request, and so forth.
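A simplified version of that scheme, if I understand it correctly (my sketch: a single Condition and no PriorityQueue; every waiter just re-checks the counter whenever it grows, which is less efficient but much simpler):

import threading

class RowCounter(object):
    def __init__(self):
        self.count = 0
        self.cond = threading.Condition()

    def increment(self):
        # called by the indexing thread once per record
        with self.cond:
            self.count += 1
            self.cond.notify_all()  # wake all waiters; each re-checks

    def wait_for(self, n):
        # called from __getitem__: block until at least n records are indexed
        with self.cond:
            while self.count < n:
                self.cond.wait()
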
> 
> I'd go with the 0.1s poll loop myself. It is simple and easy and will work. Use a better scheme later if needed.
> 
>>>  A remark about the create_lookup() function on pastebin: you go:
>>> 
>>>    record_start += len(line)
>>> 
>>> This presumes that a single text character on a line consumes a single byte of memory or file disc space. However, your data file is utf-8 encoded, and some characters may be more than one byte of storage. This means that your record_start values will not be useful because they are character counts, not byte counts, and you need byte counts to offset into a file if you are doing random access.
>>> 
>>> Instead, note the value of unicode_csv_data.tell() before reading each line (you will need to modify your CSV reader somewhat to do this, and maybe return both the offset and line text). That is a byte offset to be used later.
>> 
>> THANKS!! How could I not think of this... I initially started with open(), which returns bytestrings. I could convert it to bytes and then take the len()
> 
> Converting to bytes relies on that conversion being symmetric and requires you to know the conversion required. Simply noting the .tell() value before the line is read avoids all that: where am I? Read line. Return line and start position. Simple and direct.
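Noting the offset before each read would then look something like this, I suppose (a sketch; it uses readline() rather than iterating over the file, because tell() is unreliable during iteration thanks to the read-ahead buffer):

def build_byte_index(path):
    # map line number -> byte offset of the start of that line
    offsets = {}
    with open(path, "rb") as f:
        lino = 0
        while True:
            pos = f.tell()       # byte offset before the line is read
            line = f.readline()
            if not line:
                break
            offsets[lino] = pos
            lino += 1
    return offsets
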
> 
> 
> Cheers,
> Cameron Simpson <cs at zip.com.au>
> _______________________________________________
> Tutor maillist  -  Tutor at python.org
> To unsubscribe or change subscription options:
> https://mail.python.org/mailman/listinfo/tutor
> 

