Is it possible to use python to get True Full Duplex on a Serial port?

Grant Edwards invalid at invalid
Fri Aug 14 16:28:26 CEST 2009


On 2009-08-14, greg <greg at cosc.canterbury.ac.nz> wrote:
> Hendrik van Rooyen wrote:
>
>> port = open("/dev/ttyS0","r+b",0)
>> 
>> What I would really like is to have two threads - one that
>> does blocking input waiting for a character, and one that
>> examines an output queue and transmits the stuff it finds.
>
> You can't read and write with the same stdio file object at
> the same time. Odd things tend to happen if you try.
>
> You need to open *two* file objects, one for reading and one
> for writing:
>
>    fr = open("/dev/ttyS0","rb",0)
>    fw = open("/dev/ttyS0","wb",0)

Doh!  It didn't even occur to me that somebody would use python
"file" objects for serial ports, and I completely overlooked
the fact that the OP was doing that.

In short: don't do that -- it just messes things up.

> and give fr to the reading thread and fw to the writing
> thread.
>
> You could also try avoiding file objects altogether and use
> the raw system calls in the os module.

That's definitely the way you should do serial port I/O.

> Since you're not using any buffering, there's little reason to
> use the stdio layer. If you do that, you should be able to use
> the same file descriptor for reading and writing without any
> trouble.

Do not use Python file objects.  Use the underlying file
descriptors: os.open(), os.read(), os.write().  That will
almost certainly solve your problems.

If you want examples of os.xxxxx() usage, below is the
PosixSerial.py module that I use for Linux-only applications.
For cross-platform work, use pyserial (whose Posix support is
based on the code below).


---------------------------------PosixSerial.py------------------------------
# Posix serial port class
# Copyright 2001-2009 Grant B. Edwards  <grante at visi.com>

# You may use this code in any way you like so long as you
# leave this copyright notice.

# Though you are not required to, it would be nice if you send
# me a copy of bufixes or enhancements and allowed me to
# incorporate and distribute them.

import sys
import fcntl
import os
import struct
import termios
import string
import select

if string.split(sys.version)[0] > '2':
    TERMIOS = termios
else:
    import TERMIOS

# construct dictionaries for baud rate lookups

baudEnumToInt = {}
baudIntToEnum = {}
for rate in (0,50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,19200,38400,57600,115200,230400,460800,500000,576000,921600,1000000,1152000,1500000,2000000,2500000,3000000,3500000,4000000):
    try:
        i = eval('TERMIOS.B'+str(rate))
        baudEnumToInt[i]=rate
        baudIntToEnum[rate] = i
    except:
        pass

# Do not know if these are right for anything except Linux
if sys.platform[:5] == 'linux':
    TIOCMGET = 0x5415
    TIOCMBIS = 0x5416
    TIOCMBIC = 0x5417
    TIOCMSET = 0x5418
    
    TIOCM_LE   =  0x001
    TIOCM_DTR  =  0x002
    TIOCM_RTS  =  0x004
    TIOCM_ST   =  0x008
    TIOCM_SR   =  0x010
    TIOCM_CTS  =  0x020
    TIOCM_CAR  =  0x040
    TIOCM_RNG  =  0x080
    TIOCM_DSR  =  0x100
    TIOCM_CD   =  TIOCM_CAR
    TIOCM_RI   =  TIOCM_RNG
    TIOCM_OUT1 =  0x2000
    TIOCM_OUT2 =  0x4000
    
    TIOCM_zero_str = struct.pack('I',0)
    TIOCM_one_str = struct.pack('I',1)
    TIOCM_RTS_str = struct.pack('I',TIOCM_RTS)
    TIOCM_DTR_str = struct.pack('I',TIOCM_DTR)

portNotOpenError = ValueError('port not open')

