[pypy-svn] r69520 - in pypy/trunk/pypy/module/oracle: . test
afa at codespeak.net
Mon Nov 23 00:11:56 CET 2009
Author: afa
Date: Mon Nov 23 00:11:55 2009
New Revision: 69520
Added:
pypy/trunk/pypy/module/oracle/test/test_cursorvar.py
Modified:
pypy/trunk/pypy/module/oracle/interp_connect.py
pypy/trunk/pypy/module/oracle/interp_cursor.py
pypy/trunk/pypy/module/oracle/interp_variable.py
Log:
Start implementing cursor bind variables
Modified: pypy/trunk/pypy/module/oracle/interp_connect.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_connect.py (original)
+++ pypy/trunk/pypy/module/oracle/interp_connect.py Mon Nov 23 00:11:55 2009
@@ -12,7 +12,7 @@
from pypy.module.oracle.config import string_w, StringBuffer, MAX_STRING_CHARS
from pypy.module.oracle.interp_environ import Environment
from pypy.module.oracle.interp_cursor import W_Cursor
-from pypy.module.oracle.interp_pool import W_Pool
+#from pypy.module.oracle.interp_pool import W_Pool
from pypy.module.oracle.interp_variable import VT_String
class W_Connection(Wrappable):
Modified: pypy/trunk/pypy/module/oracle/interp_cursor.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_cursor.py (original)
+++ pypy/trunk/pypy/module/oracle/interp_cursor.py Mon Nov 23 00:11:55 2009
@@ -21,6 +21,7 @@
self.statementType = -1
self.handle = None
self.isOpen = True
+ self.isOwned = False
self.setInputSizes = False
self.arraySize = 50
@@ -155,7 +156,7 @@
self._checkOpen(space)
# close the cursor
- self._freeHandle(space, raiseError=True)
+ self.freeHandle(space, raiseError=True)
self.isOpen = False
self.handle = None
@@ -223,7 +224,22 @@
interp_error.get(space).w_InterfaceError,
space.wrap("not open"))
- def _freeHandle(self, space, raiseError=True):
+ def allocateHandle(self):
+ handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIStmt).TO,
+ 1, flavor='raw')
+ try:
+ status = roci.OCIHandleAlloc(
+ self.environment.handle,
+ handleptr, roci.OCI_HTYPE_STMT, 0,
+ lltype.nullptr(rffi.CArray(roci.dvoidp)))
+ self.environment.checkForError(
+ status, "Cursor_New()")
+ self.handle = handleptr[0]
+ finally:
+ lltype.free(handleptr, flavor='raw')
+ self.isOwned = True
+
+ def freeHandle(self, space, raiseError=True):
if not self.handle:
return
if self.isOwned:
@@ -262,7 +278,7 @@
# release existing statement, if necessary
self.w_statementTag = w_tag
- self._freeHandle(space)
+ self.freeHandle(space)
# prepare statement
self.isOwned = False
Modified: pypy/trunk/pypy/module/oracle/interp_variable.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_variable.py (original)
+++ pypy/trunk/pypy/module/oracle/interp_variable.py Mon Nov 23 00:11:55 2009
@@ -162,9 +162,10 @@
flavor='raw', zero=True)
# perform extended initialization
- self.initialize(cursor)
+ self.initialize(self.environment.space, cursor)
def __del__(self):
+ self.finalize()
lltype.free(self.actualElementsPtr, flavor='raw')
if self.actualLength:
lltype.free(self.actualLength, flavor='raw')
@@ -172,6 +173,8 @@
lltype.free(self.data, flavor='raw')
if self.returnCode:
lltype.free(self.returnCode, flavor='raw')
+ if self.indicator:
+ lltype.free(self.indicator, flavor='raw')
def getBufferSize(self):
return self.size
@@ -218,7 +221,10 @@
"Variable_MakeArray(): type does not support arrays"))
self.isArray = True
- def initialize(self, cursor):
+ def initialize(self, space, cursor):
+ pass
+
+ def finalize(self):
pass
@classmethod
@@ -408,7 +414,7 @@
else:
return self.size * 2
- def initialize(self, cursor):
+ def initialize(self, space, cursor):
self.actualLength = lltype.malloc(rffi.CArrayPtr(roci.ub2).TO,
self.allocatedElements,
zero=True, flavor='raw')
@@ -797,8 +803,47 @@
pass
class VT_Cursor(W_Variable):
+ oracleType = roci.SQLT_RSET
+ size = rffi.sizeof(roci.OCIStmt)
canBeInArray = False
+ def initialize(self, space, cursor):
+ from pypy.module.oracle import interp_cursor
+ self.connection = cursor.connection
+ self.cursors_w = [None] * self.allocatedElements
+ for i in range(self.allocatedElements):
+ tempCursor = interp_cursor.W_Cursor(space, self.connection)
+ tempCursor.allocateHandle()
+ self.cursors_w[i] = space.wrap(tempCursor)
+
+ dataptr = rffi.ptradd(
+ rffi.cast(roci.Ptr(roci.OCIStmt), self.data),
+ i)
+ dataptr[0] = tempCursor.handle
+
+ def setValueProc(self, space, pos, w_value):
+ from pypy.module.oracle import interp_cursor
+ w_CursorType = space.gettypeobject(interp_cursor.W_Cursor.typedef)
+ if not space.is_true(space.isinstance(w_value, w_CursorType)):
+ raise OperationError(
+ space.w_TypeError,
+ space.wrap("expecting cursor"))
+
+ self.cursors_w[pos] = w_value
+
+ cursor = space.interp_w(interp_cursor.W_Cursor, w_value)
+ if not cursor.isOwned:
+ cursor.freeHandle(space, raiseError=True)
+ cursor.isOwned = True
+ cursor.allocateHandle()
+
+ dataptr = rffi.ptradd(
+ rffi.cast(roci.Ptr(roci.OCIStmt), self.data),
+ pos)
+ dataptr[0] = cursor.handle
+ cursor.statementType = -1
+
+
class VT_Object(W_Variable):
canBeInArray = False
@@ -942,7 +987,11 @@
# XXX Delta
- # XXX cursorType
+ from pypy.module.oracle import interp_cursor
+ if space.is_true(space.isinstance( # XXX is there an easier way?
+ w_value,
+ space.gettypeobject(interp_cursor.W_Cursor.typedef))):
+ return VT_Cursor, 0, numElements
if space.is_true(space.isinstance(w_value, get(space).w_DecimalType)):
return VT_NumberAsString, 0, numElements
Added: pypy/trunk/pypy/module/oracle/test/test_cursorvar.py
==============================================================================
--- (empty file)
+++ pypy/trunk/pypy/module/oracle/test/test_cursorvar.py Mon Nov 23 00:11:55 2009
@@ -0,0 +1,17 @@
+from pypy.module.oracle.test.test_connect import OracleTestBase
+
+class AppTestCursorVar(OracleTestBase):
+
+ def test_bind_inout(self):
+ cur = self.cnx.cursor()
+ cursor = self.cnx.cursor()
+ assert cursor.description is None
+ cur.execute("""
+ begin
+ open :cursor for select 1 numbercol from dual;
+ end;""",
+ cursor=cursor)
+ assert cursor.description == [
+ ('NUMBERCOL', oracle.NUMBER, 127, 2, 0, 0, 1)]
+ data = cursor.fetchall()
+ assert data == [(1,)]
More information about the Pypy-commit mailing list