# vi:ts=4:sw=4:et

import ctypes
from ctypes.com import STDMETHOD
from ctypes.com.storage import IStream

_rapi = ctypes.windll.rapi

###########################################################
#
# Internal helpers
#

_DWORD = ctypes.c_ulong
_WCHAR = ctypes.c_wchar
_BOOL = ctypes.c_int

def _SUCCEEDED(hr):
    """Return true when the HRESULT-style value `hr` is non-negative."""
    return hr >= 0

def _FAILED(hr):
    """Return true when the HRESULT-style value `hr` is negative."""
    return not _SUCCEEDED(hr)

###########################################################
#
# Error handling
#

CeRapiGetError = _rapi.CeRapiGetError
CeRapiGetError.argtypes = []
CeRapiGetError.restype = _DWORD # really a HRESULT

CeGetLastError = _rapi.CeGetLastError
CeGetLastError.argtypes = []
CeGetLastError.restype = _DWORD

class RapiError(Exception):
    """Exception raised by some functions in this module when a RAPI call
    fails.

    The values of CeRapiGetError() and CeGetLastError() at construction
    time are stored in the `rapi_errno` and `ce_errno` attributes, and the
    first non-zero one (RAPI error first) is appended to the message.
    """

    def __init__(self, message):
        # Capture both error codes immediately, before any further RAPI
        # call has a chance to overwrite them.
        self.rapi_errno = CeRapiGetError()
        self.ce_errno = CeGetLastError()

        if self.rapi_errno != 0:
            text = "%s (CeRapiGetError: %d)" % (message, self.rapi_errno)
        elif self.ce_errno != 0:
            text = "%s (CeGetLastError: %d)" % (message, self.ce_errno)
        else:
            text = message + " (No error code)"
        Exception.__init__(self, text)

###########################################################
#
# Initialisation
#

class RAPIINIT(ctypes.Structure):
    """Structure handed to CeRapiInitEx.

    `cbSize` is filled in with the structure size on construction;
    `heRapiInit` is the event handle waited on by CeRapiInitTimeout and
    `hrRapiInit` is the initialisation result it inspects afterwards.
    """
    _fields_ = [
        ("cbSize", _DWORD),
        ("heRapiInit", _DWORD),
        ("hrRapiInit", ctypes.HRESULT),
    ]

    def __init__(self):
        self.cbSize = ctypes.sizeof(self)

CeRapiInitEx = _rapi.CeRapiInitEx
CeRapiInitEx.argtypes = [ ctypes.POINTER(RAPIINIT) ]
CeRapiInitEx.restype = ctypes.HRESULT

CeRapiUninit = _rapi.CeRapiUninit
CeRapiUninit.argtypes = []
CeRapiUninit.restype = ctypes.HRESULT

def CeRapiInitTimeout(timeout_ms = 5000):
    """Initialise the RAPI connection, giving up after `timeout_ms`
    milliseconds.

    CeRapiInitEx is called to start the initialisation, then the event
    handle it returns is waited on with the given timeout.

    Raises RapiError if the wait does not complete successfully (in which
    case CeRapiUninit is called first) or if the initialisation result
    reported in the RAPIINIT structure is a failure code.
    """
    init = RAPIINIT()
    CeRapiInitEx(ctypes.byref(init))

    # FIXME: how do you get the address of a ctypes.Structure field?
    # Until then, copy the handle into a standalone _DWORD so that it can
    # be passed by reference as a one-element handle array.
    event = _DWORD(init.heRapiInit)
    WAIT_OBJECT_0 = 0
    wait_result = ctypes.windll.user32.MsgWaitForMultipleObjects(
        1, ctypes.byref(event), 0, timeout_ms, 0)

    if wait_result != WAIT_OBJECT_0:
        # The wait timed out or failed.  Build the exception before
        # uninitialising so that it records the error codes of the failed
        # initialisation rather than those of CeRapiUninit.
        error = RapiError("Timeout waiting for CeRapiInitEx")
        CeRapiUninit()
        raise error

    # NOTE(review): CeRapiUninit is not called on this failure path;
    # confirm against the RAPI documentation whether it should be.
    if _FAILED(init.hrRapiInit):
        raise RapiError("CeRapiInitEx failed: " + str(init.hrRapiInit))
    # otherwise: success

###########################################################
#
# File I/O
#

INVALID_HANDLE_VALUE = 0xffffffff # HANDLE is an unsigned long
ERROR_NO_MORE_FILES = 18
MAX_PATH = 260

class HANDLE(ctypes.c_ulong):
    """Handle type whose `_check_retval_` hook raises RapiError when the
    value is INVALID_HANDLE_VALUE (intended for use as a restype)."""

    def _check_HANDLE(handle):
        if handle != INVALID_HANDLE_VALUE:
            return handle
        raise RapiError("RAPI call returned invalid handle")
    _check_retval_ = staticmethod(_check_HANDLE)

class BOOLRESULT(ctypes.c_int):
    """BOOL type whose `_check_retval_` hook raises RapiError when the
    value is false (intended for use as a restype)."""

    def _check_BOOLRESULT(value):
        if value:
            return value
        raise RapiError("RAPI call returned FALSE")
    _check_retval_ = staticmethod(_check_BOOLRESULT)

