pty.py: any example code out there?

Alex Coventry alex_c at mit.edu
Tue Jul 18 07:46:52 EDT 2000


Hi, David.

Here is something I wrote a little while back.  It works on Linux, at
least.  It's kind of ugly -- it's one of those things where I thought
it would be easy, but I kept having to deal with problems I hadn't
anticipated.  Hope the pty_Popen class helps as an example, though.
That part's fairly simple.

Alex.

#!/usr/bin/python 

# No warranty.  I don't have any money, anyway, so sue someone else,
# please.  Do as you wish with this code.

'''Sync up Apache and Unix passwords.'''

# Path of the Apache password database that main () backs up and then
# updates through htpasswd.  test () rebinds it to a private copy.
password_file = '/etc/htpasswd'

import os, pty, time, signal, getpass, pdb, sys, string, re

# Shorthand for dropping into the debugger; used by test ().
st = pdb.set_trace

class pty_Popen:
    '''Run a command attached to a freshly allocated pseudo-terminal.

    After construction, self.pid holds the child's process id and
    self.child holds the file descriptor returned by pty.fork (), which
    read () and write () operate on.
    '''

    def __init__ (self, command, args, delay=0.1):
        '''Fork onto a new pty and exec command in the child.

        command -- path of the executable; handed to os.execv unchanged,
                   so no PATH search takes place.
        args    -- list of arguments that follow argv[0].
        delay   -- seconds slept before every read () and write ().
        '''
        self.delay = delay
        # Clone this process (a real fork, not a thread).
        self.pid, self.child = pty.fork ()
        # In the child process, pid will contain 0.  In the parent,
        # it will contain the pid of the child.
        if self.pid == 0:
            # In the child process, so replace the python process with
            # the requested command.
            # NOTE(review): argv[0] is left as the empty string.  Passwd
            # compares su's output against ': incorrect password', which
            # presumably depends on that empty program name -- confirm
            # before changing it.
            try:
                os.execv (command, [''] + args)
            finally:
                # execv never returns on success, so we only get here
                # when it failed.  Leave immediately: otherwise the child
                # would carry on executing the caller's code as a second
                # copy of the parent.  127 is the shell's conventional
                # "command could not be executed" status.
                os._exit (127)
        # In the parent process, which stays live: nothing more to do.

    def read (self, max_read):
        '''Sleep for self.delay, then read up to max_read bytes from
        the pty.'''
        time.sleep (self.delay)
        return os.read (self.child, max_read)

    def write (self, text):
        '''Sleep for self.delay, then write text to the pty.

        Returns the number of bytes actually written, as reported by
        os.write.
        '''
        time.sleep (self.delay)
        return os.write (self.child, text)


class UnexpectedProcessOutputError (Exception):
    '''Error type for output from a driven process that was not expected.

    Nothing in the code visible in this file raises it.
    '''


def get_effective_user ():
    '''Return the login name that belongs to the effective user id.

    The name is looked up in the password database directly rather than
    by running /usr/bin/whoami and trimming its output: that avoids an
    unclosed pipe, a hard-coded binary path, and validation done with
    assert (which vanishes under python -O).

    Raises KeyError if the effective uid has no password database entry.
    '''
    import pwd
    # Index 0 of the passwd entry is the login name.
    return pwd.getpwuid (os.geteuid ())[0]


