Sample script using MySQLdb ?

Jeff Bauer jbauer at rubic.com
Wed Jan 3 11:51:38 EST 2001


Christophe Prevost wrote:
> Somebody have a quite complete script using this module?

Here's a cheezy little script I use for mysql dba stuff.
You might find it useful to get you started.  If the
code isn't formatted correctly I can email it to you
as an attachment.  Hope that helps.

Jeff Bauer
Rubicon Research


#!/usr/bin/env python
# mysqldba.py

"""
MySQL database administration routines

jbauer at rubic.com
"""

import os, sys, string
from string import join
from UserDict import UserDict
import getopt
from getpass import getpass
import MySQLdb

class DbTable(UserDict):
    """
    DbTable is a dictionary of 'db' table rows.
      key = (Db, Host, User) tuple
      value = dictionary of row values
    """
    def __init__(self, connection):
        self.connection = connection
        apply(UserDict.__init__, (self,))
        self.name = 'db'
        self.fields = [ 'Db',
                        'Host',
                        'User',
                        'Select_priv',
                        'Insert_priv',
                        'Update_priv',
                        'Delete_priv',
                        'Index_priv',
                        'Alter_priv',
                        'Create_priv',
                        'Drop_priv',
                        'Grant_priv', ]
        self.populate()

    def create_database(self, db_name, **kw):
        assert(db_name != 'mysql')
        assert(db_name not in self.database_list)
        c = self.connection.cursor()
        qs = "CREATE DATABASE %s" % db_name
        c.execute(qs)
        values = [ \
            db_name,
            kw.get('Host', '%'),
            kw.get('User', ''),
            kw.get('Select_priv', 'Y'),
            kw.get('Insert_priv', 'Y'),
            kw.get('Update_priv',  'Y'),
            kw.get('Delete_priv', 'Y'),
            kw.get('Index_priv', 'Y'),
            kw.get('Alter_priv', 'Y'),
            kw.get('Create_priv', 'Y'),
            kw.get('Drop_priv', 'N'),
            kw.get('Grant_priv', 'Y'), 
            ]
        self.reload()
        qs = "INSERT INTO %s (%s) VALUES (%s)" % \
             (self.name,
              join(self.fields, ', '),
              join(map(lambda x: "'%s'" % x, values), ', '))
        c.execute(qs)
        self.reload()

    def databases(self):
        """
        Return a sorted list of databases
        """
        databases = {}
        for db, host, user in self.keys():
            if db:
                databases[db] = None
        keys = databases.keys()
        keys.sort()
        return keys

    def drop_database(self, db_name):
        assert(db_name != 'mysql')
        c = self.connection.cursor()
        qs = "DROP DATABASE %s" % db_name
        c.execute(qs)
        qs = "DELETE FROM %s WHERE Db='%s'" % (self.name, db_name)
        c.execute(qs)
        self.reload()

    def populate(self):
        c = self.connection.cursor()
        qs = "SELECT %s FROM %s" % \
             (join(self.fields, ', '), self.name)
        c.execute(qs)
        _db = {}
        for r in c.fetchall():
            d = {}
            i = 0
            for f in self.fields:
                d[f] = r[i]
                i = i + 1
            self[(d['Db'], d['Host'], d['User'])] = d
            _db[d['Db']] = None
        self.database_list = _db.keys()
        self.database_list.sort()

    def reload(self):
        c = self.connection.cursor()
        c.execute("FLUSH PRIVILEGES")
        self.clear()
        self.populate()

