[pypy-svn] r69654 - in pypy/trunk/pypy/module/oracle: . test

afa at codespeak.net afa at codespeak.net
Thu Nov 26 16:32:45 CET 2009


Author: afa
Date: Thu Nov 26 16:32:44 2009
New Revision: 69654

Modified:
   pypy/trunk/pypy/module/oracle/__init__.py
   pypy/trunk/pypy/module/oracle/interp_connect.py
   pypy/trunk/pypy/module/oracle/interp_environ.py
   pypy/trunk/pypy/module/oracle/interp_error.py
   pypy/trunk/pypy/module/oracle/interp_pool.py
   pypy/trunk/pypy/module/oracle/roci.py
   pypy/trunk/pypy/module/oracle/test/test_connect.py
Log:
Implement SessionPool.acquire() and release()


Modified: pypy/trunk/pypy/module/oracle/__init__.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/__init__.py	(original)
+++ pypy/trunk/pypy/module/oracle/__init__.py	Thu Nov 26 16:32:44 2009
@@ -5,6 +5,7 @@
 
     interpleveldefs = {
         'connect': 'interp_connect.W_Connection',
+        'Connection': 'interp_connect.W_Connection',
         'UNICODE': 'interp_variable.VT_NationalCharString',
         'NUMBER': 'interp_variable.VT_Float',
         'STRING': 'interp_variable.VT_String',

Modified: pypy/trunk/pypy/module/oracle/interp_connect.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_connect.py	(original)
+++ pypy/trunk/pypy/module/oracle/interp_connect.py	Thu Nov 26 16:32:44 2009
@@ -12,7 +12,7 @@
 from pypy.module.oracle.config import string_w, StringBuffer, MAX_STRING_CHARS
 from pypy.module.oracle.interp_environ import Environment
 from pypy.module.oracle.interp_cursor import W_Cursor
-from pypy.module.oracle.interp_pool import W_Pool
+from pypy.module.oracle.interp_pool import W_SessionPool
 from pypy.module.oracle.interp_variable import VT_String
 
 class W_Connection(Wrappable):
@@ -21,10 +21,13 @@
         self.environment = None
         self.autocommit = False
 
+        self.sessionHandle = None
+
         self.w_inputTypeHandler = None
         self.w_outputTypeHandler = None
 
         self.w_version = None
+        self.release = False
 
     def descr_new(space, w_subtype,
                   w_user=NoneNotWrapped,
@@ -44,10 +47,11 @@
 
         # set up the environment
         if w_pool:
-            pool = space.instance_w(W_Pool, w_pool)
+            pool = space.interp_w(W_SessionPool, w_pool)
             self.environment = pool.environment.clone()
         else:
-            self.environment = Environment(space, threaded, events)
+            pool = None
+            self.environment = Environment.create(space, threaded, events)
 
         self.w_username = w_user
         self.w_password = w_password
@@ -66,7 +70,10 @@
                 space.call_method(self.w_password, 'split',
                                   space.wrap('@'), space.wrap(1)))
 
-        self.connect(space, mode, twophase)
+        if pool or w_cclass:
+            self.getConnection(space, pool, w_cclass, purity)
+        else:
+            self.connect(space, mode, twophase)
         return space.wrap(self)
 
     descr_new.unwrap_spec = [ObjSpace, W_Root,
@@ -77,7 +84,28 @@
                              W_Root,
                              bool,
                              W_Root]
-                                       
+
+    def __del__(self):
+        if self.release:
+            roci.OCITransRollback(
+                self.handle, self.environment.errorHandle,
+                roci.OCI_DEFAULT)
+            roci.OCISessionRelease(
+                self.handle, self.environment.errorHandle,
+                None, 0, roci.OCI_DEFAULT)
+        else:
+            if self.sessionHandle:
+                roci.OCITransRollback(
+                    self.handle, self.environment.errorHandle,
+                    roci.OCI_DEFAULT)
+                roci.OCISessionEnd(
+                    self.handle, self.environment.errorHandle,
+                    self.sessionHandle, roci.OCI_DEFAULT)
+            if self.serverHandle:
+                roci.OCIServerDetach(
+                    self.serverHandle, self.environment.errorHandle,
+                    roci.OCI_DEFAULT)
+
     def connect(self, space, mode, twophase):
         stringBuffer = StringBuffer()
 
@@ -107,7 +135,7 @@
                 status, "Connection_Connect(): server attach")
         finally:
             stringBuffer.clear()
