new to python network programming is async_chat.push thread-safe? python3.0

davy zhang davyzhang at gmail.com
Thu Oct 23 23:03:11 EDT 2008


I wrote this server to handle incoming messages in a process using
multiprocessing named "handler", and sending message in a Thread named
"sender",  'cause I think the async_chat object can not pass between
processes.

My project is a network gate server with many complex logic handler
behind, so I use multiprocessing to handle them separately and send
back the clients later when done.
To use the server multicore cpu I tried to separate the send and
receive function in different process but it seems can not be done :)

I just get questions about this design:
1. is async_chat.push thread-safe? 'Cause I found random errors
reporting push fifo queue out of index 0 sometimes
2. is the whole design odd in any way?


here is my code

import asyncore, asynchat
import os, socket, string
from multiprocessing import Process,Manager
import pickle
import _thread

PORT = 80

policyRequest = b"<policy-file-request/>"
policyReturn = b"""<cross-domain-policy>
             <allow-access-from  domain="*"  to-ports="*"  />
             </cross-domain-policy> \x00"""

def handler(taskList,msgList):
    while 1:
        print('getting task')
        item = pickle.loads(taskList.get())
        print('item before handle ', item)
        item['msg'] += b' hanlded done'
        msgList.put(pickle.dumps(item))

def findClient(id):
    for item in clients:
        if item.idx == id:
            return item

def sender():
    global msgList
    while 1:
        item = pickle.loads(msgList.get())
        #print time()
        c = findClient(item['cid'])
        #print time()
        c.push(item['msg'])
        print('msg sent ',item['msg'])
        #print time()

class HTTPChannel(asynchat.async_chat):

    def __init__(self, server, sock, addr):
        global cid;
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator(b"\x00")
        self.data = b""
        cid += 1
        self.idx = cid
        if not self in clients:
            clients.append(self)

    def collect_incoming_data(self, data):
        self.data = self.data + data
        print(data)

    def found_terminator(self):
        global taskList
        print("found",self.data)
        if self.data == policyRequest:
            self.push(policyReturn)
        else:
            d = {'cid':self.idx,'msg':self.data}
            taskList.put(pickle.dumps(d))
        self.data = b""

    def handle_close(self):
        if self in clients:
            clients.remove(self)

class HTTPServer(asyncore.dispatcher):

    def __init__(self, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(("", port))
        self.listen(5)

    def handle_accept(self):
        conn, addr = self.accept()
        HTTPChannel(self, conn, addr)


#
# try it out
if __name__ == "__main__":
    s = HTTPServer(PORT)
    print ("serving at port", PORT, "...")

    #clients sock obj list stored for further use
    clients=[]

    #client id auto increasement
    cid = 0

    manager = Manager()
    taskList = manager.Queue()
    msgList = manager.Queue()


    h = Process(target=handler,args=(taskList,msgList))
    h.start()


    _thread.start_new_thread(sender,())
    print('entering loop')

    asyncore.loop()



More information about the Python-list mailing list