[pypy-svn] pypy default: Added kwargs/extended paramflags support and tests to ctypes.CFUNCTYPE.
tav
commits-noreply at bitbucket.org
Sun Mar 13 19:58:33 CET 2011
Author: tav <tav at espians.com>
Branch:
Changeset: r42572:b16c3795183e
Date: 2011-03-13 18:58 +0000
http://bitbucket.org/pypy/pypy/changeset/b16c3795183e/
Log: Added kwargs/extended paramflags support and tests to
ctypes.CFUNCTYPE.
diff --git a/lib_pypy/_ctypes/function.py b/lib_pypy/_ctypes/function.py
--- a/lib_pypy/_ctypes/function.py
+++ b/lib_pypy/_ctypes/function.py
@@ -1,17 +1,28 @@
-
-from _ctypes.basics import _CData, _CDataMeta, cdata_from_address
-from _ctypes.primitive import SimpleType
-from _ctypes.basics import ArgumentError, keepalive_key
-from _ctypes.builtin import set_errno, set_last_error
import _rawffi
import sys
import traceback
+from _ctypes.basics import ArgumentError, keepalive_key
+from _ctypes.basics import _CData, _CDataMeta, cdata_from_address
+from _ctypes.builtin import set_errno, set_last_error
+from _ctypes.primitive import SimpleType
+
# XXX this file needs huge refactoring I fear
PARAMFLAG_FIN = 0x1
PARAMFLAG_FOUT = 0x2
PARAMFLAG_FLCID = 0x4
+PARAMFLAG_COMBINED = PARAMFLAG_FIN | PARAMFLAG_FOUT | PARAMFLAG_FLCID
+
+VALID_PARAMFLAGS = (
+ 0,
+ PARAMFLAG_FIN,
+ PARAMFLAG_FIN | PARAMFLAG_FOUT,
+ PARAMFLAG_FIN | PARAMFLAG_FLCID
+ )
+
+WIN64 = sys.platform == 'win32' and sys.maxint == 2**63 - 1
+
def get_com_error(errcode, riid, pIunk):
"Win32 specific: build a COM Error exception"
# XXX need C support code
@@ -54,10 +65,11 @@
def _getargtypes(self):
return self._argtypes_
+
def _setargtypes(self, argtypes):
self._ptr = None
if argtypes is None:
- self._argtypes_ = None
+ self._argtypes_ = ()
else:
for i, argtype in enumerate(argtypes):
if not hasattr(argtype, 'from_param'):
@@ -65,35 +77,86 @@
"item %d in _argtypes_ has no from_param method" % (
i + 1,))
self._argtypes_ = argtypes
+
argtypes = property(_getargtypes, _setargtypes)
+ def _getparamflags(self):
+ return self._paramflags
+
+ def _setparamflags(self, paramflags):
+ if paramflags is None or not self._argtypes_:
+ self._paramflags = None
+ return
+ if not isinstance(paramflags, tuple):
+ raise TypeError("paramflags must be a tuple or None")
+ if len(paramflags) != len(self._argtypes_):
+ raise ValueError("paramflags must have the same length as argtypes")
+ for idx, paramflag in enumerate(paramflags):
+ paramlen = len(paramflag)
+ name = default = None
+ if paramlen == 1:
+ flag = paramflag[0]
+ elif paramlen == 2:
+ flag, name = paramflag
+ elif paramlen == 3:
+ flag, name, default = paramflag
+ else:
+ raise TypeError(
+ "paramflags must be a sequence of (int [,string [,value]]) "
+ "tuples"
+ )
+ if not isinstance(flag, int):
+ raise TypeError(
+ "paramflags must be a sequence of (int [,string [,value]]) "
+ "tuples"
+ )
+ _flag = flag & PARAMFLAG_COMBINED
+ if _flag == PARAMFLAG_FOUT:
+ typ = self._argtypes_[idx]
+ if getattr(typ, '_ffiargshape', None) not in ('P', 'z', 'Z'):
+ raise TypeError(
+ "'out' parameter %d must be a pointer type, not %s"
+ % (idx+1, type(typ).__name__)
+ )
+ elif _flag not in VALID_PARAMFLAGS:
+ raise TypeError("paramflag value %d not supported" % flag)
+ self._paramflags = paramflags
+
+ paramflags = property(_getparamflags, _setparamflags)
+
def _getrestype(self):
return self._restype_
+
def _setrestype(self, restype):
self._ptr = None
if restype is int:
from ctypes import c_int
restype = c_int
- if not isinstance(restype, _CDataMeta) and not restype is None and \
- not callable(restype):
- raise TypeError("Expected ctypes type, got %s" % (restype,))
+ if not (isinstance(restype, _CDataMeta) or restype is None or
+ callable(restype)):
+ raise TypeError("restype must be a type, a callable, or None")
self._restype_ = restype
+
def _delrestype(self):
self._ptr = None
del self._restype_
+
restype = property(_getrestype, _setrestype, _delrestype)
def _geterrcheck(self):
return getattr(self, '_errcheck_', None)
+
def _seterrcheck(self, errcheck):
if not callable(errcheck):
raise TypeError("The errcheck attribute must be callable")
self._errcheck_ = errcheck
+
def _delerrcheck(self):
try:
del self._errcheck_
except AttributeError:
pass
+
errcheck = property(_geterrcheck, _seterrcheck, _delerrcheck)
def _ffishapes(self, args, restype):
@@ -116,18 +179,18 @@
self._buffer = _rawffi.Array('P')(1)
return
- args = list(args)
- argument = args.pop(0)
+ argsl = list(args)
+ argument = argsl.pop(0)
# Direct construction from raw address
- if isinstance(argument, (int, long)) and not args:
+ if isinstance(argument, (int, long)) and not argsl:
ffiargs, ffires = self._ffishapes(self._argtypes_, self._restype_)
self._ptr = _rawffi.FuncPtr(argument, ffiargs, ffires, self._flags_)
self._buffer = self._ptr.byptr()
return
- # A callback into python
- if callable(argument) and not args:
+ # A callback into Python
+ if callable(argument) and not argsl:
self.callable = argument
ffiargs, ffires = self._ffishapes(self._argtypes_, self._restype_)
if self._restype_ is None:
@@ -141,39 +204,32 @@
# Function exported from a shared library
if isinstance(argument, tuple) and len(argument) == 2:
import ctypes
- name, dll = argument
- # XXX Implement support for foreign function ordinal
- if not isinstance(name, basestring):
- raise NotImplementedError(
- "Support for foreign functions exported by ordinal "
- "hasn't been implemented yet."
- )
- self.name = name
+ self.name, dll = argument
if isinstance(dll, str):
self.dll = ctypes.CDLL(dll)
else:
self.dll = dll
- if args:
- self._paramflags = args.pop(0)
- if args:
+ if argsl:
+ self.paramflags = argsl.pop(0)
+ if argsl:
raise TypeError("Unknown constructor %s" % (args,))
- # we need to check dll anyway
+ # We need to check dll anyway
ptr = self._getfuncptr([], ctypes.c_int)
self._buffer = ptr.byptr()
return
# A COM function call, by index
if (sys.platform == 'win32' and isinstance(argument, (int, long))
- and args):
+ and argsl):
ffiargs, ffires = self._ffishapes(self._argtypes_, self._restype_)
self._com_index = argument + 0x1000
- self.name = args.pop(0)
- if args:
- self._paramflags = args.pop(0)
- if args:
- raise TypeError("Unknown constructor %s" % (args,))
- # XXX Implement support for the optional ``iid`` pointer to the
- # interface identifier used in extended error reporting.
+ self.name = argsl.pop(0)
+ if argsl:
+ self.paramflags = argsl.pop(0)
+ if argsl:
+ self._com_iid = argsl.pop(0)
+ if argsl:
+ raise TypeError("Unknown constructor %s" % (args,))
return
raise TypeError("Unknown constructor %s" % (args,))
@@ -187,29 +243,30 @@
return f
def __call__(self, *args, **kwargs):
+ argtypes = self._argtypes_
if self.callable is not None:
- if len(args) == len(self._argtypes_):
+ if len(args) == len(argtypes):
pass
elif self._flags_ & _rawffi.FUNCFLAG_CDECL:
- if len(args) < len(self._argtypes_):
- plural = len(self._argtypes_) > 1 and "s" or ""
+ if len(args) < len(argtypes):
+ plural = len(argtypes) > 1 and "s" or ""
raise TypeError(
"This function takes at least %d argument%s (%s given)"
- % (len(self._argtypes_), plural, len(args)))
+ % (len(argtypes), plural, len(args)))
else:
# For cdecl functions, we allow more actual arguments
# than the length of the argtypes tuple.
args = args[:len(self._argtypes_)]
else:
- plural = len(self._argtypes_) > 1 and "s" or ""
+ plural = len(argtypes) > 1 and "s" or ""
raise TypeError(
"This function takes %d argument%s (%s given)"
- % (len(self._argtypes_), plural, len(args)))
+ % (len(argtypes), plural, len(args)))
# check that arguments are convertible
## XXX Not as long as ctypes.cast is a callback function with
## py_object arguments...
- ## self._convert_args(self._argtypes_, args)
+ ## self._convert_args(argtypes, args, {})
try:
res = self.callable(*args)
@@ -221,22 +278,26 @@
if self._restype_ is not None:
return res
return
- argtypes = self._argtypes_
+
+ if argtypes is None:
+ argtypes = []
if self._com_index:
from ctypes import cast, c_void_p, POINTER
+ if not args:
+ raise ValueError(
+ "native COM method call without 'this' parameter"
+ )
thisarg = cast(args[0], POINTER(POINTER(c_void_p))).contents
argtypes = [c_void_p] + list(argtypes)
args = list(args)
args[0] = args[0].value
else:
thisarg = None
-
- if argtypes is None:
- argtypes = []
- args, output_values = self._convert_args(argtypes, args, kwargs)
+
+ args, outargs = self._convert_args(argtypes, args, kwargs)
argtypes = [type(arg) for arg in args]
-
+
restype = self._restype_
funcptr = self._getfuncptr(argtypes, restype, thisarg)
if self._flags_ & _rawffi.FUNCFLAG_USE_ERRNO:
@@ -251,7 +312,27 @@
set_errno(_rawffi.get_errno())
if self._flags_ & _rawffi.FUNCFLAG_USE_LASTERROR:
set_last_error(_rawffi.get_last_error())
- result = self._build_result(restype, resbuffer, argtypes, args)
+
+ result = None
+ if self._com_index:
+ if resbuffer[0] & 0x80000000:
+ raise get_com_error(resbuffer[0],
+ self._com_iid, args[0])
+ else:
+ result = int(resbuffer[0])
+ elif restype is not None:
+ checker = getattr(self.restype, '_check_retval_', None)
+ if checker:
+ val = restype(resbuffer[0])
+ # the original ctypes seems to make the distinction between
+ # classes defining a new type, and their subclasses
+ if '_type_' in restype.__dict__:
+ val = val.value
+ result = checker(val)
+ elif not isinstance(restype, _CDataMeta):
+ result = restype(resbuffer[0])
+ else:
+ result = restype._CData_retval(resbuffer)
# The 'errcheck' protocol
if self._errcheck_:
@@ -264,13 +345,13 @@
if v is not args:
result = v
- if output_values:
- if len(output_values) == 1:
- return output_values[0]
- return tuple(output_values)
+ if not outargs:
+ return result
- return result
+ if len(outargs) == 1:
+ return outargs[0]
+ return tuple(outargs)
def _getfuncptr(self, argtypes, restype, thisarg=None):
if self._ptr is not None and argtypes is self._argtypes_:
@@ -293,14 +374,17 @@
raise ValueError("COM method call without VTable")
ptr = thisarg[self._com_index - 0x1000]
return _rawffi.FuncPtr(ptr, argshapes, resshape, self._flags_)
-
+
cdll = self.dll._handle
try:
return cdll.ptr(self.name, argshapes, resshape, self._flags_)
except AttributeError:
if self._flags_ & _rawffi.FUNCFLAG_CDECL:
raise
-
+ # Win64 has no stdcall calling conv, so it should also not have the
+ # name mangling of it.
+ if WIN64:
+ raise
# For stdcall, try mangled names:
# funcname -> _funcname@<n>
# where n is 0, 4, 8, 12, ..., 128
@@ -320,7 +404,6 @@
arg = argtype.from_param(arg)
if hasattr(arg, '_as_parameter_'):
arg = arg._as_parameter_
-
if isinstance(arg, _CData):
# The usual case when argtype is defined
cobj = arg
@@ -334,134 +417,88 @@
cobj = c_int(arg)
else:
raise TypeError("Don't know how to handle %s" % (arg,))
-
return cobj
- def _convert_args(self, argtypes, args, kwargs):
- wrapped_args = []
- output_values = []
- consumed = 0
+ def _convert_args(self, argtypes, args, kwargs, marker=object()):
+ callargs = []
+ outargs = []
+ total = len(args)
+ paramflags = self._paramflags
- # XXX Implement support for kwargs/name
+ if self._com_index:
+ inargs_idx = 1
+ else:
+ inargs_idx = 0
+
+ if not paramflags and total < len(argtypes):
+ raise TypeError("not enough arguments")
+
for i, argtype in enumerate(argtypes):
- defaultvalue = None
- if self._paramflags is not None:
- paramflag = self._paramflags[i]
+ flag = 0
+ name = None
+ defval = marker
+ if paramflags:
+ paramflag = paramflags[i]
paramlen = len(paramflag)
name = None
if paramlen == 1:
- idlflag = paramflag[0]
+ flag = paramflag[0]
elif paramlen == 2:
- idlflag, name = paramflag
+ flag, name = paramflag
elif paramlen == 3:
- idlflag, name, defaultvalue = paramflag
- idlflag &= (PARAMFLAG_FIN | PARAMFLAG_FOUT | PARAMFLAG_FLCID)
-
- if idlflag in (0, PARAMFLAG_FIN):
- pass
- elif idlflag == PARAMFLAG_FOUT:
- import ctypes
- val = argtype._type_()
- output_values.append(val)
- wrapped_args.append(ctypes.byref(val))
- continue
- elif idlflag == PARAMFLAG_FIN | PARAMFLAG_FLCID:
- # Always taken from defaultvalue if given,
- # else the integer 0.
- val = defaultvalue
- if val is None:
+ flag, name, defval = paramflag
+ flag = flag & PARAMFLAG_COMBINED
+ if flag == PARAMFLAG_FIN | PARAMFLAG_FLCID:
+ val = defval
+ if val is marker:
val = 0
wrapped = self._conv_param(argtype, val)
- wrapped_args.append(wrapped)
- continue
+ callargs.append(wrapped)
+ elif flag in (0, PARAMFLAG_FIN):
+ if inargs_idx < total:
+ val = args[inargs_idx]
+ inargs_idx += 1
+ elif kwargs and name in kwargs:
+ val = kwargs[name]
+ inargs_idx += 1
+ elif defval is not marker:
+ val = defval
+ elif name:
+ raise TypeError("required argument '%s' missing" % name)
+ else:
+ raise TypeError("not enough arguments")
+ wrapped = self._conv_param(argtype, val)
+ callargs.append(wrapped)
+ elif flag == PARAMFLAG_FOUT:
+ if defval is not marker:
+ outargs.append(defval)
+ wrapped = self._conv_param(argtype, defval)
+ else:
+ import ctypes
+ val = argtype._type_()
+ outargs.append(val)
+ wrapped = ctypes.byref(val)
+ callargs.append(wrapped)
else:
- raise NotImplementedError(
- "paramflags = %s" % (self._paramflags[i],))
+ raise ValueError("paramflag %d not yet implemented" % flag)
+ else:
+ try:
+ wrapped = self._conv_param(argtype, args[i])
+ except (UnicodeError, TypeError, ValueError), e:
+ raise ArgumentError(str(e))
+ callargs.append(wrapped)
+ inargs_idx += 1
- if consumed < len(args):
- arg = args[consumed]
- elif defaultvalue is not None:
- arg = defaultvalue
- else:
- raise TypeError("Not enough arguments")
-
- try:
- wrapped = self._conv_param(argtype, arg)
- except (UnicodeError, TypeError, ValueError), e:
- raise ArgumentError(str(e))
- wrapped_args.append(wrapped)
- consumed += 1
-
- if len(wrapped_args) < len(args):
- extra = args[len(wrapped_args):]
- argtypes = list(argtypes)
+ if len(callargs) < total:
+ extra = args[len(callargs):]
for i, arg in enumerate(extra):
try:
wrapped = self._conv_param(None, arg)
except (UnicodeError, TypeError, ValueError), e:
raise ArgumentError(str(e))
- wrapped_args.append(wrapped)
- return wrapped_args, output_values
+ callargs.append(wrapped)
- def _build_result(self, restype, resbuffer, argtypes, argsandobjs):
- """Build the function result:
- If there is no OUT parameter, return the actual function result
- If there is one OUT parameter, return it
- If there are many OUT parameters, return a tuple"""
-
- retval = None
-
- if self._com_index:
- if resbuffer[0] & 0x80000000:
- raise get_com_error(resbuffer[0],
- self._com_iid, argsandobjs[0])
- else:
- retval = int(resbuffer[0])
- elif restype is not None:
- checker = getattr(self.restype, '_check_retval_', None)
- if checker:
- val = restype(resbuffer[0])
- # the original ctypes seems to make the distinction between
- # classes defining a new type, and their subclasses
- if '_type_' in restype.__dict__:
- val = val.value
- retval = checker(val)
- elif not isinstance(restype, _CDataMeta):
- retval = restype(resbuffer[0])
- else:
- retval = restype._CData_retval(resbuffer)
-
- results = []
- if self._paramflags:
- for argtype, obj, paramflag in zip(argtypes[1:], argsandobjs[1:],
- self._paramflags):
- if len(paramflag) == 2:
- idlflag, name = paramflag
- elif len(paramflag) == 3:
- idlflag, name, defaultvalue = paramflag
- else:
- idlflag = 0
- idlflag &= (PARAMFLAG_FIN | PARAMFLAG_FOUT | PARAMFLAG_FLCID)
-
- if idlflag in (0, PARAMFLAG_FIN):
- pass
- elif idlflag == PARAMFLAG_FOUT:
- val = obj.__ctypes_from_outparam__()
- results.append(val)
- elif idlflag == PARAMFLAG_FIN | PARAMFLAG_FLCID:
- pass
- else:
- raise NotImplementedError(
- "paramflags = %s" % (paramflag,))
-
- if results:
- if len(results) == 1:
- return results[0]
- else:
- return tuple(results)
-
- # No output parameter, return the actual function result.
- return retval
+ return callargs, outargs
def __nonzero__(self):
return bool(self._buffer[0])
diff --git a/pypy/module/test_lib_pypy/ctypes_tests/test_prototypes.py b/pypy/module/test_lib_pypy/ctypes_tests/test_prototypes.py
--- a/pypy/module/test_lib_pypy/ctypes_tests/test_prototypes.py
+++ b/pypy/module/test_lib_pypy/ctypes_tests/test_prototypes.py
@@ -27,6 +27,55 @@
_ctypes_test = str(conftest.sofile)
mod.testdll = CDLL(_ctypes_test)
+class TestFuncPrototypes(BaseCTypesTestChecker):
+
+ def test_restype_setattr(self):
+ func = testdll._testfunc_p_p
+ raises(TypeError, setattr, func, 'restype', 20)
+
+ def test_argtypes_setattr(self):
+ func = testdll._testfunc_p_p
+ raises(TypeError, setattr, func, 'argtypes', 20)
+ raises(TypeError, setattr, func, 'argtypes', [20])
+
+ func = CFUNCTYPE(c_long, c_void_p, c_long)(lambda: None)
+ assert func.argtypes == (c_void_p, c_long)
+
+ def test_paramflags_setattr(self):
+ func = CFUNCTYPE(c_long, c_void_p, c_long)(lambda: None)
+ raises(TypeError, setattr, func, 'paramflags', 'spam')
+ raises(ValueError, setattr, func, 'paramflags', (1, 2, 3, 4))
+ raises(TypeError, setattr, func, 'paramflags', ((1,), ('a',)))
+ func.paramflags = (1,), (1|4,)
+
+ def test_kwargs(self):
+ proto = CFUNCTYPE(c_char_p, c_char_p, c_int)
+ paramflags = (1, 'text', "tavino"), (1, 'letter', ord('v'))
+ func = proto(('my_strchr', testdll), paramflags)
+ assert func.argtypes == (c_char_p, c_int)
+ assert func.restype == c_char_p
+
+ result = func("abcd", ord('b'))
+ assert result == "bcd"
+
+ result = func()
+ assert result == "vino"
+
+ result = func("grapevine")
+ assert result == "vine"
+
+ result = func(text="grapevine")
+ assert result == "vine"
+
+ result = func(letter=ord('i'))
+ assert result == "ino"
+
+ result = func(letter=ord('p'), text="impossible")
+ assert result == "possible"
+
+ result = func(text="impossible", letter=ord('p'))
+ assert result == "possible"
+
# Return machine address `a` as a (possibly long) non-negative integer.
# Starting with Python 2.5, id(anything) is always non-negative, and
# the ctypes addressof() inherits that via PyLong_FromVoidPtr().
More information about the Pypy-commit mailing list