Pipe IO Problem?
Chris S.
chrisks at NOSPAM.udel.edu
Mon Sep 6 06:37:20 EDT 2004
A wrote a small class to handle IO through pipes, but the connection
only seems to work in one direction. The following code defines
connection.py, engine.py, and controller.py. Once connected, the engine
is only able to send and the controller recieve. Also, this only works
when using popen2.popen3. Using os.popen3 doesn't work at all and seems
to behave completely differently.
What could be causing these problems? Any help is greatly appreciated.
##-------------------------------------------
# connection.py
import sys
import threading
import socket
import time
class ConnectionError(Exception):
'''Encapsulates connection errors.'''
pass
class Connection:
'''Base class for an interprocess connection.
This class is meant to be used in two primary ways:
First, as a caller, the initiator of the connection.
Second, as a callee, the recepiant of the connection.
However, once the connection has been made, general use should
behave symmetrically.'''
def __init__(self, in_handler=None):
self.connected = False
self._recv_handler = in_handler
self._recv_buffer = [] # stores data if no in_handler given
self._recv_lock = threading.Lock()
self._send_lock = threading.Lock()
def _send_raw(self, string):
'''Outgoing handler.'''
raise Exception, 'this method must be overridden'
def _recv_raw(self):
'''Incoming handler.'''
raise Exception, 'this method must be overridden'
def _launch_recv_handler(self):
'''Launches general reception handler.'''
assert not self.connected, 'connection already open'
print 'launching recv handler'
self.connected = True
t = threading.Thread(target=self._recv_raw)
t.setDaemon(True)
t.start()
def _launch_data_handler(self, data):
'''Launches user defined reception handler.'''
if self._recv_handler: # launch custom handler (ie in_handler)
if present
t = threading.Thread(target=self._recv_handler, args=(data,))
t.setDaemon(True)
t.start()
else: # otherwise append to buffer
self._recv_lock.acquire()
self._recv_buffer.append(data)
self._recv_lock.release()
def open(self):
raise Exception, 'this method must be overridden'
def close(self, message=None):
raise Exception, 'this method must be overridden'
def send(self, data):
assert self.connected, 'not connected'
self._send_raw(data)
def recv(self):
'''Blocks until it has data to return.
Only use this if you haven't specified a reception handler.'''
assert self.connected, 'not connected'
# wait for data
while not len(self._recv_buffer):
time.sleep(0.1)
# get data
self._recv_lock.acquire()
data = self._recv_buffer.pop(0)
self._recv_lock.release()
return data
def pending(self):
'''True if pending data in the buffer. False otherwise.'''
return bool(len(self._recv_buffer))
def name(self):
pass
class Pipe(Connection):
'''Base class for a pipe connection.'''
def __init__(self, *args, **kargs):
Connection.__init__(self, *args, **kargs)
def _send_raw(self, string):
assert self.connected, 'not connected'
try:
self._send_lock.acquire()
self._outp.write(string+'\n')
self._outp.flush()
self._send_lock.release()
except Exception, e:
self._send_lock.release()
self.close(e)
raise Exception, e
def _recv_raw(self):
while self.connected:
# get data
try:
data = self._inp.readline()
except Exception, e:
self.close(e)
break
if not len(data):
time.sleep(0.1)
continue
# launch handler
self._launch_data_handler(data)
def open(self, target=None):
assert not self.connected, 'connection already open'
if target:
#import os # different functionality?
#(self._inp, self._outp, self._errp) = os.popen3(target)
import popen2
(self._inp, self._outp, self._errp) = popen2.popen3(target)
else:
from sys import stdin, stdout
self._inp = stdin
self._outp = stdout
self._launch_recv_handler()
def close(self, message=None):
assert self.connected, 'connection already closed'
self.connected = False
self._inp.close()
self._inp = None
self._outp.close()
self._outp = None
self._errp.close()
self._errp = None
if message:
print message
##-------------------------------------------
# engine.py
import time
from connection import *
outfile = open('enginetest.data', 'w')
def print_received(data):
print >>outfile, data
conn = Pipe(in_handler=print_received)
conn.open()
for i in range(5):
time.sleep(1)
conn.send('engine '+str(i))
outfile.close()
##-------------------------------------------
# controller.py
import time
from connection import *
def print_received(data):
print 'controller received:',data
conn = Pipe(in_handler=print_received)
conn.open('pipetest_engine.py')
for i in range(5):
print 'controller',i
time.sleep(1)
conn.send(str(i))
More information about the Python-list
mailing list