Attribute support and multi-threading for Pyro

John Wiegley jwiegley at inprise.com
Fri May 19 15:10:08 EDT 2000


The following diffs will add remote attribute getting/setting support
to Pyro, as well as threading (pass "threaded = 1" to
Pyro.core.Daemon).

These were sent to the author, but he must be very busy, and I would
like further comments by anyone else who is using Pyro.

These diffs (which can be merged with your sources using 'patch') are
against the 1.1 package, which was announced recently.

----------------------------------------------------------------------
--- Pyro/core.py	Mon May  8 13:11:21 2000
+++ Pyro/core.py	Fri May 19 11:55:43 2000
@@ -52,6 +52,23 @@
 			args=args[:-1]+args[-1]
 		return apply(getattr(self.delegate,method),args,keywords)
 
+class ObjBaseWithAttrs(ObjBase):
+	def remote_hasattr(self, attr):
+		attr = getattr(self, attr)
+		if attr is not None:
+			from types import MethodType, BuiltinMethodType
+			if type(attr) in (MethodType, BuiltinMethodType):
+				return 'method'
+			else:
+				return 'attr'
+		return 0
+
+	def remote_getattr(self, attr):
+		return getattr(self, attr)
+
+	def remote_setattr(self, attr, value):
+		return setattr(self, attr, value)
+
 
 #############################################################################
 #
@@ -147,10 +164,13 @@
 	def __init__(self, URI):
 		self.URI = URI
 		self.objectID = URI.objectID
-		self.adapter = Pyro.protocol.getProtocolAdapter(self.URI.protocol)
-		self.adapter.bindToURI(URI)	
+		self.adapter  = None
 	def __getattr__(self, name):
+		# allows one of these to be safely pickled
+		if name != '__getinitargs__':
 		self._name=name; return self.__invokePYRO__
+		else:
+			raise AttributeError()
 	def __repr__(self):
 		return '<Pyro.core.DynamicProxy instance at '+str(id(self))+'>'
 	def __str__(self):
@@ -159,10 +179,67 @@
 	# Note that a slightly faster way of calling is this:
 	#  instead of proxy.method(args...) use proxy('method',args...)
 	def __call__(self,method,*vargs, **kargs):
-		return self.adapter.remoteInvocation(method,RIF_Varargs|RIF_Keywords,vargs,kargs)
+		self._name = method
+		return self.__invokePYRO__
 	def __invokePYRO__(self, *vargs, **kargs):
-		return self.adapter.remoteInvocation(self._name,RIF_Varargs|RIF_Keywords,vargs,kargs)
+		if self.adapter is None:
+			self.adapter = Pyro.protocol.getProtocolAdapter(self.URI.protocol)
+			self.adapter.bindToURI(self.URI)
+		return self.adapter.remoteInvocation(self._name,
+											 RIF_Varargs|RIF_Keywords,
+											 vargs,kargs)
+	def __getstate__(self):
+		temp = {}
+		for key in self.__dict__.keys():
+			if key != "adapter":
+				temp[key] = self.__dict__[key]
+		return temp
+
+	def __setstate__(self, value):
+		for key in value.keys():
+			self.__dict__[key] = value[key]
+		self.__dict__['adapter'] = None
 	
+class DynamicProxyWithAttrs(DynamicProxy):
+	def __init__(self, URI):
+		self.attr_cache = {}
+		DynamicProxy.__init__(self, URI)
+
+	def remote_getattr(self, attr, value = 0):
+		if value: meth = 'remote_getattr'
+		else: meth = 'remote_hasattr'
+		self._name = meth
+		return self.__invokePYRO__(attr)
+
+	def findattr(self, attr):
+		if self.attr_cache.has_key(attr):
+			return self.attr_cache[attr]
+
+		# Go look it up, and cache the value
+		self.attr_cache[attr] = self.remote_getattr(attr)
+		return self.attr_cache[attr]
+
+	def __setattr__(self, attr, value):
+		result = self.findattr(attr)
+		if result == 'attr':
+			self._name = 'remote_setattr'
+			return self.__invokePYRO__(attr, value)
+		else:
+			raise AttributeError()
+
+	def __getattr__(self, attr):
+		# allows one of these to be safely pickled
+		if attr != '__getinitargs__':
+			result = self.findattr(attr)
+			if result == 'method':
+				self._name = name
+				return self.__invokePYRO__
+			elif result is not None:
+				return self.remote_getattr(attr, 1)
+			else:
+				raise AttributeError()
+		else:
+			raise AttributeError()
 
 #############################################################################
 #
@@ -173,7 +250,7 @@
 #############################################################################
 
 class Daemon(Pyro.protocol.TCPServer):
-	def __init__(self,protocol='PYRO',port=0):
+	def __init__(self,protocol='PYRO',port=0,threaded=0):
 		self.hostname = Pyro.protocol.getHostname()	
 		if port:
 			self.port = port
