[Python-ideas] sharedmutable locking solution

Aaron Brady castironpi at comcast.net
Sun Jan 13 02:20:49 CET 2008

Practical concurrency solution attached and reproduced below.  Includes a worked
example.  Brainstorm notes attached for reference.


1.  Put a single thread in charge
2.  Lock mutation and/or all operations
3.  @mutation decorator
4.  List-only solution
5.  @onfulfill decorator
6.  Ideal solution (implemented)
      Timing:     Important   Unimportant
      Return value:
      Ignore      (i.)        (iii.)
      Use         (i.)        (ii.)

from __future__ import with_statement
import threading
import thread
from Queue import Queue
class Ref:
    '''Mutable one-slot box.

    Lets code rebind a value that it can only reach indirectly (for
    example from inside a closure): share the box and assign to its
    'val' attribute.'''

    def __init__( self, val= None ):
        # 'val' is the single public slot; it defaults to None.
        self.val= val

    def __repr__( self ):
        # The box is transparent when displayed: show the held value.
        held= self.val
        return repr( held )

class LockerItem(object):
    """Abstract base for the callables a Locker keeps in its queue.

    The Locker dispatcher thread calls each queued item with a single
    argument, ``farevent``: the event on which the dispatcher can be made
    to wait.
    """

    def __call__(self, farevent):
        """Take this item's turn; subclasses must override.

        Raises:
            NotImplementedError: always, in the base class.
        """
        # ``NotImplemented`` is the comparison sentinel, not an exception
        # class; the exception to raise here is NotImplementedError.
        raise NotImplementedError


class LockerItemBlock(LockerItem):
    """Turn-taking Locker item.

    Calling the item sets its own event (waking whoever is blocked in
    ``wait``), then blocks the calling thread on ``farevent`` until that
    event is set again.  ``farevent`` must provide ``clear`` and ``wait``
    methods (e.g. a ``threading.Event``).
    """

    def __init__(self):
        # Set by __call__ once it is this item's turn.
        self.event = threading.Event()

    def __call__(self, farevent):
        """Grant the turn to the waiter, then block until ``farevent`` is set."""
        # Clear *before* waking the owner: if the owner were woken first it
        # could set ``farevent`` before we clear it, and the wait below
        # would then never return.
        farevent.clear()
        self.event.set()
        farevent.wait()

    def wait(self, timeout=None):
        """Block until this item has been called, or until ``timeout`` seconds."""
        self.event.wait(timeout)


class LockerItemFaF(LockerItem):
    """Fire-and-forget Locker item: calling it just calls ``func``.

    Exists so that plain operations can be queued alongside
    LockerItemBlock instances and called the same way.  The result of the
    call is kept in ``ret`` (None until the item has run).  An exception
    raised by ``func`` is not caught here and propagates to whoever
    called the item.
    """

    def __init__(self, func, *ar, **kwar):
        self.func, self.ar, self.kwar = func, ar, kwar
        self.ret = None

    def __call__(self, _):
        """Run the stored call; the event argument is ignored."""
        self.ret = self.func(*self.ar, **self.kwar)


