[Twisted-Python] 2 questions: adbapi.ConnectionPool ; Defer.DeferredSemaphore , threads.deferToThread
a- can someone clarify adbapi.ConnectionPool for me? i'm specifically wondering if connect/disconnect returns a handle to the pool and if the pool blocks until a handle is ready. i think it probably doesn't work that way, but i feel the need to check.

b- has anyone ever used defer.DeferredSemaphore with threads.deferToThread? if so, how? all of my attempts thus far have not given the desired results.

specifically, i'm trying to have a scheduled job queue seed 1-50 'tasks' at once, each one using a separate DB connection and running in a separate thread. once they all complete, the job reschedules itself to run in the future.

i'm probably approaching this problem wrong, so feel free to tell me i'm an idiot and point me in a better direction.

the closest thing I could think of was:

1- imports= [ self.semaphore.run( threads.deferToThread ( self.import_request, queue_item ) ) for queue_item in queue_import ]
2- imports= [ threads.deferToThread( self.semaphore.run ( self.import_request, queue_item ) ) for queue_item in queue_import ]

neither of which works:

1- gives me imports all at once, as deferToThread immediately returns a deferred object
2- self.import_request blocks, as it's not in its own thread
Jonathan Vanasco wrote:
a- can someone clarify adbapi.ConnectionPool for me? i'm specifically wondering if connect/disconnect returns a handle to the pool and if the pool blocks until a handle is ready. i think it probably doesn't work that way, but i feel the need to check.
I can only answer your first question. Connect/disconnect are blocking functions and are generally only called from the connection pool itself. User code generally only needs to call runInteraction, runQuery, or runOperation, and the pool will make connections as needed.

dave
hm... i'll have to play with this. i'm trying to fork some already written scripts that block as threads... grabbing a lock on a db from the pool, then returning it when done.

i answered part b for myself. i don't know if this example below would be of use to anyone, and it's definitely poorly written. but it is both functional and an example of running a bunch of blocking processes in a finite number of threads managed by DeferredSemaphore (i could only find docs/examples for infinite threads or non-threaded DeferredSemaphore)

------------------------------------------------------------------------

from twisted.internet import defer, reactor, threads
import time

CHECK_PERIOD = 5   # seconds between runs
SIMULTANEOUS = 2   # max tasks running at once
HELP = 1           # set to 0 to silence the trace output


class ThreadedSemaphoreService(object):
    semaphore = defer.DeferredSemaphore(tokens=SIMULTANEOUS)

    def __init__(self):
        self.interval_action()

    def interval_action(self):
        if HELP: print("interval_action()")
        # this would likely be a sql select
        action_queue = range(1, 6)
        if len(action_queue):
            actions = [self.act_on(action_item) for action_item in action_queue]
            finished = defer.DeferredList(actions)
            finished.addCallback(self.re_register__interval_action)
        else:
            self.re_register__interval_action()

    def re_register__interval_action(self, deferlist=None):
        if HELP: print("re_register__interval_action()")
        reactor.callLater(CHECK_PERIOD, self.interval_action)

    def act_on(self, queue_item):
        if HELP: print("in main - act_on()")
        # DeferredSemaphore is not thread-safe, so the token is acquired here
        # in the reactor thread; it is released when the thread's Deferred fires
        return self.semaphore.run(threads.deferToThread, self._act_on, queue_item)

    def _act_on(self, queue_item):
        if HELP: print("\t thread - _act_on()")
        self.sleeper()
        self.awake()

    def sleeper(self):
        if HELP: print("\t thread - sleeper() - i want to sleep in my own thread tonight")
        time.sleep(2)

    def awake(self):
        if HELP: print("\t thread - awake() - now awake")


if __name__ == '__main__':
    myService = ThreadedSemaphoreService()
    reactor.run()
On Thu, Sep 14, 2006 at 11:31:38PM -0400, Jonathan Vanasco wrote: [...]
b- has anyone ever used defer.DeferredSemaphore with threads.deferToThread? if so, how? all of my attempts thus far have not given the desired results.
specifically, i'm trying to have a scheduled job queue seed 1-50 'tasks' at once, each one using a separate DB connection and running in a separate thread. once they all complete, the job reschedules itself to run in the future.
i'm probably approaching this problem wrong, so feel free to tell me i'm an idiot and point me in a better direction.
the closest thing I could think of was:
1- imports= [ self.semaphore.run( threads.deferToThread ( self.import_request, queue_item ) ) for queue_item in queue_import ]
2- imports= [ threads.deferToThread( self.semaphore.run ( self.import_request, queue_item ) ) for queue_item in queue_import ]
neither of which works:
1- gives me imports all at once, as deferToThread immediately returns a deferred object
2- self.import_request blocks, as it's not in its own thread
You probably want something more like:

[self.semaphore.run(threads.deferToThread, self.import_request, queue_item) for queue_item in queue_import]

-Andrew.
participants (3)
- Andrew Bennetts
- Dave Peticolas
- Jonathan Vanasco