[Tutor] A complete, functioning small web app: code for review

Gabriel Farrell gsf at panix.com
Sat Feb 10 00:24:24 CET 2007


Greetings,

I've written the attached simple web app.  It serves as a more
pleasant user interface to a file located on our journal proxy
server.  It relies on the Paramiko library for sftp and Mako for
templating.  Basic file functions are as follows:
* ule: The user starts here.  A page is produced based on the
  template, which includes the deciphered output of any GET 
  messages received.
* ule_post: The view/controller for any posted actions.  Acts on POST
  and redirects the user to "ule".
* ule_model.py: The model -- all the real data handling goes on here.
* sftp_wrapper.py: A wrapper for the Paramiko library to simplify 
  sftp calls.
* ule_template.html: The Mako template for the main page of the app.
* redirect.html: Another Mako template for the redirect.

It functions without any major bugs that I'm aware of.  I took
advantage of the manageable scale of this project to really focus on
writing clean code.  I hope it will help others looking to do
anything similar.  I would appreciate any comments on its style
and organization.

Gabe

-------------- next part --------------
#!/usr/bin/python
'''
ule -- page display for the EZProxy User List Editor -- reads the GET
form to determine variables for display.  Processing is sent via
POST to ule_post.
'''

# uncomment cgitb for testing
#import cgitb; cgitb.enable()

import cgi
import sys
sys.path.insert(0, 'lib/python')
from mako.template import Template

from ule_model import UserList

# this form should be GET -- it carries the status parameters that
# ule_post appends to the redirect url
form = cgi.FieldStorage()

# NOTE: constructing UserList opens the sftp session right away (its
# __init__ calls openSFTP); main() closes it again in a finally block
ul = UserList()

def main():
    try:
        userpass_list = ul.getList()

        if form.getvalue('submit') == 'please wait':
            userpass_list = ul.check_against_innopac()
    finally:
        ul.closeSFTP()

    tmpl = Template(filename='ule_template.html')
    out_stream = tmpl.render(
        form = form,
        userpass_list = userpass_list,
    )

    print 'Content-Type: text/html\n'
    print out_stream

# run only when executed as a script (e.g. by the web server as CGI)
if __name__ == '__main__':
    main()
-------------- next part --------------
#!/usr/bin/python
'''
ule_post -- for handling POST from ule and redirecting back to ule with
proper GET.
'''

# uncomment cgitb for testing
#import cgitb; cgitb.enable()
import cgi
import re
from urllib import urlencode
import sys
sys.path.insert(0, 'lib/python')
from mako.template import Template

from ule_model import UserList, UleError

# the submitted form; its 'submit' field selects the action in main()
form = cgi.FieldStorage()
# initialize GET list -- will be appended to ule url
# (a list of (name, value) pairs that redirect() passes to urlencode)
GET_params = []

# NOTE: constructing UserList opens the sftp session right away (its
# __init__ calls openSFTP); main() closes it again in a finally block
ul = UserList()

def add():
    '''
    Validate the posted username and password and add them to the list.

    Results are reported by appending (name, value) pairs to GET_params:
      username_status -- 'empty', 'illegal', 'in_list' or 'added'
      password_status -- 'empty' or 'illegal'
      added           -- the username, when the pair was added
      username, password -- the submitted values, echoed back when
                            nothing was added
    '''
    username = ''
    password = ''
    username_valid = False
    password_valid = False
    # matches any character outside the set the model allows
    not_allowed = re.compile(r'[^%s]' % ul.allowed)

    if 'username' not in form:
        GET_params.append(('username_status', 'empty'))
    else:
        username = form['username'].value
        if not_allowed.search(username):
            GET_params.append(('username_status', 'illegal'))
        else:
            for userpass in ul.getList():
                if username == userpass[0]:
                    GET_params.append(('username_status', 'in_list'))
                    break
            else:
                # for/else: only reached when the loop finished without
                # finding the name, including when the list is empty.
                # (Marking the name valid inside the loop would let a
                # duplicate through whenever a different name precedes
                # it in the list.)
                username_valid = True

    if 'password' not in form:
        GET_params.append(('password_status', 'empty'))
    else:
        password = form['password'].value
        if not_allowed.search(password):
            GET_params.append(('password_status', 'illegal'))
        else:
            password_valid = True

    if username_valid and password_valid:
        ul.add(username, password)
        GET_params.append(('username_status', 'added'))
        GET_params.append(('added', username))
    else:
        # NOTE(review): this puts the submitted password into the
        # redirect url -- confirm that is acceptable
        if username:
            GET_params.append(('username', username))
        if password:
            GET_params.append(('password', password))

