[Tutor] A complete, functioning small web app: code for review
Gabriel Farrell
gsf at panix.com
Sat Feb 10 00:24:24 CET 2007
Greetings,
I've written the attached simple web app. It serves as a more
pleasant user interface to a file located on our journal proxy
server. It relies on the Paramiko library for sftp and Mako for
templating. Basic file functions are as follows:
* ule: The user starts here. A page is produced based on the
template, which includes the deciphered output of any GET
messages received.
* ule_post: The view/controller for any posted actions. Acts on POST
and redirects the user to "ule".
* ule_model.py: The model -- all the real data handling goes on here.
* sftp_wrapper.py: A wrapper for the Paramiko library to simplify
sftp calls.
* ule_template.html: The Mako template for the main page of the app.
* redirect.html: Another Mako template for the redirect.
It functions without any major bugs that I'm aware of. I took
advantage of the manageable scale of this project to really focus on
writing clean code. I hope it will help others looking to do
anything similar. I would appreciate any comments on its style
and organization.
Gabe
-------------- next part --------------
#!/usr/bin/python
'''
ule -- page display for the EZProxy User List Editor -- reads GET
form to determine variables for display. Processing is sent via
POST to ulepost.
'''
# uncomment cgitb for testing
#import cgitb; cgitb.enable()
import cgi
import sys
sys.path.insert(0, 'lib/python')
from mako.template import Template
from ule_model import UserList
# this form should be GET
form = cgi.FieldStorage()
ul = UserList()
def main():
try:
userpass_list = ul.getList()
if form.getvalue('submit') == 'please wait':
userpass_list = ul.check_against_innopac()
finally:
ul.closeSFTP()
tmpl = Template(filename='ule_template.html')
out_stream = tmpl.render(
form = form,
userpass_list = userpass_list,
)
print 'Content-Type: text/html\n'
print out_stream
if __name__ == '__main__':
main()
-------------- next part --------------
#!/usr/bin/python
'''
ule_post -- for handling POST from ule and redirecting back to ule with
proper GET.
'''
# uncomment cgitb for testing
#import cgitb; cgitb.enable()
import cgi
import re
from urllib import urlencode
import sys
sys.path.insert(0, 'lib/python')
from mako.template import Template
from ule_model import UserList, UleError
form = cgi.FieldStorage()
# initialize GET list -- will be appended to ule url
GET_params = []
ul = UserList()
def add():
username = ''
password = ''
username_valid = None
password_valid = None
not_allowed = re.compile(r'[^%s]' % ul.allowed)
if 'username' not in form:
GET_params.append(('username_status', 'empty'))
else:
username = form['username'].value
if not_allowed.search(username):
GET_params.append(('username_status', 'illegal'))
else:
for userpass in ul.getList():
if username == userpass[0]:
GET_params.append(('username_status', 'in_list'))
break
else:
username_valid = True
if 'password' not in form:
GET_params.append(('password_status', 'empty'))
else:
password = form['password'].value
if not_allowed.search(password):
GET_params.append(('password_status', 'illegal'))
else:
password_valid = True
if username_valid and password_valid:
ul.add(username, password)
GET_params.append(('username_status', 'added'))
GET_params.append(('added', username))
else:
if username:
GET_params.append(('username', username))
if password:
GET_params.append(('password', password))
def remove():
if 'userlist' not in form:
GET_params.append(('none_removed', True))
else:
user_list = form.getlist('userlist')
removed_list = ul.remove(user_list)
for username in removed_list:
GET_params.append(('removed', username))
def undo():
try:
bak_days, bak_seconds = ul.undo()
GET_params.append(('bak_days', bak_days))
GET_params.append(('bak_seconds', bak_seconds))
except UleError, e:
if str(e) == 'no backups to revert to':
GET_params.append(('no_backups', True))
def redirect():
url_add = urlencode(GET_params)
url = 'ule?' + url_add
print '''Status: 303 See Other
Location: %s
Pragma: no-cache
Content-Type: text/html
''' % url
redirect_template = Template(filename='redirect.html')
print redirect_template.render(
url = url,
)
def main():
try:
if form.getvalue('submit') == 'add':
add()
if form.getvalue('submit') == 'remove':
remove()
if form.getvalue('submit') == 'undo':
undo()
redirect()
# sync after everything else so render doesn't wait for it
ul.sync()
finally:
ul.closeSFTP()
if __name__ == '__main__':
main()
-------------- next part --------------
'''
ule_model -- for maintaining the integrity of ule's data.
'''
import re
from datetime import datetime
import os
from time import strptime
from sftp_wrapper import Session
MAIN_SERVER = 'ezproxy.library.xxxxxx.edu'
USERNAME = 'xxxxxx'
PASSWORD = 'xxxxxxx'
LDAP_USR_PATH = '/usr/local/ezproxy/ldap.usr'
DUMPURL = 'http://innopac.library.xxxxxx.edu:4500/PATRONAPI/%s/dump'
SECONDARY_SERVER = 'ezproxy2.library.xxxxxx.edu'
class UleError(Exception):
pass
class UserList(object):
def __init__(self):
# characters allowed in usernames and passwords
self.allowed = r'\w-'
# userpass is the regex to parse ldap.usr for usernames and
# passwords
self.userpass_re = re.compile(r'''
^ # start of line
([%s]*) # username
: # colon
([%s]*) # password
$ # end of line
''' % (self.allowed, self.allowed), re.VERBOSE | re.MULTILINE)
self.openSFTP()
def openSFTP(self):
self.ss = Session(MAIN_SERVER, USERNAME, PASSWORD)
def closeSFTP(self):
self.ss.close()
def getList(self):
ldap_usr_file = self.ss.open(LDAP_USR_PATH)
self.ldap_usr_file_str = ldap_usr_file.read()
ldap_usr_file.close()
return self.userpass_re.findall(self.ldap_usr_file_str)
def _getAdministrativa(self):
admin_section = self.userpass_re.split(self.ldap_usr_file_str,
maxsplit = 1)[0]
return admin_section
def _backup(self):
now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
backup_file_name = LDAP_USR_PATH + '.' + now + '.bak'
backup_file = self.ss.open(backup_file_name, 'w')
backup_file.write(self.ldap_usr_file_str)
backup_file.close()
def _write(self, userpasses):
self._backup()
ldap_usr_file = self.ss.open(LDAP_USR_PATH, 'w')
ldap_usr_file.write(self._getAdministrativa())
userpass_lines = [username + ':' + password for (username, password)
in userpasses]
ldap_usr_file.write('\n'.join(userpass_lines))
ldap_usr_file.close()
def add(self, username, password):
userpasses = self.getList()
userpasses.append((username, password))
# Decorate-Sort-Undecorate
dsu_list = [(x[0].lower(), x[0], x[1]) for x in userpasses]
dsu_list.sort()
userpasses = [(x[1], x[2]) for x in dsu_list]
self._write(userpasses)
def remove(self, user_list):
userpasses = self.getList()
removed_list = []
for removal in user_list:
removal_tuple = tuple(removal.split(':'))
username = removal_tuple[0]
if removal_tuple in userpasses:
userpasses.remove(removal_tuple)
removed_list.append(username)
self._write(userpasses)
return removed_list
def getBackups(self):
ldap_dir = os.path.dirname(LDAP_USR_PATH)
file_list = self.ss.listdir(ldap_dir)
backup_list = [x for x in file_list if x.startswith('ldap.usr.')
and x.endswith('.bak')]
backup_list.sort()
return (backup_list, ldap_dir)
def undo(self):
backup_list, ldap_dir = self.getBackups()
try:
most_recent = os.path.join(ldap_dir, backup_list.pop())
except IndexError:
raise UleError('no backups to revert to')
bak_file = self.ss.open(most_recent)
bak_file_str = bak_file.read()
bak_file.close()
ldap_usr_file = self.ss.open(LDAP_USR_PATH, 'w')
ldap_usr_file.write(bak_file_str)
self.ss.remove(most_recent)
bak_time = most_recent.split('.')[2]
now = datetime.now()
bak_time_obj = datetime(*strptime(bak_time, "%Y-%m-%dT%H:%M:%S")[0:6])
bak_time_delta = now - bak_time_obj
return (bak_time_delta.days, bak_time_delta.seconds)
def check_against_innopac(self):
def innocheck(username):
'check username against innopac dump'
import urllib2
dumpurl = DUMPURL % username
dump = urllib2.urlopen(dumpurl)
for line in dump:
if 'REC INFO' in line:
return True
new_userlist = []
for userpass in self.getList():
if not innocheck(userpass[0]):
# the flag is len(userpass) == 3
userpass = userpass + (1,)
new_userlist.append(userpass)
return new_userlist
def _cleanup(self, max=10):
'delete backups in excess of max'
backup_list, ldap_dir = self.getBackups()
backup_list.reverse()
def cleanup_rec():
if len(backup_list) > max:
oldest = os.path.join(ldap_dir, backup_list.pop())
self.ss.remove(oldest)
cleanup_rec()
cleanup_rec()
def sync(self):
'sync MAIN_SERVER and SECONDARY_SERVER'
self._cleanup()
ss2 = Session(SECONDARY_SERVER, USERNAME, PASSWORD)
def remoteCopy(path):
file1 = self.ss.open(path)
file1_str = file1.read()
file1.close()
file2 = ss2.open(path, 'w')
file2.write(file1_str)
file2.close()
# copy ldap.usr from main to secondary
remoteCopy(LDAP_USR_PATH)
# remove old backups and copy new
backup_list, ldap_dir = self.getBackups()
file_list2 = ss2.listdir(ldap_dir)
backup_list2 = [x for x in file_list2 if x.startswith('ldap.usr.')
and x.endswith('.bak')]
for backup in backup_list2:
backup_path = os.path.join(ldap_dir, backup)
ss2.remove(backup_path)
for backup in backup_list:
backup_path = os.path.join(ldap_dir, backup)
remoteCopy(backup_path)
ss2.close()
-------------- next part --------------
'''
sftp_wrapper is a wrapper class for the sftp portion of the
paramiko library, located in lib/python/
'''
import sys
sys.path.insert(0, 'lib/python')
import paramiko
class Session(object):
def __init__(self, hostname, username, password):
self.make_sftp(hostname, username, password)
# uncomment to save log -- for testing
# hmm, doesn't seem to be working -- gsf 20061220
#paramiko.util.log_to_file('sftp.log')
def make_sftp(self, hostname, username, password):
self.tunnel = paramiko.Transport((hostname, 22))
hostkeys = paramiko.util.load_host_keys('known_hosts')
hostkey = hostkeys[hostname]['ssh-rsa']
self.tunnel.connect(username=username, password=password,
hostkey=hostkey)
self.sftp = paramiko.SFTPClient.from_transport(self.tunnel)
def open(self, filename, mode='r'):
return self.sftp.open(filename, mode=mode)
def chdir(self, dir):
self.sftp.chdir(dir)
def get(self, remote, local=None):
if not local:
local = remote
self.sftp.get(remote, local)
def put(self, local, remote=None):
if not remote:
remote = local
self.sftp.put(local, remote)
def listdir(self, remote):
return self.sftp.listdir(remote)
def remove(self, remote):
self.sftp.remove(remote)
def close(self):
self.tunnel.close()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.python.org/pipermail/tutor/attachments/20070209/c6b7af05/attachment.html
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.python.org/pipermail/tutor/attachments/20070209/c6b7af05/attachment-0001.html
More information about the Tutor
mailing list