[pypy-svn] r69377 - in pypy/trunk/pypy/module/oracle: . test

afa at codespeak.net afa at codespeak.net
Wed Nov 18 15:04:38 CET 2009


Author: afa
Date: Wed Nov 18 15:04:36 2009
New Revision: 69377

Modified:
   pypy/trunk/pypy/module/oracle/interp_cursor.py
   pypy/trunk/pypy/module/oracle/interp_variable.py
   pypy/trunk/pypy/module/oracle/roci.py
   pypy/trunk/pypy/module/oracle/test/test_cursor.py
Log:
implement cursor.description


Modified: pypy/trunk/pypy/module/oracle/interp_cursor.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_cursor.py	(original)
+++ pypy/trunk/pypy/module/oracle/interp_cursor.py	Wed Nov 18 15:04:36 2009
@@ -351,6 +351,187 @@
 
         self.fetchVariables = None
 
+    def getDescription(space, self):
+        "Return a list of 7-tuples consisting of the description of "
+        "the define variables"
+
+        # make sure the cursor is open
+        self._checkOpen(space)
+
+        # fixup bound cursor, if necessary
+        self._fixupBoundCursor()
+
+        # if not a query, return None
+        if self.statementType != roci.OCI_STMT_SELECT:
+            return
+
+        # determine number of items in select-list
+        attrptr = lltype.malloc(rffi.CArrayPtr(roci.ub1).TO, 1, flavor='raw')
+        try:
+            status = roci.OCIAttrGet(
+                self.handle, roci.OCI_HTYPE_STMT,
+                rffi.cast(roci.dvoidp, attrptr),
+                lltype.nullptr(roci.Ptr(roci.ub4).TO),
+                roci.OCI_ATTR_PARAM_COUNT,
+                self.environment.errorHandle)
+            self.environment.checkForError(
+                status,
+                "Cursor_GetDescription()")
+            numItems = attrptr[0]
+        finally:
+            lltype.free(attrptr, flavor='raw')
+
+        return space.newlist(
+            [space.newtuple(self._itemDescription(space, i + 1))
+             for i in range(numItems)])
+
+    def _itemDescription(self, space, pos):
+        "Return the list of 7 wrapped description fields for the item at pos"
+
+        # get the descriptor for pos; OCIDescriptorFree releases it below
+        paramptr = lltype.malloc(roci.Ptr(roci.OCIParam).TO,
+                                 1, flavor='raw')
+        try:
+            status = roci.OCIParamGet(
+                self.handle, roci.OCI_HTYPE_STMT,
+                self.environment.errorHandle,
+                rffi.cast(roci.dvoidpp, paramptr),
+                pos)
+            self.environment.checkForError(
+                status,
+                "Cursor_GetDescription(): parameter")
+            param = paramptr[0]
+        finally:
+            lltype.free(paramptr, flavor='raw')
+
+        try:
+            # map the descriptor to a variable type (compared to VT_* below)
+            varType = interp_variable.typeByOracleDescriptor(
+                param, self.environment)
+
+            # internal size: OCI_ATTR_DATA_SIZE read through a ub2 buffer
+            attrptr = lltype.malloc(rffi.CArrayPtr(roci.ub2).TO, 1,
+                                    flavor='raw')
+            try:
+                status = roci.OCIAttrGet(
+                    param, roci.OCI_HTYPE_DESCRIBE,
+                    rffi.cast(roci.dvoidp, attrptr),
+                    lltype.nullptr(roci.Ptr(roci.ub4).TO),
+                    roci.OCI_ATTR_DATA_SIZE,
+                    self.environment.errorHandle)
+                self.environment.checkForError(
+                    status,
+                    "Cursor_ItemDescription(): internal size")
+                internalSize = attrptr[0]
+            finally:
+                lltype.free(attrptr, flavor='raw')
+
+            # name: OCI hands back a pointer and a length, copied into a str
+            nameptr = lltype.malloc(rffi.CArrayPtr(roci.oratext).TO, 1,
+                                    flavor='raw')
+            lenptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO, 1,
+                                    flavor='raw')
+            try:
+                status = roci.OCIAttrGet(
+                    param, roci.OCI_HTYPE_DESCRIBE,
+                    rffi.cast(roci.dvoidp, nameptr),
+                    lenptr,
+                    roci.OCI_ATTR_NAME,
+                    self.environment.errorHandle)
+                self.environment.checkForError(
+                    status,
+                    "Cursor_ItemDescription(): name")
+                name = rffi.charpsize2str(nameptr[0], lenptr[0])
+            finally:
+                lltype.free(nameptr, flavor='raw')
+                lltype.free(lenptr, flavor='raw')
+
+            # precision and scale are only looked up for VT_Float, else both 0
+            if varType == interp_variable.VT_Float:
+                attrptr = lltype.malloc(rffi.CArrayPtr(roci.sb1).TO, 1,
+                                        flavor='raw')
+                try:
+                    status = roci.OCIAttrGet(
+                        param, roci.OCI_HTYPE_DESCRIBE,
+                        rffi.cast(roci.dvoidp, attrptr),
+                        lltype.nullptr(roci.Ptr(roci.ub4).TO),
+                        roci.OCI_ATTR_SCALE,
+                        self.environment.errorHandle)
+                    self.environment.checkForError(
+                        status,
+                        "Cursor_ItemDescription(): scale")
+                    scale = attrptr[0]
+                finally:
+                    lltype.free(attrptr, flavor='raw')
+
+                attrptr = lltype.malloc(rffi.CArrayPtr(roci.ub2).TO, 1,
+                                        flavor='raw')
+                try:
+                    status = roci.OCIAttrGet(
+                        param, roci.OCI_HTYPE_DESCRIBE,
+                        rffi.cast(roci.dvoidp, attrptr),
+                        lltype.nullptr(roci.Ptr(roci.ub4).TO),
+                        roci.OCI_ATTR_PRECISION,
+                        self.environment.errorHandle)
+                    self.environment.checkForError(
+                        status,
+                        "Cursor_ItemDescription(): precision")
+                    precision = attrptr[0]
+                finally:
+                    lltype.free(attrptr, flavor='raw')
+            else:
+                scale = 0
+                precision = 0
+
+            # nullable is True when OCI_ATTR_IS_NULL comes back non-zero
+            attrptr = lltype.malloc(rffi.CArrayPtr(roci.ub1).TO, 1,
+                                    flavor='raw')
+            try:
+                status = roci.OCIAttrGet(
+                    param, roci.OCI_HTYPE_DESCRIBE,
+                    rffi.cast(roci.dvoidp, attrptr),
+                    lltype.nullptr(roci.Ptr(roci.ub4).TO),
+                    roci.OCI_ATTR_IS_NULL,
+                    self.environment.errorHandle)
+                self.environment.checkForError(
+                    status,
+                    "Cursor_ItemDescription(): nullable")
+                nullable = attrptr[0] != 0
+            finally:
+                lltype.free(attrptr, flavor='raw')
+
+            # display size depends on the type; -1 when no rule applies
+            if varType == interp_variable.VT_String:
+                displaySize = internalSize
+            elif varType == interp_variable.VT_NationalCharString:
+                displaySize = internalSize / 2
+            elif varType == interp_variable.VT_Binary:
+                displaySize = internalSize
+            elif varType == interp_variable.VT_FixedChar:
+                displaySize = internalSize
+            elif varType == interp_variable.VT_FixedNationalChar:
+                displaySize = internalSize / 2
+            elif varType == interp_variable.VT_Float:
+                if precision:
+                    displaySize = precision + 1
+                    if scale > 0:
+                        displaySize += scale + 1
+                else:
+                    displaySize = 127
+            elif varType == interp_variable.VT_DateTime:
+                displaySize = 23
+            else:
+                displaySize = -1
+
+            # the 7 fields, in order; getDescription wraps the list in a tuple
+            return [space.wrap(name), space.gettypeobject(varType.typedef),
+                    space.wrap(displaySize), space.wrap(internalSize),
+                    space.wrap(precision), space.wrap(scale),
+                    space.wrap(nullable)]
+
+        finally:
+            roci.OCIDescriptorFree(param, roci.OCI_DTYPE_PARAM)
+
     def _setBindVariablesByPos(self, space,
                                w_vars, numElements, arrayPos, defer):
         "handle positional binds"
