"""Minimal Win32 serial-port access through ctypes.

The module declares the handful of kernel32 entry points needed to open,
configure, read and write a COM port (``SerialPort``) and a small helper
(``SerialPortQueue``) that moves data between the port and two queues using
background threads.

It requires ``ctypes.windll`` and therefore only imports on Windows.
"""
import ctypes
import threading
import time

try:
    import Queue
except ImportError:
    # Python 3 renamed the module; keep the old name so the rest of the file
    # is unchanged.
    import queue as Queue

#################################################
#
# NOTE: The unicode (W) APIs are used in case this code is used on WinCE in
# the future.

BYTE = ctypes.c_ubyte
CHAR = ctypes.c_char
LPVOID = ctypes.c_void_p
LPCWSTR = ctypes.c_wchar_p
WORD = ctypes.c_ushort
DWORD = ctypes.c_uint
# NOTE(review): handles are declared as 32-bit values here, and the checker
# callables used as ``restype`` below receive the result as a C int. Win32
# HANDLEs are pointer-sized; confirm this is acceptable on 64-bit Windows.
HANDLE = DWORD

INVALID_HANDLE_VALUE = -1  # 0xffffffff when viewed as an unsigned DWORD

# Access rights for CreateFile.  (The Python 2 "L" suffix is intentionally
# omitted: the values are identical and the literals parse on Python 3 too.)
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
GENERIC_EXECUTE = 0x20000000
GENERIC_ALL = 0x10000000

# Creation dispositions for CreateFile.
CREATE_NEW = 1
CREATE_ALWAYS = 2
OPEN_EXISTING = 3
OPEN_ALWAYS = 4
TRUNCATE_EXISTING = 5


def CheckHANDLE(value):
    """Return ``value`` unless it is INVALID_HANDLE_VALUE.

    Used as a ctypes ``restype`` checker; raises ``ctypes.WinError()`` (built
    from the thread's last Win32 error) when the call reported failure.
    """
    if value == INVALID_HANDLE_VALUE:
        raise ctypes.WinError()
    return value


def CheckBOOL(value):
    """Return ``value`` unless it is false.

    Used as a ctypes ``restype`` checker for BOOL-returning functions; raises
    ``ctypes.WinError()`` when the call returned zero.
    """
    if not value:
        raise ctypes.WinError()
    return value


class DCB(ctypes.Structure):
    """ctypes layout of the Win32 DCB (serial device control block).

    The C bit-fields are exposed as the single DWORD ``flags``; the FLAG_* and
    MASK_* constants below are the bit values to combine into it.
    """

    # Bits of the ``flags`` field.
    FLAG_BINARY = 0x1
    FLAG_PARITY = 0x2
    FLAG_OUTXCTSFLOW = 0x4
    FLAG_OUTXDSRFLOW = 0x8
    MASK_DTRCONTROL = 0x30
    DTR_CONTROL_ENABLE = 0x10
    FLAG_DSRSENSITIVITY = 0x40
    FLAG_TXCONTINUEONXOFF = 0x80
    FLAG_OUTX = 0x100
    FLAG_INX = 0x200
    FLAG_ERROR_CHAR = 0x400
    FLAG_NULL = 0x800
    MASK_RTS_CONTROL = 0x3000
    RTS_CONTROL_ENABLE = 0x1000

    # Values for the ``Parity`` field.
    NOPARITY = 0
    ODDPARITY = 1
    EVENPARITY = 2
    MARKPARITY = 3
    SPACEPARITY = 4

    # Values for the ``StopBits`` field.
    ONESTOPBIT = 0
    ONE5STOPBITS = 1
    TWOSTOPBITS = 2

    _fields_ = [
        ('DCBlength', DWORD),
        ('BaudRate', DWORD),
        ('flags', DWORD),
        ('wReserved', WORD),
        ('XonLim', WORD),
        ('XoffLim', WORD),
        ('ByteSize', BYTE),
        ('Parity', BYTE),
        ('StopBits', BYTE),  # 0=1, 1=1.5, 2=2
        ('XonChar', CHAR),
        ('XoffChar', CHAR),
        ('ErrorChar', CHAR),
        ('EofChar', CHAR),
        ('EvtChar', CHAR),
        ('wReserved1', WORD),
    ]

    def __init__(self):
        """Create a zeroed DCB with ``DCBlength`` set to the structure size."""
        ctypes.Structure.__init__(self)
        self.DCBlength = ctypes.sizeof(self)


MAXDWORD = 0xffffffff