-        
+
         # allocate the service context handle
         handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCISvcCtx).TO,
                                   1, flavor='raw')
@@ -131,7 +159,7 @@
             self.environment.errorHandle)
         self.environment.checkForError(
             status, "Connection_Connect(): set server handle")
-        
+
         # set the internal and external names; these are needed for global
         # transactions but are limited in terms of the lengths of the strings
 
@@ -207,6 +235,145 @@
             self.sessionHandle = lltype.nullptr(roci.OCISession.TO)
             raise
 
+    def getConnection(self, space, pool, w_cclass, purity):
+        """Get a connection using the OCISessionGet() interface
+        rather than using the low level interface for connecting."""
+
+        proxyCredentials = False
+        authInfo = None
+
+        if pool:
+            w_dbname = pool.w_name
+            mode = roci.OCI_SESSGET_SPOOL
+            if not pool.homogeneous and pool.w_username and self.w_username:
+                proxyCredentials = space.ne(pool.w_username, self.w_username)
+                mode |= roci.OCI_SESSGET_CREDPROXY
+        else:
+            w_dbname = self.w_tnsentry
+            mode = roci.OCI_SESSGET_STMTCACHE
+
+        stringBuffer = StringBuffer()
+
+        # set up authorization handle, if needed
+        if not pool or w_cclass or proxyCredentials:
+            # create authorization handle
+            status = roci.OCIHandleAlloc(
+                self.environment.handle,
+                handleptr,
+                roci.HTYPE_AUTHINFO,
+                0, None)
+            self.environment.checkForError(
+                status, "Connection_GetConnection(): allocate handle")
+
+            externalCredentials = True
+
+            # set the user name, if applicable
+            stringBuffer.fill(space, self.w_username)
+            try:
+                if stringBuffer.size > 0:
+                    externalCredentials = False
+                    status = roci.OCIAttrSet(
+                        authInfo,
+                        roci.OCI_HTYPE_AUTHINFO,
+                        stringBuffer.ptr, stringBuffer.size,
+                        roci.OCI_ATTR_PASSWORD,
+                        self.environment.errorHandle)
+                    self.environment.checkForError(
+                        status, "Connection_GetConnection(): set user name")
+            finally:
+                stringBuffer.clear()
+
+            # set the password, if applicable
+            stringBuffer.fill(space, self.w_password)
+            try:
+                if stringBuffer.size > 0:
+                    externalCredentials = False
+                    status = roci.OCIAttrSet(
+                        authInfo,
+                        roci.OCI_HTYPE_AUTHINFO,
+                        stringBuffer.ptr, stringBuffer.size,
+                        roci.OCI_ATTR_USERNAME,
+                        self.environment.errorHandle)
+                    self.environment.checkForError(
+                        status, "Connection_GetConnection(): set password")
+            finally:
+                stringBuffer.clear()
+
+            # if no user name or password are set, using external credentials
+            if not pool and externalCredentials:
+                mode |= roci.OCI_SESSGET_CREDEXT
+
+            # set the connection class, if applicable
+            stringBuffer.fill(space, w_cclass)
+            try:
+                if stringBuffer.size > 0:
+                    externalCredentials = False
+                    status = roci.OCIAttrSet(
+                        authInfo,
+                        roci.OCI_HTYPE_AUTHINFO,
+                        stringBuffer.ptr, stringBuffer.size,
+                        roci.OCI_ATTR_CONNECTION_CLASS,
+                        self.environment.errorHandle)
+                    self.environment.checkForError(
+                        status, "Connection_GetConnection(): set connection class")
+            finally:
+                stringBuffer.clear()
+
+            # set the purity, if applicable
+            purityptr = lltype.malloc(rffi.CArrayPtr(roci.ub4).TO,
+                                      1, flavor='raw')
+            try:
+                status = roci.OCIAttrSet(
+                    authInfo,
+                    roci.OCI_HTYPE_AUTHINFO,
+                    purityptr, rffi.sizeof(roci.ub4),
+                    roci.OCI_ATTR_PURITY,
+                    self.environment.errorHandle)
+                self.environment.checkForError(
+                    status, "Connection_GetConnection(): set purity")
+            finally:
+                lltype.free(purityptr, flavor='raw')
+
+        # acquire the new session
+        stringBuffer.fill(space, w_dbname)
+        foundptr = lltype.malloc(rffi.CArrayPtr(roci.boolean).TO,
+                                 1, flavor='raw')
+        handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCISvcCtx).TO,
+                                  1, flavor='raw')
+        try:
+            status = roci.OCISessionGet(
+                self.environment.handle,
+                self.environment.errorHandle,
+                handleptr,
+                authInfo,
+                stringBuffer.ptr, stringBuffer.size,
+                None, 0,
+                lltype.nullptr(roci.Ptr(roci.oratext).TO),
+                lltype.nullptr(roci.Ptr(roci.ub4).TO),
+                foundptr,
+                mode)
+            self.environment.checkForError(
+                status, "Connection_GetConnection(): get connection")
+
+            self.handle = handleptr[0]
+        finally:
+            stringBuffer.clear()
+            lltype.free(foundptr, flavor='raw')
+
+        # eliminate the authorization handle immediately, if applicable
+        if authInfo:
+            roci.OCIHandleFree(authInfo, roci.OCI_HTYPE_AUTHINFO)
+
+        # copy members in the case where a pool is being used
+        if pool:
+            if not proxyCredentials:
+                self.w_username = pool.w_username
+                self.w_password = pool.w_password
+            self.w_tnsentry = pool.w_tnsentry
+            self.sessionPool = pool
+
+        self.release = True
+
     def _checkConnected(self, space):
         if not self.handle:
             raise OperationError(

Modified: pypy/trunk/pypy/module/oracle/interp_environ.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_environ.py	(original)
+++ pypy/trunk/pypy/module/oracle/interp_environ.py	Thu Nov 26 16:32:44 2009
@@ -4,9 +4,52 @@
 
 from pypy.module.oracle.interp_error import W_Error, get
 
-class Environment:
-    def __init__(self, space, threaded, events):
+class Environment(object):
+    def __init__(self, space, handle):
         self.space = space
+        self.handle = handle
+
+        # create the error handle
+        handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIError).TO,
+                                  1, flavor='raw')
+        try:
+            status = roci.OCIHandleAlloc(
+                self.handle,
+                handleptr, roci.OCI_HTYPE_ERROR, 0,
+                lltype.nullptr(rffi.CArray(roci.dvoidp)))
+            self.checkForError(
+                status, "Environment_New(): create error handle")
+            self.errorHandle = handleptr[0]
+        finally:
+            lltype.free(handleptr, flavor='raw')
+
+
+    def checkForError(self, status, context):
+        if status in (roci.OCI_SUCCESS, roci.OCI_SUCCESS_WITH_INFO):
+            return
+
+        if status != roci.OCI_INVALID_HANDLE:
+            # At this point it is assumed that the Oracle
+            # environment is fully initialized
+            error = W_Error(self.space, self, context, 1)
+            if error.code in (1, 1400, 2290, 2291, 2292):
+                w_type = get(self.space).w_IntegrityError
+            elif error.code in (1012, 1033, 1034, 1089, 3113, 3114,
+                                12203, 12500, 12571):
+                w_type = get(self.space).w_OperationalError
+            else:
+                w_type = get(self.space).w_DatabaseError
+            raise OperationError(w_type, self.space.wrap(error))
+
+        error = W_Error(self.space, self, context, 0)
+        error.code = 0
+        error.w_message = self.space.wrap("Invalid handle!")
+        raise OperationError(get(self.space).w_DatabaseError,
+                             self.space.wrap(error))
+
+    @classmethod
+    def create(cls, space, threaded, events):
+        "Create a new environment object from scratch"
         mode = roci.OCI_OBJECT
         if threaded:
             mode |= roci.OCI_THREADED