@@ -185,10 +262,12 @@
 		self.adapter = Pyro.protocol.getProtocolAdapter(protocol)
 		self.adapter.setDaemon(self)
 		try:
-			Pyro.protocol.TCPServer.__init__(self, DaemonSlave(), self.port)
-		except socket.error:
-			Log.error('Daemon','Couldn\'t start Pyro daemon- already running?')
-			raise DaemonError('Couldn\'t start Pyro daemon- perhaps it\'s running already?')
+			Pyro.protocol.TCPServer.__init__(self, DaemonSlave(), self.port,
+							 threaded)
+		except socket.error, msg:
+			text = 'Couldn\'t start Pyro daemon: ' + str(msg)
+			Log.error('Daemon', text)
+			raise DaemonError(text)
 	
 	def __del__(self):
 		# server shutting down, unregister all known objects in the NS
@@ -219,10 +298,13 @@
 		self.implementations[object.GUID()]=(object,name)
 		# register the object with the NS
 		if self.NameServer:
-			self.NameServer.register(name,PyroURI(self.hostname,object.GUID(),
-				protocol=self.protocol,port=self.port))
+			URI = PyroURI(self.hostname,object.GUID(),
+						  protocol=self.protocol,port=self.port)
+			self.NameServer.register(name, URI)
+			return URI
 		else:
 			Log.warn('Daemon','connecting object without naming service specified:',name)
+			return None
 
 	def disconnect(self,object):
 		try:
@@ -303,4 +385,3 @@
 	_initGeneric_post()
 	if banner:
 		print 'Pyro Server Initialized. Using Pyro V'+Pyro.PYRO_VERSION
-
--- Pyro/protocol.py	Sun May  7 13:40:35 2000
+++ Pyro/protocol.py	Fri May 19 11:59:05 2000
@@ -7,8 +7,9 @@
 #
 #############################################################################
 
-import select, socket, struct
+import select, socket, struct, time
 import Pyro
+from threading import Thread
 from Pyro.util import pickle, Log
 from Pyro.errors import *
 
@@ -176,7 +177,7 @@
 
 #-------- TCPServer base class
 class TCPServer:
-	def __init__(self, requestServer, port):
+	def __init__(self, requestServer, port, threaded):
 		self.slave = requestServer
 		self.slave.daemon=self
 		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -184,6 +185,8 @@
 		self.sock.listen(5)
 		self.connections = []
 		self.setParamsForLoop(5)
+		self.threaded = threaded
+		self.handling = []
 	def __del__(self):
 		if len(self.connections)>0:
 			Log.warn('TCPServer','Shutting down but there are still',len(self.connections),'active connections')
@@ -202,30 +205,51 @@
 			self.adapter.sendAccept(csock)
 			self.connections.append(conn)
 			Log.msg('TCPServer','new connection from',addr,'#conns=',len(self.connections))
+			return conn
 		else:
 			# we have too many open connections. Disconnect this one.
 			Log.msg('TCPServer','Too many open connections, closing',addr,'#conns=',len(self.connections))
 			self.adapter.sendDeny(csock)
+			return None
+
+	def handleRequest(self, c):
+		try:
+			time.sleep(.001)
+			self.slave.handleRequest(c)
+		except:
+			self.handleError(c)
+		if self.threaded and c in self.handling:
+			self.handling.remove(c)
 
 	def handleRequests(self, timeout=None, others=[], callback=None):
 		activecnt=1	
 		while activecnt:
-			socklist = self.connections+[self.sock]+others
+			connections = []
+			for conn in self.connections:
+				if conn not in self.handling:
+					connections.append(conn)
+			socklist = connections+[self.sock]+others
+			time.sleep(.001)
 			if timeout==None:
 				ins,outs,exs = select.select(socklist,[],[])
 			else:
 				ins,outs,exs = select.select(socklist,[],[],timeout)
 			activecnt=len(ins)
+			time.sleep(.001)
 			if self.sock in ins:
-				self.newConnection(self.sock)
+				conn = self.newConnection(self.sock)
 				ins.remove(self.sock)
+				if conn: ins.append(conn)
+			time.sleep(.001)
 			for c in ins:
 				if isinstance(c,TCPConnection):
-					try:
-						self.slave.handleRequest(c)
-					except:
-						self.handleError(c)
+					if self.threaded:
+						self.handling.append(c)
+						Thread(target=self.handleRequest, args=(c,)).start()
+					else:
+						self.handleRequest(c)
 					ins.remove(c)
+				time.sleep(.001)
 			if ins and callback:
 				# the 'others' must have fired...
 				callback(ins)
@@ -243,5 +267,8 @@
 	def removeConnection(self, conn):
 		if conn in self.connections:
 			self.connections.remove(conn)
-			Log.msg('TCPServer','removed connection with',conn.addr,' #conns=',len(self.connections))
+			if self.threaded and conn in self.handling:
+				self.handling.remove(conn)
+			Log.msg('TCPServer','removed connection with',conn.addr,
+					' #conns=',len(self.connections))



More information about the Python-list mailing list