class FILETIME(ctypes.Structure):
    """Timestamp split into low and high DWORD halves."""
    _fields_ = [
        ("dwLowDateTime", _DWORD),
        ("dwHighDateTime", _DWORD),
    ]

class CE_FIND_DATA(ctypes.Structure):
    """Result record filled in by CeFindFirstFile / CeFindNextFile."""
    _fields_ = [
        ("dwFileAttributes", _DWORD),
        ("ftCreationTime", FILETIME),
        ("ftLastAccessTime", FILETIME),
        ("ftLastWriteTime", FILETIME),
        ("nFileSizeHigh", _DWORD),
        ("nFileSizeLow", _DWORD),
        ("dwOID", _DWORD),
        ("cFileName", _WCHAR * MAX_PATH),
    ]
    _as_parameter_ = property(ctypes.byref) # always passed by reference

CeFindFirstFile = _rapi.CeFindFirstFile
CeFindFirstFile.argtypes = [ ctypes.c_wchar_p, CE_FIND_DATA ]
CeFindFirstFile.restype = HANDLE

CeFindNextFile = _rapi.CeFindNextFile
CeFindNextFile.argtypes = [ HANDLE, CE_FIND_DATA ]

CeFindClose = _rapi.CeFindClose # TODO: check for errors?
CeFindClose.argtypes = [ HANDLE ] CeCopyFile = _rapi.CeCopyFile CeCopyFile.argtypes = [ ctypes.c_wchar_p, ctypes.c_wchar_p, _BOOL ] CeDeleteFile = _rapi.CeDeleteFile CeDeleteFile.argtypes = [ ctypes.c_wchar_p ] CeDeleteFile.restype = BOOLRESULT def listdir(path): """This function is meant to emulate os.listdir but lists files on the WinCE device""" pattern = unicode(path) if pattern[-1:] != u'\\': pattern += u'\\' pattern += u'*' lst = [] data = CE_FIND_DATA() try: hFind = CeFindFirstFile(pattern, data) except RapiError, ex: # A "no files" error is perfectly normal if ex.ce_errno == ERROR_NO_MORE_FILES: return lst raise try: lst.append(data.cFileName) while CeFindNextFile(hFind, data): lst.append(data.cFileName) finally: CeFindClose(hFind) return lst ########################################################### # # Miscellaneous # CeCloseHandle = _rapi.CeCloseHandle # TODO: check for errors? CeCloseHandle.argtypes = [ HANDLE ] RAPISTREAMFLAG = ctypes.c_int # enum STREAM_TIMEOUT_READ = 0 class IRAPIStream(IStream): #_iid_ = ? 
_methods_ = IStream._methods_ + [ STDMETHOD(ctypes.HRESULT, "SetRapiStat", RAPISTREAMFLAG, _DWORD), STDMETHOD(ctypes.HRESULT, "GetRapiStat", RAPISTREAMFLAG, ctypes.POINTER(_DWORD)), ] CeRapiInvoke = _rapi.CeRapiInvoke CeRapiInvoke.argtypes = [ ctypes.c_wchar_p, ctypes.c_wchar_p, _DWORD, ctypes.c_void_p, ctypes.POINTER(_DWORD), ctypes.POINTER(ctypes.c_void_p), ctypes.POINTER(ctypes.POINTER(IRAPIStream)), _DWORD ] CeRapiInvoke.restype = ctypes.HRESULT LocalFree = ctypes.windll.kernel32.LocalFree LocalFree.argtypes = [ ctypes.c_void_p ] class PROCESS_INFORMATION(ctypes.Structure): _fields_ = [ ("hProcess", HANDLE), ("hThread", HANDLE), ("dwProcessId", _DWORD), ("dwThreadId", _DWORD), ] _as_parameter_ = property(ctypes.byref) # always passed by reference CeCreateProcess = _rapi.CeCreateProcess CeCreateProcess.argtypes = [ ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_void_p, ctypes.c_void_p, _BOOL, _DWORD, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, PROCESS_INFORMATION ] CeCreateProcess.restype = BOOLRESULT def RunProcess(program, command_line, flags=0): if command_line: command_line = unicode(command_line) else: command_line = None pi = PROCESS_INFORMATION() CeCreateProcess(unicode(program), command_line, None, None, 0, 0, None, None, None, pi) CeCloseHandle(pi.hProcess) CeCloseHandle(pi.hThread) return pi.dwProcessId ########################################################### # # Testing # def test(): ri = RAPIINIT() print "Calling CeRapiInitEx" CeRapiInitEx(ctypes.byref(ri)) print "CeRapiInitEx started..." # FIXME: how do you get the address of a ctypes.Structure field? 
hEvent = _DWORD(ri.heRapiInit) WAIT_OBJECT_0 = 0 rwait = ctypes.windll.user32.MsgWaitForMultipleObjects(1, ctypes.byref(hEvent), 0, 5000, 0) if rwait == WAIT_OBJECT_0: if _SUCCEEDED(ri.hrRapiInit): print "CeRapiInitEx finished successfully" else: print "CeRapiInitEx failed: ", ri.hrRapiInit return else: print "CeRapiInitEx timed out: calling CeRapiUninit" CeRapiUninit() return # Init success # Clean up print "Calling CeRapiUninit" CeRapiUninit() if __name__ == '__main__': test()