[Twisted-Python] basic auth example with db backend

This is a somewhat less trivial (and more robust) example than the "cred"
example I posted earlier.  BasicAuthResource is lifted from moshez's example
in bug 393 with some typos corrected and a notification to the user if
something gets majorly screwed up, so the auth form doesn't continue to pop
up in that case.  UserDbCredChecker borrows radix's approach from a db auth
he wrote for a spike we did a while back.

Again, I'd appreciate any suggestions/comments from twisted gurus --
especially if you see any security problems, etc.!

Cheers,
Steve

# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""
This is a basic auth example with a db-based checker.

Note: this example assumes that PostgreSQL is installed and a database
named 'userdb' has been created.  If it has, this script will set up the
users table, populate it with a test user, and drop the table when it
exits, so it can be modified and run again.
""" import sys import md5 from pyPgSQL import PgSQL from twisted.cred import error from twisted.cred import portal from twisted.cred import checkers from twisted.cred import credentials from twisted.enterprise import adbapi from twisted.internet import reactor from twisted.python import components from twisted.python import failure from twisted.python import log from twisted.web import resource from twisted.web import server schema = """ CREATE TABLE users ( username varchar(64) PRIMARY KEY, password varchar(32) ) """ testdata = """ INSERT INTO users VALUES ( 'Zaphod', '%s' ) """ % md5.md5('s3krit').hexdigest() class AvatarResource: __implements__ = resource.IResource def __init__(self, id): self.id = id def render(self, request): return """<html> Hello, <b>%s</b> ...<br> Welcome to the Restaurant at the End of the Universe! </html> """ % self.id def logout(self): pass class Realm: __implements__ = portal.IRealm def requestAvatar(self, avatarId, mind, *interfaces): av = AvatarResource(avatarId) return resource.IResource, av, av.logout class UserDb: def __init__(self): # db setup and teardown are synchronous; no big deal. conn = PgSQL.connect(database='userdb') curs = conn.cursor() curs.execute(schema) curs.execute(testdata) conn.commit() conn.close() self.pool = adbapi.ConnectionPool('pyPgSQL.PgSQL', database='userdb') def teardown(self): conn = PgSQL.connect(database='userdb') curs = conn.cursor() curs.execute('DELETE FROM users') curs.execute('DROP TABLE users') conn.commit() conn.close() class UserDbCredChecker: __implements__ = (checkers.ICredentialsChecker,) credentialInterfaces = (credentials.IUsernamePassword,) def __init__(self, userdb): self.db = userdb def _verifyCreds(self, creds): """ Verify credentials, returning the username if successful. 
""" res = self.db.pool.runQuery( """SELECT * from users where username = %s and password = %s""", (creds.username, md5.md5(creds.password).hexdigest())) def succeeded(r): try: if len(r) == 1: return creds.username elif len(r) == 0: return failure.Failure(error.UnauthorizedLogin()) except: # Houston, we have a problem ... # should probably have user notify admin e = 'User database appears to be borken' raise ValueError, e def failed(e): raise LookupError, e return res.addCallbacks(succeeded, failed) def requestAvatarId(self, c): if not c.username or not c.password: return failure.Failure(error.UnauthorizedLogin()) return self._verifyCreds(c) class BasicAuthResource: __implements__ = resource.IResource isLeaf = True httpRealm="Restaurant at the End of the Universe" def __init__(self, portal): self.portal = portal def render(self, request): username, password = request.getUser(), request.getPassword() creds = credentials.UsernamePassword(username, password) d = self.portal.login(creds, None, resource.IResource) def cb((_, r, logout)): request.notifyFinish().addBoth(lambda _: logout()) result = resource.getChildForRequest(r, request).render(request) if result != server.NOT_DONE_YET: request.write(result) request.finish() def eb(f): try: f.trap(error.LoginFailed, error.UnauthorizedLogin) request.setResponseCode(401) request.setHeader("www-authenticate", 'Basic realm="%s"' % self.httpRealm) request.finish() except: log.msg('Error during auth: %s' % f) MALFUNCTION = """<html><h2>Something untoward happened!</h2> <hr>Please notify the sysadmin: <a href="mailto:sysadmin@host.domain"> sysadmin@host.domain</a><hr></html>""" request.write(MALFUNCTION) request.finish() d.addCallbacks(callback=cb,errback=eb) d.addErrback(log.err) return server.NOT_DONE_YET r = Realm() p = portal.Portal(r) d = UserDb() c = UserDbCredChecker(d) p.registerChecker(c) root.putChild('login', BasicAuthResource(p)) log.startLogging(sys.stdout) site = server.Site(root) reactor.listenTCP(1999, site) 
reactor.run() d.teardown()
participants (1)
-
Stephen Waterbury