def remove():
    '''
    Remove the users selected in the posted 'userlist' field.

    Appends one ('removed', username) pair to GET_params per user the
    model actually removed, or ('none_removed', True) when the form
    contained no selection at all.
    '''
    if 'userlist' in form:
        selected = form.getlist('userlist')
        GET_params.extend(
            [('removed', name) for name in ul.remove(selected)])
    else:
        GET_params.append(('none_removed', True))

def undo():
    try:
        bak_days, bak_seconds = ul.undo()
        GET_params.append(('bak_days', bak_days))
        GET_params.append(('bak_seconds', bak_seconds))
    except UleError, e:
        if str(e) == 'no backups to revert to':
            GET_params.append(('no_backups', True))

def redirect():
    url_add = urlencode(GET_params)
    url = 'ule?' + url_add
    print '''Status: 303 See Other
Location: %s
Pragma: no-cache
Content-Type: text/html
''' % url
    redirect_template = Template(filename='redirect.html')
    print redirect_template.render(
        url = url,
    )

def main():
    '''
    Run the action named by the posted 'submit' value, print the
    redirect, then sync the servers.  The sftp session is always closed.
    '''
    try:
        action = form.getvalue('submit')
        if action == 'add':
            add()
        elif action == 'remove':
            remove()
        elif action == 'undo':
            undo()
        redirect()
        # sync comes last so that printing the redirect does not have
        # to wait for it
        ul.sync()
    finally:
        ul.closeSFTP()

# run only when executed as a script (e.g. by the web server as CGI)
if __name__ == '__main__':
    main()

-------------- next part --------------
'''
ule_model -- for maintaining the integrity of ule's data.
'''

import re
from datetime import datetime
import os
from time import strptime
from sftp_wrapper import Session

# primary proxy server; UserList reads and writes ldap.usr here
MAIN_SERVER = 'ezproxy.library.xxxxxx.edu'
# sftp login used for both MAIN_SERVER and SECONDARY_SERVER
# NOTE(review): credentials are hard-coded in the source -- consider
# loading them from a file that is kept out of version control
USERNAME = 'xxxxxx'
PASSWORD = 'xxxxxxx'
# remote path of the user list; backups are written next to it as
# <path>.<timestamp>.bak
LDAP_USR_PATH = '/usr/local/ezproxy/ldap.usr'
# url queried by check_against_innopac(); %s is replaced by a username
DUMPURL = 'http://innopac.library.xxxxxx.edu:4500/PATRONAPI/%s/dump'
# server that sync() copies ldap.usr and its backups to
SECONDARY_SERVER = 'ezproxy2.library.xxxxxx.edu'

class UleError(Exception):
    '''Error raised by the ule model, e.g. when undo() finds no backups.'''