class Passwd:
    '''Drive an interactive "su USER -c /usr/bin/passwd" session on a pty.

    The prompts compared against are hard-coded literal strings; any
    output that differs is reported by check_process_output () and
    remembered in unexpected_process_output_p.
    '''

    def __init__ (self, username):
        '''Remember username (the account whose password is changed).'''
        self.username = username
        self.effective_user = get_effective_user ()
        # Transcript of the exchange, as (tag, text) pairs: 'o' for
        # process output, 'i' for what we typed (this includes
        # passwords), 'expected' for a prompt we expected but did not get.
        self.log = []
        # Becomes 1 as soon as check_process_output () sees a mismatch.
        self.unexpected_process_output_p = None

    def execute (self):
        '''Authenticate, ask for a new password, then set it.'''
        self.get_passwd_process () # Just for initial authentication.
        self.get_password ()
        self.change_password ()

    def get_password (self):
        '''Prompt until validate_password () accepts the entry.

        The accepted password is left in self.new_password.
        '''
        while 1:
            self.new_password = getpass.getpass ('New password: ')
            if self.validate_password (self.new_password):
                break

    def validate_password (self, password):
        '''Return 1 if password is acceptable, 0 (after explaining) if not.

        Rejects the current password, anything containing a single quote
        or a backslash, anything the system passwd program refuses, and
        anything the user cannot retype identically.
        '''
        if password == self.current_password:
            print ('That\'s your old password!')
            return 0
        if re.search ("['\\\\]", password):
            print ('Please don\'t use single quotes or backslashes\n'
                   'in your password.')
            return 0
        check_result = self.check_password_against_machine (password)
        if check_result != '':
            print ('This paranoid machine thinks that password is\n'
                   'too easy to guess.  Please try something else.\n'
                   'Its complaint was:')
            complaint = check_result.split ('\n')
            # get_response () strips the response, so the final line can
            # never carry trailing whitespace; compare it stripped.  (The
            # old test against 'New UNIX password: ' could not match.)
            if len (complaint) >= 2 and \
               complaint[-1].strip () == 'New UNIX password:':
                print (complaint[0].strip ())
            else:
                print ('\n'.join (complaint))
            return 0
        re_entry = getpass.getpass ('Please retype it, so I know '
                                    'there\'s no typos: ')
        if re_entry != password:
            print ('Mismatch.  Lucky I checked, huh?')
            return 0
        return 1

    def check_password_against_machine (self, password):
        '''Offer password to passwd and report its verdict.

        Returns '' if passwd goes on to ask for the password to be
        retyped, otherwise the text passwd answered with.
        '''
        # Go through the passwd exchange up to the point where the new
        # password is requested.
        self.get_passwd_process ()
        # Send the candidate under test, not self.new_password.
        self.write (password)
        response = self.get_response ()
        # If it's accepted, it will ask for it to be retyped.
        if response == 'Retype new UNIX password:':
            return ''
        return response

    def change_password (self):
        '''Run a fresh passwd exchange and set self.new_password.'''
        self.get_passwd_process ()
        self.write (self.new_password)
        # This comparison used to be computed and thrown away; route it
        # through check_process_output () so a surprise is reported.
        self.check_process_output ('Retype new UNIX password:',
                                   self.get_response ())
        self.write (self.new_password)
        self.check_process_output (('passwd: all authentication '
                                    'tokens updated successfully'),
                                   self.get_response ())

    def get_current_password (self):
        '''Ask for the current password once; later calls reuse it.'''
        if not hasattr (self, 'current_password'):
            self.current_password = getpass.getpass ('Current password: ')

    def wrong_password (self):
        '''Report a bad password and exit with status 1.'''
        print ('Incorrect password entered.')
        sys.exit (1)

    def get_passwd_process (self):
        '''Start a new su/passwd child and advance it to the point where
        it asks for the new password.

        The child is stored in self.passwd_process.  Exits through
        wrong_password () if the current password is rejected.
        '''
        self.get_current_password ()
        self.passwd_process = pty_Popen ('/bin/su', [self.username, '-c',
                                                     '/usr/bin/passwd'])
        response = self.get_response ()
        if self.effective_user != 'root': # Negotiate the password challenge.
            self.check_process_output ('Password:', response)
            self.write (self.current_password)
            response = self.get_response ()
            if response == ': incorrect password':
                self.wrong_password ()
        # Next response is sometimes not read in completely the first time.
        lines = response.split ('\n')
        if len (lines) == 1:
            lines.append (self.get_response ())
        response = ' '.join ([line.strip () for line in lines])
        expected_response = ('Changing password for %s '
                             '(current) UNIX password:') % self.username
        # Only the tail is compared; anything printed before the banner
        # is ignored.
        self.check_process_output (expected_response,
                                   response[-len (expected_response):])
        self.write (self.current_password)
        response = self.get_response ()
        if response != 'New UNIX password:':
            if self.check_process_output ('passwd: Authentication failure',
                                          response):
                self.wrong_password () # Die.

    def get_response (self):
        '''Return the next non-blank chunk of child output, stripped.

        Every read, blank or not, is appended to self.log.  There is no
        timeout: this keeps reading until something non-blank arrives.
        '''
        while 1:
            response = self.passwd_process.read (1024).strip ()
            self.log.append (('o', response))
            if response:
                break
        return response

    def write (self, text):
        '''Send text plus a newline to the child and log it.

        Raises IOError if fewer bytes than requested were written.
        '''
        self.log.append (('i', text))
        # The write must not live inside an assert: python -O removes
        # assert statements, and the write would go with it.
        written = self.passwd_process.write (text + '\n')
        if written != len (text) + 1:
            raise IOError ('short write to passwd process: %s of %s bytes'
                           % (written, len (text) + 1))

    def check_process_output (self, expected, actual):
        '''Return 1 if actual equals expected, else complain and return 0.

        On a mismatch the expected text is logged, the text received is
        printed, and self.unexpected_process_output_p is set to 1.
        '''
        if expected == actual:
            return 1
        self.log.append (('expected', expected))
        self.unexpected_process_output_p = 1
        print ('Unexpected output from passwd process encountered:')
        # Print what we received ('response' was never defined here).
        print (actual)
        print ('Continuing in the hope that the response does not')
        print ('indicate a fatal error.')
        return 0