class UserTable(UserDict):
    """
    UserTable is a dictionary of 'user' table rows.
      key = (Host, User) tuple
      value = dictionary of row values
    """
    def __init__(self, connection):
        self.connection = connection
        apply(UserDict.__init__, (self,))
        self.name = 'user'
        self.fields = [ 'Host',
                        'User',
                        'Password',
                        'Select_priv',
                        'Insert_priv',
                        'Update_priv',
                        'Delete_priv',
                        'Index_priv',
                        'Alter_priv',
                        'Create_priv',
                        'Drop_priv',
                        'Grant_priv',
                        'References_priv', 
                        'Reload_priv',
                        'Shutdown_priv',
                        'Process_priv',
                        'File_priv', ]
        self.populate()

    def add_user(self, user, **kw):
        assert(user != 'root')
        if kw.has_key('Password'):
            password = "PASSWORD('%s')" % password
        else:
            password = "''"
        values = [ \
            "'%s'" % kw.get('Host', '%'),
            "'%s'" % user,
            password,
            "'%s'" % kw.get('Select_priv', 'Y'),
            "'%s'" % kw.get('Insert_priv', 'Y'),
            "'%s'" % kw.get('Update_priv',  'Y'),
            "'%s'" % kw.get('Delete_priv', 'Y'),
            "'%s'" % kw.get('Index_priv', 'Y'),
            "'%s'" % kw.get('Alter_priv', 'N'),
            "'%s'" % kw.get('Create_priv', 'Y'),
            "'%s'" % kw.get('Drop_priv', 'Y'),
            "'%s'" % kw.get('Grant_priv', 'N'),
            "'%s'" % kw.get('References_priv', 'Y'),
            "'%s'" % kw.get('Reload_priv', 'Y'),
            "'%s'" % kw.get('Shutdown_priv', 'N'),
            "'%s'" % kw.get('Process_priv', 'N'),
            "'%s'" % kw.get('File_priv', 'N'),
            ]
        qs = "INSERT INTO %s (%s) VALUES (%s)" % \
             (self.name,
              join(self.fields, ', '),
              join(values, ', '))
        c = self.connection.cursor()
        c.execute(qs)
        self.reload()

    def del_user(self, user, host=''):
        if host:
            wc = "User='%s' AND Host='%s'" % (user, host)
        else:
            wc = "User='%s'" % user
        qs = "DELETE FROM %s WHERE %s" % (self.name, wc)
        c = self.connection.cursor()
        c.execute(qs)
        self.reload()

    def mod_password(self, user, password):
        c = self.connection.cursor()
        qs = "UPDATE %s SET Password=PASSWORD('%s') WHERE user='%s'" % \
             (self.name, password, user)
        c.execute(qs)
        c.execute("FLUSH PRIVILEGES")

    def mod_user(self, user, **kw):
        f = []
        v = []
        if kw.has_key('Host'):
            host = kw['Host']
        else:
            host = ''
        values = []
        for k in kw.keys():
            if k in self.fields:
                if k == 'User':
                    assert(kw[k] == user)
                values.append("%s='%s'" % (k, kw[k]))
        if host:
            wc = "User='%s' AND Host='%s'" % (user, host)
        else:
            wc = "User='%s'" % user
        qs = "UPDATE %s SET %s WHERE %s" % \
             (self.name,
              join(values, ', '),
              wc)
        c = self.connection.cursor()
        c.execute(qs)
        self.reload()

    def populate(self):
        c = self.connection.cursor()
        qs = "SELECT %s FROM %s" % \
             (join(self.fields, ', '), self.name)
        c.execute(qs)
        _user = {}
        for r in c.fetchall():
            d = {}
            i = 0
            for f in self.fields:
                d[f] = r[i]
                i = i + 1
            self[(d['Host'], d['User'])] = d
            if d['User']:
                _user[d['User']] = None
        self.user_list = _user.keys()
        self.user_list.sort()

    def reload(self):
        c = self.connection.cursor()
        c.execute("FLUSH PRIVILEGES")
        self.clear()
        self.populate()

class HostTable(UserDict):
    """
    HostTable is a dictionary of 'host' table rows.
      key = (Host, Db) tuple
      value = dictionary of row values
    """
    def __init__(self, connection):
        self.connection = connection
        apply(UserDict.__init__, (self,))
        self.name = 'host'
        self.fields = [ 'Host',
                        'Db',
                        'Select_priv',
                        'Insert_priv',
                        'Update_priv',
                        'Delete_priv',
                        'Index_priv',
                        'Alter_priv',
                        'Create_priv',
                        'Drop_priv',
                        'Grant_priv', ]
        self.populate()

    def add_host(self, host, **kw):
        values = [ \
            host,
            kw.get('Db', ''),
            kw.get('Select_priv', 'Y'),
            kw.get('Insert_priv', 'Y'),
            kw.get('Update_priv',  'Y'),
            kw.get('Delete_priv', 'Y'),
            kw.get('Index_priv', 'Y'),
            kw.get('Alter_priv', 'Y'),
            kw.get('Create_priv', 'Y'),
            kw.get('Drop_priv', 'N'),
            kw.get('Grant_priv', 'N'),
            kw.get('References_priv', 'Y'),
            kw.get('Reload_priv', 'Y'),
            kw.get('Shutdown_priv', 'N'),
            kw.get('Process_priv', 'N'),
            kw.get('File_priv', 'N'),
            ]
        qs = "INSERT INTO %s (%s) VALUES (%s)" % \
             (self.name,
              join(self.fields, ', '),
              join(map(lambda x: "'%s'" % x, values), ', '))
        c = self.connection.cursor()
        c.execute(qs)
        self.reload()

    def del_host(self, host, db=''):
        assert(host != 'localhost')
        pass #XXX  self.reload()

    def mod_host(self, host, **kw):
        pass #XXX  self.reload()

    def mod_password(self, user, password):
        self.userTable.mod_password(user, password)

    def populate(self):
        c = self.connection.cursor()
        qs = "SELECT %s FROM %s" % \
             (join(self.fields, ', '), self.name)
        c.execute(qs)
        _host = {}
        for r in c.fetchall():
            d = {}
            i = 0
            for f in self.fields:
                d[f] = r[i]
                i = i + 1
            self[(d['Host'], d['Db'])] = d
            _host[d['Host']] = None
        self.host_list = _host.keys()
        self.host_list.sort()

    def reload(self):
        c = self.connection.cursor()
        c.execute("FLUSH PRIVILEGES")
        self.clear()
        self.populate()