class Locker(object):
    """Synchronization class.  Operations can honor:

        i.   Fire, block for completion (timing important: do here.)
        ii.  Fire, launch new thread with result upon completion (timing
             unimportant, return important: do something with the result
             later, in turn.)
        iii. Fire, forget, continue (timing & return ignored: just do.)

        Timing:     Important   Unimportant
        Return value:
        Ignore      (i.)        (iii.)
        Use         (i.)        (ii.)

    Corresponding instance functions are:

        i.   instance.op( operation & args )
             or
                 with instance:
                     ...code block...
        ii.  Decorator or otherwise:
                 instance.onfulfill( operation & args )( oncompletioncallable )
        iii. Fire-and-forget (faf) + options:
                 instance.faf( operation & args )

    To accomplish this, a Locker instance has a Queue of callables, and
    produces LockerItemFaF (fire-and-forget) and LockerItemBlock
    instances, which are enqueued and granted in the order received.
    acq() and op() "get the caller in line" for ownership; faf() gets
    something else in line: a LockerItemFaF instance.

    A daemon thread running '_mainth', started in __init__, takes items
    from the queue one at a time and calls each with the 'noget' event as
    its parameter.  A LockerItemBlock clears 'noget', wakes the thread
    blocked in acq(), and then keeps the dispatcher waiting on 'noget'
    until rel() sets it.  A LockerItemFaF never touches 'noget': its
    function simply runs in the dispatcher thread.

    "with instance: (suite)" is equivalent to:

        instance.acq()
        (suite)
        instance.rel()

    except that rel() is also called when the suite raises.  (suite) may
    not call acq(): the dispatcher is already waiting for this owner, so
    a second acq() from the same thread would never be granted.

    'op' can be used for simple-statement suites.

        instance.op( listA.append, None )
        instance.op( listA.reverse )

    Decorator 'instance.onfulfill' spawns a new thread, whence, when it's
    its turn, it calls its decoratee with the result of the operation as
    the only parameter.

        @instance.onfulfill( dequeA.popleft )
        def onfulfill1( result ):
            #do something with the popped value
            print '%s was popped\\n'% result,

    For something complex, take the lock with 'with' from a thread of
    your own (e.g. a function launched through a thread-forking decorator
    such as ThreadUtils.ThFork, defined elsewhere).

        def fork1():
            with instance:
                #something complex
                print listA
    """

    def __init__(self):
        self.queue = Queue()
        # Set by rel(); cleared and waited on by LockerItemBlock.__call__.
        self.noget = threading.Event()
        self.thread = th = threading.Thread(target=self._mainth)
        # Daemon, so the dispatcher never keeps the process alive.
        th.setDaemon(True)
        th.start()

    def _mainth(self):
        """Dispatcher loop: call queued items one at a time, in order."""
        while 1:
            item = self.queue.get()
            item(self.noget)

    def acq(self):
        """Get in line and block until it is the caller's turn.

        Returns:
            The LockerItemBlock that was enqueued for the caller.
        """
        item = LockerItemBlock()
        self.queue.put(item)
        item.wait()
        return item

    def rel(self):
        """Release ownership so the dispatcher can serve the next item."""
        self.noget.set()

    def op(self, func, *ar, **kwar):
        """Acquire, call ``func(*ar, **kwar)``, release; return the result.

        Ownership is released even when ``func`` raises; the exception
        then propagates to the caller.
        """
        self.acq()
        try:
            return func(*ar, **kwar)
        finally:
            self.rel()

    def __enter__(self):
        return self.acq()

    def __exit__(self, *excs):
        # Always release; returning None lets any exception propagate.
        self.rel()

    def onfulfill(self, func, *ar, **kwar):
        """Decorator factory: run ``func`` in turn, hand its result to the decoratee.

        Decorating starts a new thread that performs
        ``self.op(func, *ar, **kwar)`` and then calls the decoratee with
        the result as its only argument.  The decorator returns the
        decoratee unchanged, so the decorated name stays usable.
        """
        def prefulfill(decoratee):
            def callee():
                decoratee(self.op(func, *ar, **kwar))
            th = threading.Thread(target=callee)
            th.start()
            return decoratee
        return prefulfill

    def faf(self, func, *ar, **kwar):
        """Fire and forget: enqueue ``func(*ar, **kwar)`` without waiting."""
        return self.fafroot(False, None, func, *ar, **kwar)

    def fafoption(self, func, *ar, **kwar):
        """Like faf(), but the enqueue itself is a blocking Queue.put."""
        return self.fafroot(True, None, func, *ar, **kwar)

    def faftimeout(self, timeout, func, *ar, **kwar):
        """Like faf(), passing ``timeout`` through to Queue.put.

        NOTE(review): the put is non-blocking (block=False), and
        Queue.put ignores ``timeout`` in that case - confirm whether a
        blocking put was intended.
        """
        return self.fafroot(False, timeout, func, *ar, **kwar)

    def fafroot(self, block, timeout, func, *ar, **kwar):
        """Enqueue a LockerItemFaF for ``func``; return the item.

        ``block`` and ``timeout`` are passed to Queue.put.  The item's
        ``ret`` attribute holds the call's result once it has run.
        """
        item = LockerItemFaF(func, *ar, **kwar)
        self.queue.put(item, block, timeout)
        return item

if __name__== '__main__':
    '''Thing to notice: fulfill1 prints a >0 value
    when thTry producers are adding integers in the
    closed interval [300,600].  By design there is a
    small chance of assertion failure, unobserved yet.'''
    import time
    import random
    counter= 0
    def simplecounter():
        global counter
        ret= counter
        counter+= 1
        time.sleep( random.uniform( 0, 0.01 ) )
        return counter
    listA= []
    lockerA= Locker()
    def thTry():
        while 1:
            with lockerA:
                ret= simplecounter()
                listA.append( ret )
                print ret,
                '''this assertion fails outside of locker with.'''
                assert all( [ listA[i]< listA[i+1] 
                    for i in range( len( listA )- 1 ) ] )
            if random.random()< 0.8 or len( listA )> 10:
                '''fire-and-forget example.  80% chance of
                removing an element (hence may fail), and 100%
                if listA has 'a lot' of elements.'''
                lockerA.faf( listA.pop, 0 )
            '''return is important on this one; must block for.'''
            ret= list( lockerA.op( reversed, listA ) )
            if len( ret )> 1:
                assert all( [ ret[i]> ret[i+1] 
                    for i in range( len( ret )- 1 ) ] )
            if random.random()< .05:
                '''return is important, but timing is not.'''
                @lockerA.onfulfill( set, listA )
                def fulfill1( result ):
                    count= 0
                    for si in result:
                        if 300<= si<= 600:
                            count+= 1
                    print "\n\t%i counts in [300,600]\n"% count,
    def thInterfere():
        while 1:
            with lockerA:
                '''remove a multiple of 2 from somewhere.'''
                ret= None
                for i in range( len( listA ) ):
                    if listA[ i ]% 2== 0:
                        ret= listA.pop( i )
            if ret is not None:
                assert ret% 2== 0
                print '\n\tremoved %i\n'% ret,
            time.sleep( 0.5 )
    def thMon():
        while 1:
            print '\n\t%s\n'% listA,
            time.sleep( 0.5 )
    thread.start_new_thread( thMon, () )
    thread.start_new_thread( thInterfere, () )
    for _ in range( 10 ):
        '''start a bunch of producer threads.'''
        thread.start_new_thread( thTry, () )
        time.sleep( 0.1 )
    '''and wait.'''
    time.sleep( 1000 )
-------------- next part --------------
A non-text attachment was scrubbed...
Name: sharedmutable.zip
Type: application/octet-stream
Size: 3833 bytes
Desc: not available
URL: <http://mail.python.org/pipermail/python-ideas/attachments/20080112/60509819/attachment.obj>

More information about the Python-ideas mailing list