class UserList(object):
    '''
    Model for the user list stored in LDAP_USR_PATH on MAIN_SERVER.

    The file is read and written through an sftp Session that is opened
    on construction; callers must call closeSFTP() when done.  Every
    rewrite through _write() first saves a timestamped ".bak" copy of
    the previous contents, which undo() can restore.
    '''

    def __init__(self):
        # characters allowed in usernames and passwords
        self.allowed = r'\w-'

        # userpass is the regex to parse ldap.usr for usernames and
        # passwords
        self.userpass_re = re.compile(r'''
                ^           # start of line
                ([%s]*)     # username
                :           # colon
                ([%s]*)     # password
                $           # end of line
                ''' % (self.allowed, self.allowed), re.VERBOSE | re.MULTILINE)

        self.openSFTP()

    def openSFTP(self):
        'open the sftp session to MAIN_SERVER'
        self.ss = Session(MAIN_SERVER, USERNAME, PASSWORD)

    def closeSFTP(self):
        'close the sftp session to MAIN_SERVER'
        self.ss.close()

    def getList(self):
        '''
        Read ldap.usr and return its (username, password) tuples.

        The raw file text is kept in self.ldap_usr_file_str, which
        _backup() and _getAdministrativa() rely on.
        '''
        ldap_usr_file = self.ss.open(LDAP_USR_PATH)
        try:
            self.ldap_usr_file_str = ldap_usr_file.read()
        finally:
            ldap_usr_file.close()
        return self.userpass_re.findall(self.ldap_usr_file_str)

    def _getAdministrativa(self):
        '''
        Return the part of the cached file text that precedes the first
        username:password line (the whole text if there is none).
        '''
        admin_section = self.userpass_re.split(self.ldap_usr_file_str,
            maxsplit = 1)[0]
        return admin_section

    def _backup(self):
        'save the cached file text as LDAP_USR_PATH.<timestamp>.bak'
        now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        backup_file_name = LDAP_USR_PATH + '.' + now + '.bak'
        backup_file = self.ss.open(backup_file_name, 'w')
        try:
            backup_file.write(self.ldap_usr_file_str)
        finally:
            backup_file.close()

    def _write(self, userpasses):
        '''
        Back up the current file, then rewrite it as the administrative
        section followed by one "username:password" line per entry.

        Requires getList() to have been called first, since both the
        backup and the administrative section come from the cached text.
        '''
        self._backup()
        userpass_lines = [username + ':' + password for (username, password)
                in userpasses]
        ldap_usr_file = self.ss.open(LDAP_USR_PATH, 'w')
        try:
            ldap_usr_file.write(self._getAdministrativa())
            ldap_usr_file.write('\n'.join(userpass_lines))
        finally:
            ldap_usr_file.close()

    def add(self, username, password):
        'add a username/password pair, keeping the list sorted by name'
        userpasses = self.getList()
        userpasses.append((username, password))
        # Decorate-Sort-Undecorate: order case-insensitively by username
        dsu_list = [(x[0].lower(), x[0], x[1]) for x in userpasses]
        dsu_list.sort()
        userpasses = [(x[1], x[2]) for x in dsu_list]
        self._write(userpasses)

    def remove(self, user_list):
        '''
        Remove the given "username:password" strings from the list.

        Returns the usernames that were actually removed.  The file is
        only rewritten (and a backup only created) when at least one
        entry was removed, so a no-op request does not use up an undo
        step.
        '''
        userpasses = self.getList()
        removed_list = []
        for removal in user_list:
            removal_tuple = tuple(removal.split(':'))
            if removal_tuple in userpasses:
                userpasses.remove(removal_tuple)
                removed_list.append(removal_tuple[0])
        if removed_list:
            self._write(userpasses)
        return removed_list

    def getBackups(self):
        '''
        Return (backup_list, ldap_dir): the sorted names of the
        ldap.usr.*.bak files and the remote directory holding them.
        '''
        ldap_dir = os.path.dirname(LDAP_USR_PATH)
        file_list = self.ss.listdir(ldap_dir)
        backup_list = [x for x in file_list if x.startswith('ldap.usr.')
            and x.endswith('.bak')]
        # the names embed a %Y-%m-%dT%H:%M:%S timestamp, so sorting by
        # name puts the oldest backup first
        backup_list.sort()
        return (backup_list, ldap_dir)

    def undo(self):
        '''
        Restore ldap.usr from the most recent backup and delete that
        backup.

        Returns (days, seconds) describing how old the backup was.
        Raises UleError('no backups to revert to') if there is none.
        '''
        backup_list, ldap_dir = self.getBackups()
        try:
            backup_name = backup_list.pop()
        except IndexError:
            raise UleError('no backups to revert to')
        most_recent = os.path.join(ldap_dir, backup_name)
        bak_file = self.ss.open(most_recent)
        try:
            bak_file_str = bak_file.read()
        finally:
            bak_file.close()
        ldap_usr_file = self.ss.open(LDAP_USR_PATH, 'w')
        try:
            ldap_usr_file.write(bak_file_str)
        finally:
            # close explicitly so the restored contents are written out
            # before the backup is deleted
            ldap_usr_file.close()
        self.ss.remove(most_recent)
        # parse the timestamp from the file name only (ldap.usr.<time>.bak)
        # so that dots in the directory path cannot shift the index
        bak_time = backup_name.split('.')[2]
        now = datetime.now()
        bak_time_obj = datetime(*strptime(bak_time, "%Y-%m-%dT%H:%M:%S")[0:6])
        bak_time_delta = now - bak_time_obj
        return (bak_time_delta.days, bak_time_delta.seconds)

    def check_against_innopac(self):
        '''
        Return the user list with unknown users flagged.

        Entries whose username is not found in the innopac dump get a
        third element appended, so flagged entries have length 3.
        '''
        def innocheck(username):
            'check username against innopac dump'
            import urllib2
            dump = urllib2.urlopen(DUMPURL % username)
            try:
                for line in dump:
                    if 'REC INFO' in line:
                        return True
                return False
            finally:
                dump.close()

        new_userlist = []
        for userpass in self.getList():
            if not innocheck(userpass[0]):
                # the flag is len(userpass) == 3
                userpass = userpass + (1,)
            new_userlist.append(userpass)
        return new_userlist

    def _cleanup(self, max=10):
        'delete backups in excess of max'
        backup_list, ldap_dir = self.getBackups()
        # oldest names sort first; reverse so pop() yields the oldest
        backup_list.reverse()
        # a loop instead of recursion, so a large pile of old backups
        # cannot hit the recursion limit
        while len(backup_list) > max:
            oldest = os.path.join(ldap_dir, backup_list.pop())
            self.ss.remove(oldest)

    def sync(self):
        'sync MAIN_SERVER and SECONDARY_SERVER'
        self._cleanup()
        ss2 = Session(SECONDARY_SERVER, USERNAME, PASSWORD)
        try:
            def remoteCopy(path):
                'copy path from the main server to the same path on ss2'
                file1 = self.ss.open(path)
                try:
                    file1_str = file1.read()
                finally:
                    file1.close()
                file2 = ss2.open(path, 'w')
                try:
                    file2.write(file1_str)
                finally:
                    file2.close()
            # copy ldap.usr from main to secondary
            remoteCopy(LDAP_USR_PATH)

            # remove old backups and copy new
            backup_list, ldap_dir = self.getBackups()
            file_list2 = ss2.listdir(ldap_dir)
            backup_list2 = [x for x in file_list2 if x.startswith('ldap.usr.')
                and x.endswith('.bak')]
            for backup in backup_list2:
                backup_path = os.path.join(ldap_dir, backup)
                ss2.remove(backup_path)
            for backup in backup_list:
                backup_path = os.path.join(ldap_dir, backup)
                remoteCopy(backup_path)
        finally:
            # always release the secondary session, even if a copy fails
            ss2.close()

