cooperate and keeping data integrity in response
Sorry about the double post, but copy/paste failed me and I left out part of the code. It is now included.

I have been looking at JP Calderone's example of serving JSON with twisted.web, which seems much like what I want. One thing I am not clear about: if I get a lot of queries coming in more or less simultaneously, and I am using cooperate to allow other functions to run, will I need to guard against my data in a list being overwritten by subsequent requests?

The way I see it, the functions that read data and store it in my list are in danger of impacting each other. The response is being built cooperatively, bit by bit, to permit other functions to run, so it could happen that the next request overwrites the list where the database query result is being stored. If this is a danger, then I need to prevent it, which seems to imply that I will have to block each request and not service another until the previous one has completed. Have I got that right, am I way off target, or have I missed the obvious?

What would be good is to keep on servicing requests, so that response times stay good, but keep the data integrity as well.

The test code I am using is JP's, with some minor variations, shown below. I hope it formats correctly. Thanks for any help.

Regards

John Aherne

# Asynchronous JSON
#
# Today in #twisted.web the topic of generating large JSON responses
# in a Twisted Web server came up. The problem was that the data being
# serialized into JSON was so large that the JSON serialization process
# itself would block the web server, preventing other requests from being
# serviced.
#
# The first solution that came up was to split the web server into two
# pieces, so that the URLs which could have these JSON responses were
# served by a different process than was serving the rest. This is a
# pretty decent solution, and it also provides the benefit of using extra
# CPU cores if there are any available. In this case, it complicated
# things a little since it meant sharing a session across two processes.
# So we went looking for another approach.
#
# It turns out that the json module supports incremental serialization.
# When I saw the JSONEncoder.iterencode method, I thought it would be
# great used in combination with cooperate to create a producer. This
# would let an application serialize a large structure to JSON without
# multiple processes, threads, or unreasonably blocking the reactor.
#
# Here's the little bit of glue necessary to make things work:

import cgi

from json import JSONEncoder

from twisted.enterprise import adbapi
from twisted.internet.task import cooperate

#db = sqlite3.connect('c:\\sqlite\\test.db')
#cur = db.cursor()
dbpool = adbapi.ConnectionPool("pyodbc", "DSN=testsql", cp_reconnect=True)


class AsyncJSON(object):
    def __init__(self, value):
        self._value = value

    def beginProducing(self, consumer):
        #print 'value', self._value
        self._consumer = consumer
        self._iterable = JSONEncoder().iterencode(self._value)
        #print 'iterable', self._iterable
        self._consumer.registerProducer(self, True)
        self._task = cooperate(self._produce())
        d = self._task.whenDone()
        d.addBoth(self._unregister)
        return d

    def pauseProducing(self):
        self._task.pause()

    def resumeProducing(self):
        self._task.resume()

    def stopProducing(self):
        self._task.stop()

    def _produce(self):
        for chunk in self._iterable:
            #print 'chunk', chunk
            self._consumer.write(chunk)
            yield None

    def _unregister(self, passthrough):
        self._consumer.unregisterProducer()
        return passthrough

# By using the iterencode method, this avoids spending too much time
# generating json output at once. Instead, a little bit of the input
# will be serialized at a time, and each short resulting string is available
# from the iterator returned by iterencode.
#
# By using cooperate, the _produce generator will be iterated in a way that
# lets it cooperate with the reactor and other event sources/handlers.
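(The incremental behaviour of iterencode described above is easy to check on its own, without any Twisted machinery. This is just an illustrative sketch; the "rows" value is a made-up stand-in for a query result.)

```python
# Standalone look at JSONEncoder.iterencode's incremental output.
# "rows" is a hypothetical stand-in for a database result set.
from json import JSONEncoder, dumps

rows = {"rows": [[1, 2], [3, 4]]}
chunks = list(JSONEncoder().iterencode(rows))

# iterencode yields many short string fragments rather than one big
# string, which is why _produce() can write one chunk at a time and
# then yield control back to the reactor between writes.
assert len(chunks) > 1
assert "".join(chunks) == dumps(rows)
```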
# A few chunks of json data will be written to the consumer, then execution
# will switch away to something else, then come back and a few more will
# be written, and so on.
#
# And by using the producer/consumer interface, if the HTTP client which
# issued the request doesn't read the results as fast as they're being
# generated, the server will stop generating new output until the client
# catches up.
#
# Altogether, this provides a very cool, efficient way to generate JSON
# output.
#
# Here's an example to make it easier to see how one might use AsyncJSON
# in a resource:

from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.server import Site
from twisted.internet import reactor


def read_pcodes(pcode, request):
    """Read postcode data and premise data for a single postcode."""
    sql_mail = """select rcmplc01.substreet, rcmplc01.street,
                  rcmplc01.sublocality, rcmplc01.locality,
                  rcmplc01.town, rcmplc01.postcode, rcmplc02.data
                  from rcmplc01
                  left outer join rcmplc02
                  on rcmplc01.postcode = rcmplc02.postcode
                  where rcmplc01.postcode = ?"""
    pcode = pcode.strip().upper()

    def run():
        return dbpool.runQuery(sql_mail, (pcode,))

    d = run()
    d.addCallback(read_result, request)
    d.addErrback(read_failure, request)
    return d


def read_failure(o, request):
    print 'failure', str(o)
    request.finish()


def read_result(res, request):
    """Read result for postcode lookup and build the return list."""
    #print 'res', res
    print 'len res', len(res)
    my_list = []
    for item in res:
        my_list.append([item[0], item[1], item[2],
                        item[3], item[4], item[5]])
    d = AsyncJSON(my_list).beginProducing(request)
    d.addCallback(lambda ignored: request.finish())
    d.addErrback(got_error, request)


class PostcodeFinder(Resource):
    """Handle http POST requests for postcode lookup."""

    def render_POST(self, request):
        print 'req', request.args
        if request.args.get('pcode', None):
            pcode = cgi.escape(request.args['pcode'][0]).strip().upper()
            if (pcode[-3:][0].isdigit() and pcode[-2:].isalpha()
                    and len(pcode[:len(pcode) - 3].strip()) < 5):
                postcode2 = '%s %s' % (pcode[:len(pcode) - 3].strip(),
                                       pcode[-3:])
                d = read_pcodes(postcode2, request)
            else:
                return 'Not a VALID REQUEST'
        elif request.args.get('street', None):
            m25 = True if request.args.get('M25', None) else False
            d = read_street(cgi.escape(request.args['street'][0]).strip().upper(),
                            m25, request)
        elif request.args.get('orgname', None):
            d = read_orgname(cgi.escape(request.args['orgname'][0]).strip().upper(),
                             request)
        else:
            return 'Not a VALID REQUEST'
        return NOT_DONE_YET


root = Resource()
root.putChild("json", PostcodeFinder())
factory = Site(root)
reactor.listenTCP(8086, factory)
reactor.run()
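To make my worry concrete, here is a stripped-down, Twisted-free stand-in for read_result (fake_read_result and the sample rows are made up for illustration). Two overlapping "requests" are stepped in alternation, the way cooperate switches between tasks. Because each call builds its own local my_list, the interleaving does not mix the two result sets in this simplified form; what I would like confirmed is that the same holds for the real code.

```python
# Twisted-free sketch: two overlapping "requests" each build their own
# local list while a loop alternates between them, mimicking cooperate().
# fake_read_result and the sample rows are hypothetical.
results = []

def fake_read_result(res):
    my_list = []          # local to this call: one list per request
    for item in res:
        my_list.append(list(item))
        yield             # pretend cooperate switches to another task here
    results.append(my_list)

req1 = fake_read_result([("AB1", "street one"), ("AB2", "street two")])
req2 = fake_read_result([("XY9", "other street")])

# step the two generators in alternation, like cooperate would
for gen in (req1, req2, req1, req2, req1):
    try:
        next(gen)
    except StopIteration:
        pass

# each request's list comes out intact despite the interleaving
```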