class COMMTIMEOUTS(ctypes.Structure):
    """ctypes layout of the Win32 COMMTIMEOUTS structure (values in ms)."""

    _fields_ = [
        ('ReadIntervalTimeout', DWORD),
        ('ReadTotalTimeoutMultiplier', DWORD),
        ('ReadTotalTimeoutConstant', DWORD),
        ('WriteTotalTimeoutMultiplier', DWORD),
        ('WriteTotalTimeoutConstant', DWORD),
    ]


# Function codes for EscapeCommFunction.
SETXOFF = 1
SETXON = 2
SETRTS = 3
CLRRTS = 4
SETDTR = 5
CLRDTR = 6
SETBREAK = 8
CLRBREAK = 9

kernel32 = ctypes.windll.kernel32

# Prototypes.  Each ``restype`` is a checker callable, so a failed call raises
# instead of returning an error code.

CreateFileW = kernel32.CreateFileW
# TODO: support SECURITY_ATTRIBUTES
CreateFileW.argtypes = [LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE]
CreateFileW.restype = CheckHANDLE

CloseHandle = kernel32.CloseHandle
CloseHandle.argtypes = [HANDLE]
CloseHandle.restype = CheckBOOL

ReadFile = kernel32.ReadFile
# TODO: support OVERLAPPED
ReadFile.argtypes = [HANDLE, LPVOID, DWORD, ctypes.POINTER(DWORD), LPVOID]
ReadFile.restype = CheckBOOL

WriteFile = kernel32.WriteFile
# TODO: support OVERLAPPED
WriteFile.argtypes = [HANDLE, LPVOID, DWORD, ctypes.POINTER(DWORD), LPVOID]
WriteFile.restype = CheckBOOL

GetCommState = kernel32.GetCommState
GetCommState.argtypes = [HANDLE, ctypes.POINTER(DCB)]
GetCommState.restype = CheckBOOL

SetCommState = kernel32.SetCommState
SetCommState.argtypes = [HANDLE, ctypes.POINTER(DCB)]
SetCommState.restype = CheckBOOL

GetCommTimeouts = kernel32.GetCommTimeouts
GetCommTimeouts.argtypes = [HANDLE, ctypes.POINTER(COMMTIMEOUTS)]
GetCommTimeouts.restype = CheckBOOL

SetCommTimeouts = kernel32.SetCommTimeouts
SetCommTimeouts.argtypes = [HANDLE, ctypes.POINTER(COMMTIMEOUTS)]
SetCommTimeouts.restype = CheckBOOL

SetupComm = kernel32.SetupComm
SetupComm.argtypes = [HANDLE, DWORD, DWORD]
SetupComm.restype = CheckBOOL

EscapeCommFunction = kernel32.EscapeCommFunction
EscapeCommFunction.argtypes = [HANDLE, DWORD]
EscapeCommFunction.restype = CheckBOOL

#################################################