-------------- next part --------------
'''
sftp_wrapper is a wrapper class for the sftp portion of the 
paramiko library, located in lib/python/
'''

import sys
sys.path.insert(0, 'lib/python')

import paramiko

class Session(object):
    '''
    Thin wrapper around a paramiko sftp connection.

    Connects with username/password on port 22 and verifies the server
    against the ssh-rsa key stored for it in the local 'known_hosts'
    file.  Call close() when finished.
    '''

    def __init__(self, hostname, username, password):
        self.make_sftp(hostname, username, password)
        # uncomment to save log -- for testing
        # hmm, doesn't seem to be working -- gsf 20061220
        #paramiko.util.log_to_file('sftp.log')

    def make_sftp(self, hostname, username, password):
        '''
        Open the transport to hostname and create the sftp client.

        Raises KeyError if known_hosts has no ssh-rsa key for hostname;
        errors from paramiko propagate unchanged.
        '''
        # look the host key up before opening the connection, so a
        # missing entry cannot leave an open transport behind
        hostkeys = paramiko.util.load_host_keys('known_hosts')
        hostkey = hostkeys[hostname]['ssh-rsa']
        self.tunnel = paramiko.Transport((hostname, 22))
        try:
            self.tunnel.connect(username=username, password=password,
                hostkey=hostkey)
            self.sftp = paramiko.SFTPClient.from_transport(self.tunnel)
        except Exception:
            # the caller never receives this object when construction
            # fails, so it could not close the transport itself
            self.tunnel.close()
            raise

    def open(self, filename, mode='r'):
        'open a remote file and return the file object'
        return self.sftp.open(filename, mode=mode)

    def chdir(self, dir):
        'change the remote working directory'
        self.sftp.chdir(dir)

    def get(self, remote, local=None):
        'download remote to local (local defaults to the remote path)'
        if not local:
            local = remote
        self.sftp.get(remote, local)

    def put(self, local, remote=None):
        'upload local to remote (remote defaults to the local path)'
        if not remote:
            remote = local
        self.sftp.put(local, remote)

    def listdir(self, remote):
        'return the names of the entries in the remote directory'
        return self.sftp.listdir(remote)

    def remove(self, remote):
        'delete the remote file'
        self.sftp.remove(remote)

    def close(self):
        'close the underlying transport'
        self.tunnel.close()

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.python.org/pipermail/tutor/attachments/20070209/c6b7af05/attachment.html 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.python.org/pipermail/tutor/attachments/20070209/c6b7af05/attachment-0001.html 


More information about the Tutor mailing list