pty.py: any example code out there?
Alex Coventry
alex_c at mit.edu
Tue Jul 18 07:46:52 EDT 2000
Hi, David.
Here is something I wrote a little while back. It works on Linux, at
least. It's kind of ugly -- it's one of those things where I thought
it would be easy, but I kept having to deal with problems I hadn't
anticipated. Hope the pty_Popen class helps as an example, though.
That part's fairly simple.
Alex.
#!/usr/bin/python
# No warranty. I don't have any money, anyway, so sue someone else,
# please. Do as you wish with this code.
'''Sync up Apache and Unix passwords.'''
password_file = '/etc/htpasswd'
import os, pty, time, signal, getpass, pdb, sys, string, re
st = pdb.set_trace
class pty_Popen:
def __init__ (self, command, args, delay=0.1):
self.delay = delay
# Clone this process in a separate thread.
self.pid, self.child = pty.fork ()
# In the child process, pid will contain 0. In the parent,
# it will contain the pid of the child.
if self.pid == 0: # In the child process, so replace the python
# process with the requested command.
os.execv (command, [''] + args)
else: # In the parent process, which stays live
pass
def read (self, max_read):
time.sleep (self.delay)
return os.read (self.child, max_read)
def write (self, text):
time.sleep (self.delay)
return os.write (self.child, text)
class UnexpectedProcessOutputError (Exception):
pass
def get_effective_user ():
effective_user = os.popen ('/usr/bin/whoami').readline ()
assert effective_user[-1] == '\n'
return effective_user[:-1]
class Passwd:
def __init__ (self, username):
self.username = username
self.effective_user = get_effective_user ()
self.log = []
self.unexpected_process_output_p = None
def execute (self):
self.get_passwd_process () # Just for initial authentication.
self.get_password ()
self.change_password ()
def get_password (self):
while 1:
self.new_password = getpass.getpass ('New password: ')
if self.validate_password (self.new_password):
break
def validate_password (self, password):
if password == self.current_password:
print 'That\'s your old password!'
return 0
if re.search ("['\\\\]", password):
print 'Please don\'t use single quotes or backslashes\n' \
'in your password.'
return 0
check_result = self.check_password_against_machine (password)
if check_result != '':
print 'This paranoid machine thinks that password is\n'\
'too easy to guess. Please try something else.\n' \
'Its complaint was:'
check_result = string.split (check_result, '\n')
if len (check_result) >= 2 and check_result[-1] == \
'New UNIX password: ':
print string.strip (check_result[0])
else:
print string.join (check_result, '\n')
return 0
re_entry = getpass.getpass ('Please retype it, so I know ' \
'there\'s no typos: ')
if re_entry != password:
print 'Mismatch. Lucky I checked, huh?'
return 0
else:
return 1
def check_password_against_machine (self, password):
# Go through the passwd exchange up to the point where the new
# password is requested.
self.get_passwd_process ()
self.write (self.new_password)
response = self.get_response ()
# It it's accepted, it will ask for it to be retyped.
if response == 'Retype new UNIX password:':
return ''
else:
return response
def change_password (self):
self.get_passwd_process ()
self.write (self.new_password)
self.get_response () == 'Retype new UNIX password:'
self.write (self.new_password)
self.check_process_output (('passwd: all authentication '\
'tokens updated successfully'),
self.get_response ())
def get_current_password (self):
if not hasattr (self, 'current_password'):
self.current_password = getpass.getpass ('Current password: ')
def wrong_password (self):
print 'Incorrect password entered.'
sys.exit (1)
def get_passwd_process (self):
self.get_current_password ()
self.passwd_process = pty_Popen ('/bin/su', [self.username, '-c',
'/usr/bin/passwd'])
response = self.get_response ()
if self.effective_user != 'root': # Negotiate the password challenge.
self.check_process_output ('Password:', response)
self.write (self.current_password)
response = self.get_response()
if response == ': incorrect password':
self.wrong_password ()
# Next response is sometimes not read in completely the first time.
lines = string.split (response, '\n')
if len (lines) == 1:
lines.append (self.get_response ())
lines = map (string.strip, lines)
response = string.join (lines)
expected_response = 'Changing password for %s ' % self.username + \
'(current) UNIX password:'
self.check_process_output (expected_response,
response[-len (expected_response):])
self.write (self.current_password)
response = self.get_response ()
if response != 'New UNIX password:':
if self.check_process_output ('passwd: Authentication failure',
response):
self.wrong_password () # Die.
def get_response (self):
while 1:
response = string.strip (self.passwd_process.read (1024))
self.log.append (('o', response))
if response:
break
return response
def write (self, text):
self.log.append (('i', text))
assert self.passwd_process.write (text + '\n') == len (text) + 1
def check_process_output (self, expected, actual):
if expected != actual:
self.log.append (('expected', expected))
self.unexpected_process_output_p = 1
print 'Unexpected output from passwd process encountered:'
print response
print 'Continuing in the hope that the response does not'
print 'indicate a fatal error.'
return 0
else:
return 1
def get_executing_user ():
id_info = os.popen ('/usr/bin/id').read()[:-1]
id_info = string.split (id_info)[1]
id_re = re.match (r'gid=\d+\(([^)]*)\)', id_info)
assert id_re
user = id_re.group (1)
return user
def main (t):
executing_user = get_executing_user ()
no_keyboard_interrupts = 1
try:
try:
t.execute ()
except KeyboardInterrupt:
no_keyboard_interrupts = None
except EOFError:
no_keyboard_interrupts = None
if not no_keyboard_interrupts:
print
print 'Stopped.'
sys.exit (2)
# If the database has not yet been created, use the '-c' flag
# to do so.
if os.path.exists (password_file):
flags = '-b'
else:
flags = '-bc'
# Quote the password, since that came from the user.
# Just in case they try anything funny.
args = [flags, password_file, executing_user, "'%s'" % t.new_password]
command = string.join (['/usr/local/bin/htpasswd'] + args)
# Save a backup, just in case
current_password_file_contents = open (password_file).readlines ()
home_dir = os.path.expanduser ('~' + get_effective_user ())
backup_filename = os.path.split (password_file)[1] + '.bak'
backup_filename = os.path.join (home_dir, backup_filename)
open (backup_filename, 'w').writelines (current_password_file_contents)
assert os.system (command) == 0
# Check that nothing got stomped on in password_file.
new_password_file_contents = open (password_file).readlines ()
for line in current_password_file_contents:
if line not in new_password_file_contents and \
string.split (line, ':')[0] != executing_user:
# alert_alex ('Contents of password file have been ' \
'inadvertantly destroyed.')
# print 'Back up stored in', backup_filename
def test ():
global password_file
password_file = '/home/alex_c/com/python/htpasswd'
executing_user = 'alex_c'
t = Passwd (executing_user)
t.current_password, t.new_password = 'whynot.', 'flurgle.'
st ()
t.check_password_against_machine (t.new_password)
t.change_password ()
raise 'stopped'
if __name__ == '__main__':
executing_user = get_executing_user ()
t = Passwd (executing_user)
main (t)
if t.unexpected_process_output_p:
error_log_file = open ('/home/ht_keeper/error_log', 'a')
# Want to record a log of the exchange, but don't want to
# record any passwords.
log = []
for line in t.log:
if line[0] != 'i': # Don't get user input.
log.append (line[1])
error_log_file.write ('%s: %s\n' % (executing_user, log))
# alert_alex ("There was an error in passwd.ht.")
# Log the change.
import time
log_filename = '/home/ht_keeper/log'
log_file = open (log_filename, 'a')
log_file.write ('%s %s\n' % (executing_user, time.time ()))
log_file.close ()
More information about the Python-list
mailing list