[Twisted-Python] Idiom for database access within iterator/generator?

Greetings twisted gurus!

I'm hoping someone with a better algorithmic brain than I could offer some advice on a good way to do something in a twisty/pythonic way.

The pseudocode for what I want to do is:

    for item in generator_that_fetches_rows_from_database:
        ...
        do_stuff()
        ...

where generator_that_fetches_rows_from_database is an object that uses enterprise.adbapi (or anything else) to fetch rows from a database and return them to the for..in.. loop.

Now, I'm probably looking at this from too much of a blocking code perspective, rather than setting up a Deferred() chain of some sort, but rather than hacking something functional but ugly, it occurs to me that someone is bound to have done this before.

It would be nice if the database accessing object could fetch rows as required, via cursors or whatever, rather than sucking an entire table into memory and then doling out one row at a time... which is what I've got working at the moment.

The morbidly curious can recoil in horror at the current code in the SQLIterator class here:

http://modipy.seafelt.com/browser/trunk/lib/modipy/iterator.py

Any free clues?

-- 
Justin Warren <daedalus@eigenmagic.com>
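For reference, the "fetch rows as required" part can be sketched in plain blocking DB-API terms, leaving Twisted out entirely. This is only an illustration: iter_rows, demo, the items table and the batch size are made-up names and values, and sqlite3 stands in for whatever driver is really in use. Whether this saves memory depends on the driver, since some drivers buffer the whole result set on the client unless a server-side cursor is requested.

```python
import sqlite3


def iter_rows(cursor, batch_size=100):
    """Yield rows one at a time, pulling batch_size rows per fetch call."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            return  # result set exhausted
        for row in batch:
            yield row


def demo():
    # Hypothetical table and data, used only to exercise iter_rows().
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany(
        "INSERT INTO items (name) VALUES (?)",
        [("item%d" % i,) for i in range(250)],
    )
    cursor = conn.execute("SELECT id, name FROM items ORDER BY id")
    names = [name for _id, name in iter_rows(cursor, batch_size=100)]
    conn.close()
    return names
```

Every call to fetchmany() here blocks, so inside a reactor it would still have to be pushed off to a thread; the sketch only shows the batching.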

On Tuesday 03 June 2008, Justin Warren wrote:
I think decorating the method with @defer.inlineCallbacks and code like this might do the trick:

    for d in generator_that_fetches_rows_from_database:
        try:
            row = yield d
        except SomeError:
            ...
        else:
            do_stuff(row)

Depending on how you want to handle errors, you could put the "try" statement outside the loop.

The generator_that_fetches_rows_from_database does not yield actual rows, but a sequence of Deferreds, each of which delivers one row in its callback.

This is all untested, so it might contain a fatal flaw, but I hope it's useful.

Bye,
    Maarten
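The shape of that loop can be tried out without Twisted. The sketch below swaps in the standard library's asyncio: an asyncio future stands in for the Deferred, and "await" plays the role that "yield" plays under inlineCallbacks. It is not adbapi code; row_futures, collect_names and the items table are hypothetical names. One detail it makes visible is that the generator cannot know in advance how many rows there are, so the consumer needs an end-of-rows signal, here the None that fetchone() returns.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def row_futures(loop, executor, cursor):
    """Yield one future per fetch; each resolves to a row, or None at the end."""
    while True:
        # The fetch is scheduled only when the consumer asks for the next
        # future, so at most one row is in flight at any time.
        yield loop.run_in_executor(executor, cursor.fetchone)


async def collect_names(names_to_store):
    loop = asyncio.get_running_loop()
    # A single worker keeps all fetches on one thread, in order.
    executor = ThreadPoolExecutor(max_workers=1)
    # check_same_thread=False lets the worker thread use this connection.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    try:
        # Setup runs inline (blocking) to keep the sketch short.
        conn.execute("CREATE TABLE items (name TEXT)")
        conn.executemany(
            "INSERT INTO items VALUES (?)", [(n,) for n in names_to_store]
        )
        cursor = conn.execute("SELECT name FROM items ORDER BY rowid")
        names = []
        for fut in row_futures(loop, executor, cursor):
            row = await fut
            if row is None:  # end-of-rows signal from fetchone()
                break
            names.append(row[0])
        return names
    finally:
        conn.close()
        executor.shutdown()
```

It can be run with asyncio.run(collect_names(["a", "b", "c"])).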

On Tue, 2008-06-03 at 17:03 +1000, Justin Warren wrote:
runInteraction is your friend:

    def getData(txn, key):
        # this runs in thread
        txn.execute("SELECT item FROM table WHERE key = :key", [key])
        return [l[0] for l in txn.fetchall()]

    def gotItems(items):
        # this runs in Twisted thread
        for item in items:
            pass  # etc...

    dbpool.runInteraction(getData, 23).addCallback(gotItems)

Once you've gotten the list down to the Twisted thread you may find twisted.internet.task.coiterate useful.

Itamar Shtull-Trauring wrote:
Sure, but that's exactly what the OP didn't want: doing a bulk fetch from the SQL (which, yes, will be non-blocking for the most part).

I think he wanted:

    def query(txn):
        txn.execute("...")
        while True:
            rs = txn.fetchone()
            if rs is None:
                break  # no more rows
            if blah:
                continue
            data = rs
            if foo:
                ...  # transform data
            yield data

    @defer.inlineCallbacks
    def sqlQuery():
        deferred_iter = runInteraction(query)
        for d in deferred_iter:
            row = yield d
            # do a thing with the row e.g. push it to an Athena page

i.e. the function that runs inside the thread pool to be able to yield values, and the function that runs in the reactor to be able to iterate through them in a deferred-compatible fashion.

AFAIK there is no such thing, which is a shame because for *really* huge queries it has the potential to significantly reduce memory usage.

On a not-very related issue, it would be highly useful IMHO for
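The thread-side half of that wish, a function in a worker thread that hands rows over one at a time while the query is still running, can be sketched with a bounded queue from the standard library. This is an illustration under assumptions, not an adbapi feature: stream_rows, demo, the table t and max_buffered are hypothetical, and sqlite3 stands in for the real driver. The consuming side here blocks in get(), so it is not reactor-safe as written; the Deferred-compatible half is not covered.

```python
import os
import queue
import sqlite3
import tempfile
import threading

_DONE = object()  # sentinel marking the end of the result set


def stream_rows(db_path, sql, params=(), max_buffered=10):
    """Run the query in a worker thread and yield rows as they arrive."""
    buf = queue.Queue(maxsize=max_buffered)

    def worker():
        try:
            conn = sqlite3.connect(db_path)
            try:
                for row in conn.execute(sql, params):
                    # put() blocks while the buffer is full, so at most
                    # max_buffered rows are held in memory at once.
                    buf.put(row)
            finally:
                conn.close()
        except Exception as exc:
            buf.put(exc)  # hand the error to the consumer
        else:
            buf.put(_DONE)

    # Caveat: if the consumer stops early, the worker stays blocked in put().
    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = buf.get()
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item


def demo():
    # Hypothetical on-disk database, created only for this demonstration.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        conn = sqlite3.connect(path)
        conn.execute("CREATE TABLE t (n INTEGER)")
        conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(50)])
        conn.commit()
        conn.close()
        return [n for (n,) in stream_rows(path, "SELECT n FROM t ORDER BY n")]
```

The bounded queue is the design choice that matters: it gives back-pressure, so a slow consumer stalls the fetch loop instead of letting the whole table pile up in memory.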

Justin Warren wrote:
This can be done, but you need a "pure" asynchronous PostgreSQL client.

Here is an implementation in Twisted:

http://hg.mperillo.ath.cx/twisted/pglib

Each row is processed using the IRowConsumer interface. To keep it simple, a callback function is called for each row received from the backend.

Using Python 2.5 generators you can (probably) achieve what you want.
[...]
Manlio Perillo

participants (6)
- Itamar Shtull-Trauring
- Jean-Paul Calderone
- Justin Warren
- Maarten ter Huurne
- Manlio Perillo
- Phil Mayers