@@ -40,48 +83,25 @@
                     self.space.wrap(
                         "Unable to acquire Oracle environment handle"))
 
-            self.handle = handleptr[0]
+            handle = handleptr[0]
         finally:
             lltype.free(handleptr, flavor='raw')
 
-
-        self.maxBytesPerCharacter = config.BYTES_PER_CHAR
-        self.maxStringBytes = config.BYTES_PER_CHAR * config.MAX_STRING_CHARS
-
-        # create the error handle
-        handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIError).TO,
-                                  1, flavor='raw')
         try:
-            status = roci.OCIHandleAlloc(
-                self.handle,
-                handleptr, roci.OCI_HTYPE_ERROR, 0,
-                lltype.nullptr(rffi.CArray(roci.dvoidp)))
-            self.checkForError(
-                status, "Environment_New(): create error handle")
-            self.errorHandle = handleptr[0]
-        finally:
-            lltype.free(handleptr, flavor='raw')
+            newenv = cls(space, handle)
+        except:
+            roci.OCIHandleFree(handle, roci.OCI_HTYPE_ENV)
+            raise
+
+        newenv.maxBytesPerCharacter = config.BYTES_PER_CHAR
+        newenv.maxStringBytes = config.BYTES_PER_CHAR * config.MAX_STRING_CHARS
+        return newenv
+
+    def clone(self):
+        """Clone an existing environment.
+        used when acquiring a connection from a session pool, for example."""
+        newenv = type(self)(self.space, self.handle)
+        newenv.maxBytesPerCharacter = self.maxBytesPerCharacter
+        newenv.maxStringBytes = self.maxStringBytes
+        return newenv
 