class Server:
    def __init__(self, **kw):
        self.connection = apply(MySQLdb.connect, (), kw)
        self.dbTable = DbTable(self.connection)
        self.hostTable = HostTable(self.connection)
        self.userTable = UserTable(self.connection)

    def add_host(self, host, **kw):
        apply(self.hostTable.add_host, (host,), kw)

    def add_user(self, user, **kw):
        apply(self.userTable.add_user, (user,), kw)

    def create_database(self, db_name, **kw):
        apply(self.dbTable.create_database, (db_name,), kw)

    def del_host(self, host, db=''):
        self.hostTable.del_host(host, db)

    def del_user(self, user, host=''):
        self.userTable.del_user(user, host)

    def drop_database(self, db_name):
        self.dbTable.drop_database(db_name)

    def __get_password(self, user):
        passwd1 = getpass("Enter MySQL user '%s' password: " % user)
        passwd2 = getpass("Re-enter password: ")
        if passwd1 == passwd2:
            return passwd1
        else:
            return None

    def mod_host(self, host, **kw):
        apply(self.userTable.mod_host, (host,), kw)

    def mod_user(self, user, **kw):
        apply(self.userTable.mod_user, (user,), kw)

    def process(self, operation, args=[]):
        c = self.connection.cursor()
        if len(args) < 1 and \
           operation not in ('dblist', 'hostlist', 'userlist'):
            print "error: argument[s] required for %s operation" %
operation
            return
        if operation == 'addhost':
            print "operation: '%s' not yet implemented" % operation #XXX
        elif operation == 'adduser':
            self.add_user(args[0])
        elif operation == 'dbcreate':
            self.create_database(args[0])
        elif operation == 'dbdrop':
            assert(args[0] != 'mysql')
            self.drop_database(args[0])
        elif operation == 'dblist':
            c.execute("SHOW DATABASES")
            db_list = map(lambda x: x[0], c.fetchall())
            db_list.sort()
            for db_name in db_list:
                print db_name
        elif operation == 'hostlist':
            for host in self.hostTable.host_list:
                print host
        elif operation == 'moduser':
            print "operation: '%s' not yet implemented" % operation #XXX
        elif operation == 'passwd':
            if len(args) < 2:
                while 1:
                    passwd = self.__get_password(args[0])
                    if passwd:
                        break
            else:
                passwd = args[1]
            self.userTable.mod_password(args[0], passwd)
        elif operation == 'rmhost':
            print "operation: '%s' not yet implemented" % operation #XXX
        elif operation == 'rmuser':
            assert(args[0] != 'root')
            self.del_user(args[0])
        elif operation == 'tblist':
            c.execute("SHOW TABLES FROM %s" % args[0])
            table_list = map(lambda x: x[0], c.fetchall())
            table_list.sort()
            for tb_name in table_list:
                print tb_name
        elif operation == 'userlist':
            for user in self.userTable.user_list:
                print user
        else:
            print "error: invalid operation: %s" % operation

if __name__ == '__main__':
    user = 'root'
    host = 'localhost'
    passwd = ''
    usage = """
usage: %s [options] operation

options:
    -u user      default='%s'
    -h host      name or IP address, default='%s'
    -p password

operation:
    addhost   [hostname]
    adduser   [username]    add user
    dbcreate  [dbname]      create a database with default privileges
    dbdrop    [dbname]      drop a database (BE VERY CAREFUL)
    dblist                  list databases
    hostlist                list hosts
    moduser   [username userpriv=X]  modify user privileges (e.g.
Grant_priv=Y)
    passwd    [username [password]]  change user password
    rmhost    [hostname]    remove host
    rmuser    [username]    remove user
    tblist    [dbname]      list database tables
    userlist                list users
""" % (os.path.basename(sys.argv[0]), user, host)
    optlist, args = getopt.getopt(sys.argv[1:], 'h:p:u:')

    if len(sys.argv) < 2:
        print usage; sys.exit(1)
    argv = []
    for _arg in sys.argv[1:]:
        argv.append(_arg)

    for (opt, optarg) in optlist:
        if opt == '-u':
            user = optarg
        elif opt == '-h':
            host = optarg
        elif opt == '-p':
            passwd = optarg
        else:
            print "error: invalid option: %s" % opt
            print usage; sys.exit(1)
        argv.remove(opt)
        argv.remove(optarg)
    if len(argv) < 1:
        print usage
        sys.exit(1)
    if not passwd:
        passwd = getpass("Enter MySQL user '%s' password: " % user)

    server = Server(host=host, db='mysql', user=user, passwd=passwd)
    server.process(argv[0], argv[1:])




More information about the Python-list mailing list