@@ -865,5 +1046,6 @@
     bindarraysize = GetSetProperty(cursor_bindarraysize_get, cursor_bindarraysize_set),
     rowcount = interp_attrproperty('rowCount', W_Cursor),
     statement = interp_attrproperty_w('w_statement', W_Cursor),
-    bindvars = GetSetProperty(cursor_bindvars_get)
+    bindvars = GetSetProperty(cursor_bindvars_get),
+    description = GetSetProperty(W_Cursor.getDescription),
 )

Modified: pypy/trunk/pypy/module/oracle/interp_variable.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_variable.py	(original)
+++ pypy/trunk/pypy/module/oracle/interp_variable.py	Wed Nov 18 15:04:36 2009
@@ -36,7 +36,7 @@
     
 def _defineHelper(cursor, param, position, numElements):
     # determine data type
-    varType = _typeByOracleDescriptor(param, cursor.environment)
+    varType = typeByOracleDescriptor(param, cursor.environment)
     if cursor.numbersAsStrings and varType is VT_Float:
         varType = VT_NumberAsString
 
@@ -656,7 +656,7 @@
         )
     variableTypeByTypedef[cls.typedef] = cls
 
-def _typeByOracleDescriptor(param, environment):
+def typeByOracleDescriptor(param, environment):
     # retrieve datatype of the parameter
     attrptr = lltype.malloc(rffi.CArrayPtr(roci.ub2).TO, 1, flavor='raw')
     try:

Modified: pypy/trunk/pypy/module/oracle/roci.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/roci.py	(original)
+++ pypy/trunk/pypy/module/oracle/roci.py	Wed Nov 18 15:04:36 2009
@@ -24,6 +24,7 @@
     _compilation_info_ = eci
 
     ub1 = platform.SimpleType('ub1', rffi.UINT)
+    sb1 = platform.SimpleType('sb1', rffi.INT)
     ub2 = platform.SimpleType('ub2', rffi.UINT)
     sb2 = platform.SimpleType('sb2', rffi.INT)
     ub4 = platform.SimpleType('ub4', rffi.UINT)
@@ -53,6 +54,7 @@
     OCI_CRED_RDBMS
     OCI_ATTR_SERVER OCI_ATTR_SESSION OCI_ATTR_USERNAME OCI_ATTR_PASSWORD
     OCI_ATTR_STMT_TYPE OCI_ATTR_PARAM_COUNT OCI_ATTR_ROW_COUNT
+    OCI_ATTR_NAME OCI_ATTR_SCALE OCI_ATTR_PRECISION OCI_ATTR_IS_NULL
     OCI_ATTR_DATA_SIZE OCI_ATTR_DATA_TYPE OCI_ATTR_CHARSET_FORM
     OCI_ATTR_PARSE_ERROR_OFFSET
     OCI_NTV_SYNTAX

Modified: pypy/trunk/pypy/module/oracle/test/test_cursor.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/test/test_cursor.py	(original)
+++ pypy/trunk/pypy/module/oracle/test/test_cursor.py	Wed Nov 18 15:04:36 2009
@@ -150,3 +150,27 @@
         count, = cur.fetchone()
         assert count == len(rows)
         assert var.maxlength == 100 * self.cnx.maxBytesPerCharacter
+
+    def test_description_number(self):
+        cur = self.cnx.cursor()
+        try:
+            cur.execute("drop table pypy_temp_table")
+        except oracle.DatabaseError:
+            pass
+        cur.execute("create table pypy_temp_table ("
+                    "intcol                number(9) not null,"
+                    "numbercol             number(9, 2) not null,"
+                    "floatcol              float not null,"
+                    "unconstrainedcol      number not null,"
+                    "nullablecol           number(38)"
+                    ")")
+        cur.execute("select * from pypy_temp_table")
+        got = cur.description
+        expected = [
+            ('INTCOL', oracle.NUMBER, 10, 22, 9, 0, 0),
+            ('NUMBERCOL', oracle.NUMBER, 13, 22, 9, 2, 0),
+            ('FLOATCOL', oracle.NUMBER, 127, 22, 126, -127, 0),
+            ('UNCONSTRAINEDCOL', oracle.NUMBER, 127, 22, 0, -127, 0),
+            ('NULLABLECOL', oracle.NUMBER, 39, 22, 38, 0, 1),
+            ]
+        assert got == expected



More information about the Pypy-commit mailing list