-
-    def checkForError(self, status, context):
-        if status in (roci.OCI_SUCCESS, roci.OCI_SUCCESS_WITH_INFO):
-            return
-        
-        if status != roci.OCI_INVALID_HANDLE:
-            # At this point it is assumed that the Oracle
-            # environment is fully initialized
-            error = W_Error(self.space, self, context, 1)
-            if error.code in (1, 1400, 2290, 2291, 2292):
-                w_type = get(self.space).w_IntegrityError
-            elif error.code in (1012, 1033, 1034, 1089, 3113, 3114,
-                                12203, 12500, 12571):
-                w_type = get(self.space).w_OperationalError
-            else:
-                w_type = get(self.space).w_DatabaseError
-            raise OperationError(w_type, self.space.wrap(error))
-
-        error = W_Error(self.space, self, context, 0)
-        error.code = 0
-        error.w_message = self.space.wrap("Invalid handle!")
-        raise OperationError(get(self.space).w_DatabaseError,
-                             self.space.wrap(error))

Modified: pypy/trunk/pypy/module/oracle/interp_error.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_error.py	(original)
+++ pypy/trunk/pypy/module/oracle/interp_error.py	Thu Nov 26 16:32:44 2009
@@ -22,6 +22,7 @@
         self.w_InternalError = get('InternalError')
         self.w_DataError = get('DataError')
         self.w_Variable = get('Variable')
