[Python-es] Problema con Queue.put

Chema Cortes pych3m4 en gmail.com
Mie Mayo 9 03:32:59 CEST 2012


EL 2012/5/8 Daπid <davidmenhur en gmail.com>:
> Esto es lo que tengo ahora:
>
> http://pastebin.com/pa2dtNuN
>
> Y esto lo que obtengo:
>
> Started!
> Standard map:
> 1 , 2
> ---
> 2 , 4
> ---
>
> Parallel map
> ==Saving==
> [1, 2]
> Process Process-3:
> Traceback (most recent call last):
>  File "C:\Python26\lib\multiprocessing\process.py", line 232, in _bootstrap
>    self.run()
>  File "C:\Python26\lib\multiprocessing\process.py", line 88, in run
>    self._target(*self._args, **self._kwargs)
>  File "F:\Mis documentos\eclipse\Research\Networkx\src\Statistics\Mod_infty_II-b\Parallel\paralell_test.py",
> line 35, in saving
>    savefile.write(item[0])
> ValueError: I/O operation on closed file
> 2 , 4
> 1 , 2
> 1
> End!
>
> Ocurre lo siguiente:
>
> - El map normal funciona, y es capaz de poner objetos en la cola sin problemas.
> - Desde el map de multithreading los hilos se atascan al llegar al
> q.put (no llegan a ponerlo en la cola), pero sin embargo, el pool.join
> se desbloquea.
> - El proceso de guardado no es capaz de acceder al fichero, al aparecer cerrado.
>
> ¿Alguna idea?

Creo que tienes un problema de tiempos, que no dejas que terminen los
procesos por sí sólos. Además de ésto, no te fíes del 'print' para
comprobar los resultados de los procesos puesto que el módulo
'multiprocessing' cachea la salida y que sólo muestra cuando el buffer
se llena o se invoca explícitamente el 'sys.stdout.flush()'

El guión de lo que sucese podría ser el siguiente:

LLenado del queue de datos:

        print 'Standard map:'
        map(calculate, pars)
        print

Creación del pool:

        print 'Parallel map'
        pool = Pool(processes=min(ncpu, len(pars)))
        pool.map_async(calculate, pars, chunksize=1)
        pool.close()

Invocación del proceso "saving":

        k=Process(target=saving, args=(q,savefile,))
        k.start()

Prácticamente se ejecuta al instante con los datos del queue guardados
con anterioridad a la creación del pool. En el proceso "saving" se
produce una excepción al quedar vacia la queue, imprimiendo un "#"
(que no ves por el cacheo del stdout comentado) y queda en espera
(time.sleep(0.5)).

Ejecución de los procesos del pool:

        pool.join()

Prácticamente se ejecuta al instante, como el caso del proceso
"saving". Se llena el queue de nuevo, pero el proceso "saving" sigue a
la espera durante un rato (time.sleep(0.5))

Finalización del queue:

        print q.qsize()
        q.close()
        q.join_thread()

Aquí, el proceso principal espera hasta que el queue se vacíe del
todo. Simultánemente al momento en que el proceso "saving" obtenga el
último dato de la queue, el proceso principal ejecuta lo siguiente:

        k.terminate()
        pool.terminate()

O sea, el proceso "saving" prácticamente se ve interrumpido en el
mismo momento que extrae el último dato de la queue, que en el caso de
windows se nota por un fallo general de las I/O.


SOLUCIÓN: emplea JoinableQueue . En este tipo de queue hay que
emparejar cada get() con un task_done(), un modo de señalizar que el
procesado del dato ha acabado.

 def saving(q, savefile):
    print '==Saving=='
    while True:
        item=q.get()
        print item
        print >>savefile, item[0],',',item[1],'\r\n'
        q.task_done()

if __name__=="__main__":
    print 'Started!'

    q=JoinableQueue()

....

    q.close()
    q.join()



-- 
Hyperreals *R: http://ch3m4.org/blog
Quarks, bits y otras criaturas infinitesimales


Más información sobre la lista de distribución Python-es