def get_executing_user ():
    '''Return the name of this process's primary (real) group.

    The original parsed the second field of /usr/bin/id output, the
    "gid=N(name)" field, so despite the function's name the value it
    produced was the primary group name.  That result is kept, but it is
    now taken from the group database directly, which removes the
    unclosed pipe, the hard-coded binary path and the assert-based
    validation.

    NOTE(review): this only equals the invoking user's login name where
    every user has a private group of the same name -- confirm that is
    intended before relying on it elsewhere.

    Raises KeyError if the gid has no group database entry.
    '''
    import grp
    # Index 0 of the group entry is the group name.
    return grp.getgrgid (os.getgid ())[0]

def main (t):
    '''Change the Unix password through t, then store the same password
    in password_file with htpasswd.

    t -- a Passwd instance; t.execute () is run and t.new_password is
         read afterwards.

    Exits with status 2 if the user interrupts the dialogue (Ctrl-C or
    end of input).  Raises RuntimeError if htpasswd does not exit with
    status 0.
    '''
    executing_user = get_executing_user ()
    try:
        t.execute ()
    except (KeyboardInterrupt, EOFError):
        print ('')
        print ('Stopped.')
        sys.exit (2)

    # If the database has not yet been created, use the '-c' flag
    # to do so.
    password_file_exists = os.path.exists (password_file)
    if password_file_exists:
        flags = '-b'
    else:
        flags = '-bc'

    # Save a backup, just in case.  A database that does not exist yet
    # has nothing to back up (and reading it would fail).
    current_password_file_contents = []
    backup_filename = None
    if password_file_exists:
        source = open (password_file)
        try:
            current_password_file_contents = source.readlines ()
        finally:
            source.close ()
        home_dir = os.path.expanduser ('~' + get_effective_user ())
        backup_filename = os.path.join (
            home_dir, os.path.split (password_file)[1] + '.bak')
        backup = open (backup_filename, 'w')
        try:
            backup.writelines (current_password_file_contents)
        finally:
            backup.close ()

    # Run htpasswd with an argument vector instead of a shell command
    # line: the password came from the user, and without a shell there
    # is nothing for it to be interpreted by, so no quoting is needed.
    # The call also must not sit inside an assert, which python -O
    # would remove together with the command.
    htpasswd = '/usr/local/bin/htpasswd'
    status = os.spawnv (os.P_WAIT, htpasswd,
                        [htpasswd, flags, password_file, executing_user,
                         t.new_password])
    if status != 0:
        raise RuntimeError ('%s failed with status %s' % (htpasswd, status))

    # Check that nothing got stomped on in password_file: every old
    # line must still be there, except the executing user's own entry.
    updated = open (password_file)
    try:
        new_password_file_contents = updated.readlines ()
    finally:
        updated.close ()
    for line in current_password_file_contents:
        if line not in new_password_file_contents and \
           line.split (':')[0] != executing_user:
            sys.stderr.write ('Contents of password file have been '
                              'inadvertently destroyed.\n')
            sys.stderr.write ('Back up stored in %s\n' % backup_filename)
            break


def test ():
    '''Manual debugging harness, not an automated test.

    Points password_file at a private copy, builds a Passwd with
    hard-coded passwords, drops into the debugger, runs the password
    check and the password change, and then always raises RuntimeError
    so that nothing after it runs.
    '''
    global password_file
    password_file = '/home/alex_c/com/python/htpasswd'
    executing_user = 'alex_c'
    t = Passwd (executing_user)
    t.current_password, t.new_password = 'whynot.', 'flurgle.'
    st ()
    t.check_password_against_machine (t.new_password)
    t.change_password ()
    # A string cannot be raised (TypeError on Python 2.6+ and 3); use a
    # real exception carrying the same message.
    raise RuntimeError ('stopped')

    
if __name__ == '__main__':
    executing_user = get_executing_user ()
    t = Passwd (executing_user)
    main (t)
    if t.unexpected_process_output_p:
        # Want to record a log of the exchange, but don't want to
        # record any passwords: entries tagged 'i' are what we typed
        # into the passwd process, so leave those out.
        log = []
        for line in t.log:
            if line[0] != 'i': # Don't get user input.
                log.append (line[1])
        error_log_file = open ('/home/ht_keeper/error_log', 'a')
        try:
            error_log_file.write ('%s: %s\n' % (executing_user, log))
        finally:
            # Close explicitly so the record is flushed to disk.
            error_log_file.close ()
        # alert_alex ("There was an error in passwd.ht.")

    # Log the change: who, and when (seconds since the epoch).
    log_filename = '/home/ht_keeper/log'
    log_file = open (log_filename, 'a')
    try:
        log_file.write ('%s %s\n' % (executing_user, time.time ()))
    finally:
        log_file.close ()
    
    



More information about the Python-list mailing list