Is it possible to use python to get True Full Duplex on a Serial port?
Grant Edwards
invalid at invalid
Fri Aug 14 10:28:26 EDT 2009
On 2009-08-14, greg <greg at cosc.canterbury.ac.nz> wrote:
> Hendrik van Rooyen wrote:
>
>> port = open("/dev/ttyS0","r+b",0)
>>
>> What I would really like is to have two threads - one that
>> does blocking input waiting for a character, and one that
>> examines an output queue and transmits the stuff it finds.
>
> You can't read and write with the same stdio file object at
> the same time. Odd things tend to happen if you try.
>
> You need to open *two* file objects, one for reading and one
> for writing:
>
> fr = open("/dev/ttyS0","rb",0)
> fw = open("/dev/ttyS0","wb",0)
Doh! It didn't even occur to me that somebody would use python
"file" objects for serial ports, and I completely overlooked
the fact that the OP was doing that.
In short: don't do that -- it just messes things up.
> and give fr to the reading thread and fw to the writing
> thread.
>
> You could also try avoiding file objects altogether and use
> the raw system calls in the os module.
That's definitely the way you should do serial port I/O.
> Since you're not using any buffering, there's little reason to
> use the stdio layer. If you do that, you should be able to use
> the same file descriptor for reading and writing without any
> trouble.
Do not use Python file objects. Use the underlying file
descriptors: os.open(), os.read(), os.write(). That will
almost certainly solve your problems.
If you want examples of os.xxxxx() usage, below is the
PosixSerial.py module that I use for Linux-only applications.
For cross-platform work, use pyserial (whose Posix support is
based on the code below).
---------------------------------PosixSerial.py------------------------------
# Posix serial port class
# Copyright 2001-2009 Grant B. Edwards <grante at visi.com>
# You may use this code in any way you like so long as you
# leave this copyright notice.
# Though you are not required to, it would be nice if you send
# me a copy of bufixes or enhancements and allowed me to
# incorporate and distribute them.
import sys
import fcntl
import os
import struct
import termios
import string
import select
if string.split(sys.version)[0] > '2':
TERMIOS = termios
else:
import TERMIOS
# construct dictionaries for baud rate lookups
baudEnumToInt = {}
baudIntToEnum = {}
for rate in (0,50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,19200,38400,57600,115200,230400,460800,500000,576000,921600,1000000,1152000,1500000,2000000,2500000,3000000,3500000,4000000):
try:
i = eval('TERMIOS.B'+str(rate))
baudEnumToInt[i]=rate
baudIntToEnum[rate] = i
except:
pass
# Do not know if these are right for anything except Linux
if sys.platform[:5] == 'linux':
TIOCMGET = 0x5415
TIOCMBIS = 0x5416
TIOCMBIC = 0x5417
TIOCMSET = 0x5418
TIOCM_LE = 0x001
TIOCM_DTR = 0x002
TIOCM_RTS = 0x004
TIOCM_ST = 0x008
TIOCM_SR = 0x010
TIOCM_CTS = 0x020
TIOCM_CAR = 0x040
TIOCM_RNG = 0x080
TIOCM_DSR = 0x100
TIOCM_CD = TIOCM_CAR
TIOCM_RI = TIOCM_RNG
TIOCM_OUT1 = 0x2000
TIOCM_OUT2 = 0x4000
TIOCM_zero_str = struct.pack('I',0)
TIOCM_one_str = struct.pack('I',1)
TIOCM_RTS_str = struct.pack('I',TIOCM_RTS)
TIOCM_DTR_str = struct.pack('I',TIOCM_DTR)
portNotOpenError = ValueError('port not open')
class Port:
""" An object wrapper for Posix serial ports """
def __init__(self,path=None,noinit=False):
self.fd = None
if path:
self.open(path,noinit)
def __tcsetattr(self):
termios.tcsetattr(self.fd,TERMIOS.TCSANOW,[self.iflag,self.oflag,self.cflag,self.lflag,self.ispeed,self.ospeed,self.cc])
def __tcgetattr(self):
self.iflag,self.oflag,self.cflag,self.lflag,self.ispeed,self.ospeed,self.cc = termios.tcgetattr(self.fd)
def open(self,path,noinit):
if self.fd:
self.close()
self.path = path
self.fd = os.open(path,os.O_RDWR)
self.__tcgetattr()
if not noinit:
self.iflag = 0
self.oflag = 0
self.lflag = 0
self.__tcsetattr()
def close(self):
if self.fd:
os.close(self.fd)
self.fd = None
def fileno(self):
return self.fd;
def _write(self,data):
if not self.fd: raise portNotOpenError
return os.write(self.fd,data)
def write(self,data):
if not self.fd: raise portNotOpenError
t = len(data)
d = data
while t>0:
n = os.write(self.fd,d)
d = d[n:]
t = t - n
def read(self,size=1024,timeout=None):
if not self.fd: raise portNotOpenError
if timeout is None:
return os.read(self.fd,size)
else:
r,w,e = select.select([self.fd],[],[self.fd],timeout)
if r:
return os.read(self.fd,size)
else:
raise "timeout"
def baud(self,rate=None):
if not self.fd: raise portNotOpenError
if not (rate is None):
try:
b = baudIntToEnum[rate]
except:
raise ValueError,'invalid baud rate: '+str(rate)
self.ispeed = b
self.ospeed = b
self.__tcsetattr()
return baudEnumToInt[self.ispeed]
def charLen(self,clen=None):
if not self.fd: raise portNotOpenError
if not clen is None:
self.cflag = self.cflag & ~TERMIOS.CSIZE
if clen == 8:
self.cflag = self.cflag | TERMIOS.CS8
elif clen == 7:
self.cflag = self.cflag | TERMIOS.CS7
elif clen == 6:
self.cflag = self.cflag | TERMIOS.CS6
elif clen == 5:
self.cflag = self.cflag | TERMIOS.CS5
else:
raise ValueError,'invalid char len: '+str(clen)
self.__tcsetattr()
cs = self.cflag & TERMIOS.CSIZE
if cs == TERMIOS.CS8:
return 8
elif cs == TERMIOS.CS7:
return 7
elif cs == TERMIOS.CS6:
return 6
elif cs == TERMIOS.CS5:
return 5
else:
raise ValueError,'char len invalid'
def parity(self,par=None):
if not self.fd: raise portNotOpenError
if not par is None:
self.cflag = self.cflag & ~(TERMIOS.PARENB|TERMIOS.PARODD)
if par == 'none':
pass
elif par == 'even':
self.cflag = self.cflag | (TERMIOS.PARENB)
elif par == 'odd':
self.cflag = self.cflag | (TERMIOS.PARENB|TERMIOS.PARODD)
else:
raise ValueError,'invalid parity: '+str(par)
self.__tcsetattr()
if (self.cflag & TERMIOS.PARENB) == 0:
return 'none'
elif self.cflag & TERMIOS.PARODD:
return 'odd'
else:
return 'even'
def xonxoff(self,enable=None):
if not self.fd: raise portNotOpenError
if not enable is None:
if enable:
self.iflag = self.iflag | (TERMIOS.IXON|TERMIOS.IXOFF)
else:
self.iflag = self.iflag & ~(TERMIOS.IXON|TERMIOS.IXOFF)
self.__tcsetattr()
return (self.iflag & (TERMIOS.IXON|TERMIOS.IXOFF)) == (TERMIOS.IXON|TERMIOS.IXOFF)
def rtscts(self,enable=None):
if not self.fd: raise portNotOpenError
if not enable is None:
if enable:
self.cflag = self.cflag | TERMIOS.CRTSCTS
else:
self.cflag = self.cflag & ~TERMIOS.CRTSCTS
self.__tcsetattr()
return (self.cflag & TERMIOS.CRTSCTS) != 0
def cook(self,enable=None):
if not self.fd: raise portNotOpenError
if not enable is None:
self.iflag = 0
self.oflag = 0
if enable:
self.lflag = self.lflag | TERMIOS.ICANON
else:
self.lflag = self.lflag & ~TERMIOS.ICANON
self.__tcsetattr()
return (self.lflag & TERMIOS.ICANON) != 0
def echo(self,enable=None):
if not self.fd:
raise portNotOpenError
if not enable is None:
self.iflag = 0
self.oflag = 0
if enable:
self.lflag = self.lflag | TERMIOS.ECHO
else:
self.lflag = self.lflag & ~TERMIOS.ECHO
self.__tcsetattr()
return (self.lflag & TERMIOS.ECHO) != 0
def __handleiflag(self,enable,mask):
if not self.fd:
raise portNotOpenError
if not enable is None:
if enable:
self.iflag |= mask
else:
self.iflag &= ~mask
self.__tcsetattr()
return (self.iflag & mask) != 0
def ignbrk(self,enable=None):
return self.__handleiflag(enable,TERMIOS.IGNBRK)
def parmrk(self,enable=None):
return self.__handleiflag(enable,TERMIOS.PARMRK)
def ignpar(self,enable=None):
return self.__handleiflag(enable,TERMIOS.IGNPAR)
def brkint(self,enable=None):
return self.__handleiflag(enable,TERMIOS.BRKINT)
def flushInput(self):
if not self.fd:
raise portNotOpenError
termios.tcflush(self.fd,TERMIOS.TCIFLUSH)
def flushOutput(self):
if not self.fd:
raise portNotOpenError
termios.tcflush(self.fd,TERMIOS.TCOFLUSH)
def flushAll(self):
self.flushInput()
self.flushOutput()
def sendBreak(self,howlong=0):
if not self.fd:
raise portNotOpenError
termios.tcsendbreak(self.fd,howlong)
def drainOutput(self):
if not self.fd: raise portNotOpenError
termios.tcdrain(self.fd)
def vmin(self,vm=None):
if not self.fd: raise portNotOpenError
if not vm is None:
if vm<0 or vm>255:
raise ValueError,'invalid vmin: '+str(vm)
self.cc[TERMIOS.VMIN] = vm
self.__tcsetattr()
return self.cc[TERMIOS.VMIN]
def vtime(self,vt=None):
if not self.fd: raise portNotOpenError
if not vt is None:
if vt<0 or vt>255:
raise ValueError,'invalid vtime: '+str(vt)
self.cc[TERMIOS.VTIME] = vt
self.__tcsetattr()
return self.cc[TERMIOS.VTIME]
def nonblocking(self):
if not self.fd:
raise portNotOpenError
fcntl.fcntl(self.fd,fcntl.F_SETFL,os.O_NONBLOCK)
def txCount(self):
s = fcntl.ioctl(self.fd,termios.TIOCOUTQ,TIOCM_zero_str)
return struct.unpack('I',s)[0]
def rxCount(self):
s = fcntl.ioctl(self.fd,termios.TIOCINQ,TIOCM_zero_str)
return struct.unpack('I',s)[0]
def resetRM(self,value):
if value:
v = TIOCM_one_str
else:
v = TIOCM_zero_str
fcntl.ioctl(self.fd,0x525005,v)
if sys.platform[:5] == 'linux':
def DSR(self):
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
return bool(struct.unpack('I',s)[0] & TIOCM_DSR)
def CD(self):
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
return bool(struct.unpack('I',s)[0] & TIOCM_CD)
def RI(self):
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
return bool(struct.unpack('I',s)[0] & TIOCM_RI)
def CTS(self):
if not self.fd: raise portNotOpenError
s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
return bool(struct.unpack('I',s)[0] & TIOCM_CTS)
def DTR(self,on=None):
if not self.fd: raise portNotOpenError
if not on is None:
if on:
fcntl.ioctl(self.fd,TIOCMBIS,TIOCM_DTR_str)
else:
fcntl.ioctl(self.fd,TIOCMBIC,TIOCM_DTR_str)
s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
return bool(struct.unpack('I',s)[0] & TIOCM_DTR)
def RTS(self,on=None):
if not self.fd: raise portNotOpenError
if not on is None:
if on:
fcntl.ioctl(self.fd,TIOCMBIS,TIOCM_RTS_str)
else:
fcntl.ioctl(self.fd,TIOCMBIC,TIOCM_RTS_str)
s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
return bool(struct.unpack('I',s)[0] & TIOCM_RTS)
-----------------------------------------------------------------------------
--
Grant Edwards grante Yow! Maybe I should have
at asked for my Neutron Bomb
visi.com in PAISLEY --
More information about the Python-list
mailing list