class SerialPort(object):
    """An open serial port, accessed with synchronous ReadFile/WriteFile."""

    def __init__(self, name):
        """Open the port called ``name`` for exclusive read/write access.

        ``name`` may be given with or without the ``\\\\.\\`` device prefix;
        the prefix is added when missing.  Raises the error produced by
        ``CheckHANDLE`` when the port cannot be opened.
        """
        # Set first so close()/__del__ are safe even when the open fails.
        self.handle = None
        if not name.startswith('\\\\.\\'):
            name = '\\\\.\\' + name
        self.name = name
        self.handle = CreateFileW(name, GENERIC_READ | GENERIC_WRITE, 0,
                                  None, OPEN_EXISTING, 0, 0)

    def close(self):
        """Close the port handle; calling it again is a no-op."""
        # Detach the handle before closing so that a failing CloseHandle is
        # not retried on the same handle from __del__.
        handle = getattr(self, 'handle', None)
        self.handle = None
        if handle:
            CloseHandle(handle)

    def __del__(self):
        self.close()

    def read(self, n):
        """Read up to ``n`` bytes and return them as a byte string.

        The result holds only the bytes ReadFile reported as read, so it may
        be shorter than ``n`` (or empty), depending on the port timeouts.
        """
        dwRead = DWORD(0)
        buf = ctypes.create_string_buffer(n)
        ReadFile(self.handle, buf, n, ctypes.byref(dwRead), None)
        return buf.raw[:dwRead.value]

    def write(self, data):
        """Write the byte string ``data`` to the port.

        Raises AssertionError if WriteFile reports fewer bytes written than
        were supplied.
        """
        dwWritten = DWORD(0)
        WriteFile(self.handle, data, len(data), ctypes.byref(dwWritten), None)
        assert dwWritten.value == len(data)

    def Setup(self, baud=9600, bits=8, parity=None, non_blocking=False):
        """Configure timeouts, buffers and line settings, then raise DTR.

        baud         -- baud rate stored in the DCB.
        bits         -- data bits per character.
        parity       -- one of the DCB.*PARITY values; None means NOPARITY.
        non_blocking -- when true, ReadIntervalTimeout is set to MAXDWORD with
                        zero total read timeouts, which per the Win32
                        COMMTIMEOUTS documentation makes reads return
                        immediately with whatever is already buffered.
        """
        cto = COMMTIMEOUTS()
        if non_blocking:
            cto.ReadIntervalTimeout = MAXDWORD
        cto.ReadTotalTimeoutConstant = 0
        cto.ReadTotalTimeoutMultiplier = 0
        cto.WriteTotalTimeoutMultiplier = 0
        #cto.WriteTotalTimeoutConstant = 500
        SetCommTimeouts(self.handle, ctypes.byref(cto))

        # Ask for 4096-byte input and output buffers.
        SetupComm(self.handle, 4096, 4096)

        # Start from the current state so untouched fields keep their values.
        dcb = DCB()
        GetCommState(self.handle, ctypes.byref(dcb))
        dcb.BaudRate = baud
        dcb.flags = DCB.FLAG_BINARY
        #dcb.flags = DCB.FLAG_BINARY | DCB.DTR_CONTROL_ENABLE | DCB.RTS_CONTROL_ENABLE | DCB.FLAG_TXCONTINUEONXOFF
        dcb.ByteSize = bits
        if parity is None:
            parity = DCB.NOPARITY
        dcb.Parity = parity
        dcb.StopBits = DCB.ONESTOPBIT
        dcb.XonLim = 0
        dcb.XoffLim = 0
        SetCommState(self.handle, ctypes.byref(dcb))

        self.SetDTR()

    def SetDTR(self):
        """Issue the SETDTR escape function on the port."""
        EscapeCommFunction(self.handle, SETDTR)


class SerialPortQueue(object):
    """Pump data between a port object and send/receive queues.

    ``Start`` launches one thread that polls ``port.read`` and one that
    drains the send queue into ``port.write``; a shared lock keeps the two
    port calls from overlapping.  Received chunks are delivered to
    ``OnReceive`` when the owner calls ``CheckReceive``.
    """

    def __init__(self, port):
        """Prepare queues and (not yet started) worker threads for ``port``.

        ``port`` must provide ``read(n)`` and ``write(data)``.
        """
        self.port = port
        self.lock = threading.RLock()
        # Defined here so the attribute exists before Start() is called.
        self.stopping = False
        self.receive_thread = threading.Thread(target=self.ReceiveThreadProc)
        self.send_thread = threading.Thread(target=self.SendThreadProc)
        self.receive_queue = Queue.Queue(0)
        self.send_queue = Queue.Queue(0)

    def ReceiveThreadProc(self):
        """Receive-thread body: poll the port until ``stopping`` is set."""
        while not self.stopping:
            self.lock.acquire()
            try:
                data = self.port.read(16 * 1024)
            finally:
                self.lock.release()
            if len(data) > 0:
                self.receive_queue.put(data)
            time.sleep(0.030)

    def SendThreadProc(self):
        """Send-thread body: write queued data until ``stopping`` is set."""
        while not self.stopping:
            try:
                data = self.send_queue.get_nowait()
            except Queue.Empty:
                # Nothing to send; back off briefly before polling again.
                time.sleep(0.030)
            else:
                self.lock.acquire()
                try:
                    self.port.write(data)
                finally:
                    self.lock.release()

    def Start(self):
        """Clear the stop flag and start both worker threads."""
        self.stopping = False
        self.receive_thread.start()
        self.send_thread.start()

    def Stop(self):
        """Set the stop flag and wait for both worker threads to exit."""
        self.stopping = True
        self.receive_thread.join()
        self.send_thread.join()

    def Send(self, data):
        """Queue ``data`` to be written by the send thread."""
        self.send_queue.put(data)

    def CheckReceive(self):
        """Pass every chunk currently queued to ``OnReceive``, in order.

        Runs in the calling thread and returns once the queue is empty.
        """
        while True:
            try:
                data = self.receive_queue.get_nowait()
            except Queue.Empty:
                break
            else:
                self.OnReceive(data)

    def OnReceive(self, data):
        """Hook called by ``CheckReceive`` for each chunk; no-op by default."""
        pass