+        self.w_Connection = get('Connection')
 
         w_import = space.builtin.get('__import__')
         w_decimal = space.call(w_import, space.newlist(

Modified: pypy/trunk/pypy/module/oracle/interp_pool.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/interp_pool.py	(original)
+++ pypy/trunk/pypy/module/oracle/interp_pool.py	Thu Nov 26 16:32:44 2009
@@ -1,4 +1,5 @@
 from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.argument import Arguments, Signature
 from pypy.interpreter.gateway import ObjSpace, W_Root, NoneNotWrapped
 from pypy.interpreter.gateway import interp2app
 from pypy.interpreter.typedef import TypeDef, GetSetProperty
@@ -7,8 +8,9 @@
 
 Null = NoneNotWrapped
 
-from pypy.module.oracle import roci, config, interp_error, interp_environ
-
+from pypy.module.oracle import roci, config
+from pypy.module.oracle import interp_error, interp_environ
+from pypy.module.oracle.interp_error import get
 
 class W_SessionPool(Wrappable):
     def __init__(self):
@@ -36,14 +38,16 @@
         self.w_password = w_password
         self.w_tnsentry = w_dsn
 
-        self.w_connectionType = w_connectiontype
+        from pypy.module.oracle.interp_connect import W_Connection
+        self.w_connectionType = w_connectiontype or get(space).w_Connection
         self.minSessions = min
         self.maxSessions = max
         self.sessionIncrement = increment
         self.homogeneous = homogeneous
 
         # set up the environment
-        self.environment = interp_environ.Environment(space, threaded, events)
+        self.environment = interp_environ.Environment.create(
+            space, threaded, events)
 
         # create the session pool handle
         handleptr = lltype.malloc(rffi.CArrayPtr(roci.OCIServer).TO,
@@ -110,6 +114,66 @@
                 get(space).w_InterfaceError,
                 space.wrap("not connected"))
 
+    def acquire(self, space, __args__):
+        (w_user, w_password, w_cclass, w_purity
+         ) = __args__.parse_obj(
+            None, "acquire",
+            Signature(["user", "password", "cclass", "purity"]),
+            defaults_w=[None, None, None, space.w_False])
+        if self.homogeneous and (w_user or w_password):
+            raise OperationError(
+                get(space).w_ProgrammingError,
+                space.wrap("pool is homogeneous. "
+                           "Proxy authentication is not possible."))
+
+        self.checkConnected(space)
+
+        newargs = Arguments(space,
+                            __args__.arguments_w,
+                            __args__.keywords + ["pool"],
+                            __args__.keywords_w + [space.wrap(self)])
+        return space.call_args(self.w_connectionType, newargs)
+    acquire.unwrap_spec = ['self', ObjSpace, Arguments]
+
+    def release(self, space, w_connection):
+        self._release(space, w_connection, roci.OCI_DEFAULT)
+    release.unwrap_spec = ['self', ObjSpace, W_Root]
+
+    def drop(self, space, w_connection):
+        self._release(space, w_connection, roci.OCI_SESSRLS_DROPSESS)
+    drop.unwrap_spec = ['self', ObjSpace, W_Root]
+
+    def _release(self, space, w_connection, mode):
+        from pypy.module.oracle.interp_connect import W_Connection
+        connection = space.interp_w(W_Connection, w_connection)
+
+        self.checkConnected(space)
+
+        if connection.sessionPool is not self:
+            raise OperationError(
+                get(space).w_ProgrammingError,
+                space.wrap("connection not acquired with this session pool"))
+
+        # attempt a rollback
+        status = roci.OCITransRollback(
+            connection.handle, connection.environment.errorHandle,
+            roci.OCI_DEFAULT)
+        # if dropping the connection from the pool, ignore the error
+        if mode != roci.OCI_SESSRLS_DROPSESS:
+            self.environment.checkForError(
+                status, "SessionPool_Release(): rollback")
+
+        # release the connection
+        status = roci.OCISessionRelease(
+            connection.handle, connection.environment.errorHandle,
+            None, 0, mode)
+        self.environment.checkForError(
+            status, "SessionPool_Release(): release session")
+
+        # ensure that the connection behaves as closed
+        connection.sessionPool = None
+        connection.handle = None
+
 def computedProperty(oci_attr_code, oci_value_type):
     def fget(space, self):
         self.checkConnected(space)
@@ -131,6 +195,10 @@
 W_SessionPool.typedef = TypeDef(
     "SessionPool",
     __new__ = interp2app(W_SessionPool.descr_new.im_func),
+    acquire = interp2app(W_SessionPool.acquire),
+    release = interp2app(W_SessionPool.release),
+    drop = interp2app(W_SessionPool.drop),
+
     username = interp_attrproperty_w('w_username', W_SessionPool),
     password = interp_attrproperty_w('w_password', W_SessionPool),
     tnsentry = interp_attrproperty_w('w_tnsentry', W_SessionPool),

Modified: pypy/trunk/pypy/module/oracle/roci.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/roci.py	(original)
+++ pypy/trunk/pypy/module/oracle/roci.py	Thu Nov 26 16:32:44 2009
@@ -89,6 +89,8 @@
     OCI_TYPECODE_NAMEDCOLLECTION OCI_TYPECODE_OBJECT
     OCI_NLS_MAXBUFSZ OCI_NLS_CS_ORA_TO_IANA
     OCI_SPC_STMTCACHE OCI_SPC_HOMOGENEOUS
+    OCI_SESSGET_SPOOL OCI_SESSGET_CREDPROXY OCI_SESSGET_STMTCACHE
+    OCI_SESSRLS_DROPSESS
     '''.split()
 
     for c in constants:
@@ -105,6 +107,7 @@
 OCIError = rffi.VOIDP
 OCIServer = rffi.VOIDP
 OCISession = rffi.VOIDP
+OCIAuthInfo = rffi.VOIDP
 OCISPool = rffi.VOIDP
 OCIStmt = rffi.VOIDP
 OCIParam = rffi.VOIDP
@@ -160,6 +163,11 @@
      ub4],               # mode
     sword)
 
+OCIServerDetach = external(
+    'OCIServerDetach',
+    [OCIServer, OCIError, ub4],
+    sword)
+
 OCISessionBegin = external(
     'OCISessionBegin',
     [OCISvcCtx, OCIError, OCISession, ub4, ub4],
@@ -173,6 +181,22 @@
      ub4],         # mode
     sword)
 
+OCISessionGet = external(
+    'OCISessionGet',
+    [OCIEnv,           # envhp
+     OCIError,         # errhp
+     Ptr(OCISvcCtx),   # svchp
+     OCIAuthInfo,      # authInfop,
+     oratext,          # dbName
+     ub4,              # dbName_len
+     oratext,          # tagInfo
+     ub4,              # tagInfo_len
+     Ptr(oratext),     # retTagInfo
+     Ptr(ub4),         # retTagInfo_len
+     Ptr(boolean),     # found
+     ub4],             # mode
+    sword)
+
 OCISessionPoolCreate = external(
     'OCISessionPoolCreate',
     [OCISvcCtx,    # svchp
@@ -192,6 +216,15 @@
      ub4],         # mode
     sword)
 
+OCISessionRelease = external(
+    'OCISessionRelease',
+    [OCISvcCtx,    # svchp
+     OCIError,     # errhp
+     oratext,      # tag
+     ub4,          # tag_len
+     ub4],         # mode
+    sword)
+
 # Handle and Descriptor Functions
 
 OCIAttrGet = external(

Modified: pypy/trunk/pypy/module/oracle/test/test_connect.py
==============================================================================
--- pypy/trunk/pypy/module/oracle/test/test_connect.py	(original)
+++ pypy/trunk/pypy/module/oracle/test/test_connect.py	Thu Nov 26 16:32:44 2009
@@ -117,7 +117,7 @@
 
 
 class AppTestPool(OracleNotConnectedTestBase):
-    def test_pool(self):
+    def test_pool_basicattributes(self):
         pool = oracle.SessionPool(self.username, self.password,
                                   self.tnsentry,
                                   2, 8, 3)
@@ -129,3 +129,20 @@
         assert pool.increment == 3
         assert pool.opened == 2
         assert pool.busy == 0
+
+    def test_pool_acquire(self):
+        pool = oracle.SessionPool(self.username, self.password,
+                                  self.tnsentry,
+                                  2, 8, 3)
+        assert (pool.busy, pool.opened) == (0, 2)
+        c1 = pool.acquire()
+        assert (pool.busy, pool.opened) == (1, 2)
+        c2 = pool.acquire()
+        assert (pool.busy, pool.opened) == (2, 2)
+        c3 = pool.acquire()
+        assert (pool.busy, pool.opened) == (3, 5)
+        pool.release(c3)
+        assert pool.busy == 2
+        del c2
+        import gc; gc.collect()
+        assert pool.busy == 1



More information about the Pypy-commit mailing list