class Port:
    """ An object wrapper for Posix serial ports """
    def __init__(self,path=None,noinit=False):
        self.fd = None
        if path:
            self.open(path,noinit)

    def __tcsetattr(self):
        termios.tcsetattr(self.fd,TERMIOS.TCSANOW,[self.iflag,self.oflag,self.cflag,self.lflag,self.ispeed,self.ospeed,self.cc])

    def __tcgetattr(self):
        self.iflag,self.oflag,self.cflag,self.lflag,self.ispeed,self.ospeed,self.cc = termios.tcgetattr(self.fd)

    def open(self,path,noinit):
        if self.fd:
            self.close()
        self.path = path
        self.fd = os.open(path,os.O_RDWR)
        self.__tcgetattr()
        if not noinit:
            self.iflag = 0
            self.oflag = 0
            self.lflag = 0
            self.__tcsetattr()

    def close(self):
        if self.fd:
            os.close(self.fd)
            self.fd = None
            
    def fileno(self):
        return self.fd;

    def _write(self,data):
        if not self.fd: raise portNotOpenError
        return os.write(self.fd,data)

    def write(self,data):
        if not self.fd: raise portNotOpenError
        t = len(data)
        d = data
        while t>0:
            n = os.write(self.fd,d)
            d = d[n:]
            t = t - n

    def read(self,size=1024,timeout=None):
        if not self.fd: raise portNotOpenError
        if timeout is None:
            return os.read(self.fd,size)
        else:
            r,w,e = select.select([self.fd],[],[self.fd],timeout)
            if r:
                return os.read(self.fd,size)
            else:
                raise "timeout"
                
            

    def baud(self,rate=None):
        if not self.fd: raise portNotOpenError

        if not (rate is None):
            try:
                b = baudIntToEnum[rate]
            except:
                raise ValueError,'invalid baud rate: '+str(rate)
            self.ispeed = b
            self.ospeed = b
            self.__tcsetattr()
        return baudEnumToInt[self.ispeed]

    def charLen(self,clen=None):
        if not self.fd: raise portNotOpenError
        if not clen is None:
            self.cflag = self.cflag & ~TERMIOS.CSIZE
            if clen == 8:
                self.cflag = self.cflag | TERMIOS.CS8
            elif clen == 7:
                self.cflag = self.cflag | TERMIOS.CS7
            elif clen == 6:
                self.cflag = self.cflag | TERMIOS.CS6
            elif clen == 5:
                self.cflag = self.cflag | TERMIOS.CS5
            else:
                raise ValueError,'invalid char len: '+str(clen)
            self.__tcsetattr()
        cs = self.cflag & TERMIOS.CSIZE
        if   cs == TERMIOS.CS8: 
            return 8
        elif cs == TERMIOS.CS7: 
            return 7
        elif cs == TERMIOS.CS6: 
            return 6
        elif cs == TERMIOS.CS5: 
            return 5
        else: 
            raise ValueError,'char len invalid'
            
    def parity(self,par=None):
        if not self.fd: raise portNotOpenError

        if not par is None:
            self.cflag = self.cflag & ~(TERMIOS.PARENB|TERMIOS.PARODD)
            if par == 'none':
                pass
            elif par == 'even':
                self.cflag = self.cflag | (TERMIOS.PARENB)
            elif par == 'odd':
                self.cflag = self.cflag | (TERMIOS.PARENB|TERMIOS.PARODD)
            else:
                raise ValueError,'invalid parity: '+str(par)
            self.__tcsetattr()
        if (self.cflag & TERMIOS.PARENB) == 0:
            return 'none'
        elif self.cflag & TERMIOS.PARODD:
            return 'odd'
        else:
            return 'even'

    def xonxoff(self,enable=None):
        if not self.fd: raise portNotOpenError
        if not enable is None:
            if enable:
                self.iflag = self.iflag | (TERMIOS.IXON|TERMIOS.IXOFF)
            else:
                self.iflag = self.iflag & ~(TERMIOS.IXON|TERMIOS.IXOFF)
            self.__tcsetattr()
        return (self.iflag & (TERMIOS.IXON|TERMIOS.IXOFF)) == (TERMIOS.IXON|TERMIOS.IXOFF)

    def rtscts(self,enable=None):
        if not self.fd: raise portNotOpenError
        if not enable is None:
            if enable:
                self.cflag = self.cflag | TERMIOS.CRTSCTS
            else:
                self.cflag = self.cflag & ~TERMIOS.CRTSCTS
            self.__tcsetattr()
        return (self.cflag & TERMIOS.CRTSCTS) != 0
        
    def cook(self,enable=None):
        if not self.fd: raise portNotOpenError
        if not enable is None:
            self.iflag = 0
            self.oflag = 0
            if enable:
                self.lflag = self.lflag | TERMIOS.ICANON
            else:
                self.lflag = self.lflag & ~TERMIOS.ICANON
            self.__tcsetattr()
        return (self.lflag & TERMIOS.ICANON) != 0

    def echo(self,enable=None):
        if not self.fd:
            raise portNotOpenError
        if not enable is None:
            self.iflag = 0
            self.oflag = 0
            if enable:
                self.lflag = self.lflag | TERMIOS.ECHO
            else:
                self.lflag = self.lflag & ~TERMIOS.ECHO
            self.__tcsetattr()
        return (self.lflag & TERMIOS.ECHO) != 0

    def __handleiflag(self,enable,mask):
        if not self.fd:
            raise portNotOpenError
        if not enable is None:
            if enable:
                self.iflag |= mask
            else:
                self.iflag &= ~mask
            self.__tcsetattr()
        return (self.iflag & mask) != 0

    def ignbrk(self,enable=None):
        return self.__handleiflag(enable,TERMIOS.IGNBRK)
    
    def parmrk(self,enable=None):
        return self.__handleiflag(enable,TERMIOS.PARMRK)
    
    def ignpar(self,enable=None):
        return self.__handleiflag(enable,TERMIOS.IGNPAR)
    
    def brkint(self,enable=None):
        return self.__handleiflag(enable,TERMIOS.BRKINT)

    def flushInput(self):
        if not self.fd:
            raise portNotOpenError
        termios.tcflush(self.fd,TERMIOS.TCIFLUSH)

    def flushOutput(self):
        if not self.fd:
            raise portNotOpenError
        termios.tcflush(self.fd,TERMIOS.TCOFLUSH)

    def flushAll(self):
        self.flushInput()
        self.flushOutput()
        
    def sendBreak(self,howlong=0):
        if not self.fd:
            raise portNotOpenError
        termios.tcsendbreak(self.fd,howlong)

    def drainOutput(self):
        if not self.fd: raise portNotOpenError
        termios.tcdrain(self.fd)

    def vmin(self,vm=None):
        if not self.fd: raise portNotOpenError
        if not vm is None:
            if vm<0 or vm>255:
                raise ValueError,'invalid vmin: '+str(vm)
            self.cc[TERMIOS.VMIN] = vm
            self.__tcsetattr()
        return self.cc[TERMIOS.VMIN]

    def vtime(self,vt=None):
        if not self.fd: raise portNotOpenError
        if not vt is None:
            if vt<0 or vt>255:
                raise ValueError,'invalid vtime: '+str(vt)
            self.cc[TERMIOS.VTIME] = vt
            self.__tcsetattr()
        return self.cc[TERMIOS.VTIME]

    def nonblocking(self):
        if not self.fd:
            raise portNotOpenError
        fcntl.fcntl(self.fd,fcntl.F_SETFL,os.O_NONBLOCK)

    def txCount(self):
        s = fcntl.ioctl(self.fd,termios.TIOCOUTQ,TIOCM_zero_str)
        return struct.unpack('I',s)[0]

    def rxCount(self):
        s = fcntl.ioctl(self.fd,termios.TIOCINQ,TIOCM_zero_str)
        return struct.unpack('I',s)[0]
    
    def resetRM(self,value):
        if value:
            v = TIOCM_one_str
        else:
            v = TIOCM_zero_str
        fcntl.ioctl(self.fd,0x525005,v)
    
    if sys.platform[:5] == 'linux':

        def DSR(self):
            if not self.fd: raise portNotOpenError
            s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
            return bool(struct.unpack('I',s)[0] & TIOCM_DSR)

        def CD(self):
            if not self.fd: raise portNotOpenError
            s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
            return bool(struct.unpack('I',s)[0] & TIOCM_CD)
            
        def RI(self):
            if not self.fd: raise portNotOpenError
            s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
            return bool(struct.unpack('I',s)[0] & TIOCM_RI)
    
    
        def CTS(self):
            if not self.fd: raise portNotOpenError
            s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
            return bool(struct.unpack('I',s)[0] & TIOCM_CTS)
    
    
        def DTR(self,on=None):
            if not self.fd: raise portNotOpenError
            if not on is None:
                if on:
                    fcntl.ioctl(self.fd,TIOCMBIS,TIOCM_DTR_str)
                else:
                    fcntl.ioctl(self.fd,TIOCMBIC,TIOCM_DTR_str)
            s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
            return bool(struct.unpack('I',s)[0] & TIOCM_DTR)

        
        def RTS(self,on=None):
            if not self.fd: raise portNotOpenError
            if not on is None:
                if on:
                    fcntl.ioctl(self.fd,TIOCMBIS,TIOCM_RTS_str)
                else:
                    fcntl.ioctl(self.fd,TIOCMBIC,TIOCM_RTS_str)
            s = fcntl.ioctl(self.fd,TIOCMGET,TIOCM_zero_str)
            return bool(struct.unpack('I',s)[0] & TIOCM_RTS)
    
-----------------------------------------------------------------------------


-- 
Grant Edwards                   grante             Yow! Maybe I should have
                                  at               asked for my Neutron Bomb
                               visi.com            in PAISLEY --



More information about the Python-list mailing list