Sample script using MySQLdb ?
Jeff Bauer
jbauer at rubic.com
Wed Jan 3 11:51:38 EST 2001
Christophe Prevost wrote:
> Somebody have a quite complete script using this module?
Here's a cheezy little script I use for mysql dba stuff.
You might find it useful to get you started. If the
code isn't formatted correctly I can email it to you
as an attachment. Hope that helps.
Jeff Bauer
Rubicon Research
#!/usr/bin/env python
# mysqldba.py
"""
MySQL database administration routines
jbauer at rubic.com
"""
import os, sys, string
from string import join
from UserDict import UserDict
import getopt
from getpass import getpass
import MySQLdb
class DbTable(UserDict):
"""
DbTable is a dictionary of 'db' table rows.
key = (Db, Host, User) tuple
value = dictionary of row values
"""
def __init__(self, connection):
self.connection = connection
apply(UserDict.__init__, (self,))
self.name = 'db'
self.fields = [ 'Db',
'Host',
'User',
'Select_priv',
'Insert_priv',
'Update_priv',
'Delete_priv',
'Index_priv',
'Alter_priv',
'Create_priv',
'Drop_priv',
'Grant_priv', ]
self.populate()
def create_database(self, db_name, **kw):
assert(db_name != 'mysql')
assert(db_name not in self.database_list)
c = self.connection.cursor()
qs = "CREATE DATABASE %s" % db_name
c.execute(qs)
values = [ \
db_name,
kw.get('Host', '%'),
kw.get('User', ''),
kw.get('Select_priv', 'Y'),
kw.get('Insert_priv', 'Y'),
kw.get('Update_priv', 'Y'),
kw.get('Delete_priv', 'Y'),
kw.get('Index_priv', 'Y'),
kw.get('Alter_priv', 'Y'),
kw.get('Create_priv', 'Y'),
kw.get('Drop_priv', 'N'),
kw.get('Grant_priv', 'Y'),
]
self.reload()
qs = "INSERT INTO %s (%s) VALUES (%s)" % \
(self.name,
join(self.fields, ', '),
join(map(lambda x: "'%s'" % x, values), ', '))
c.execute(qs)
self.reload()
def databases(self):
"""
Return a sorted list of databases
"""
databases = {}
for db, host, user in self.keys():
if db:
databases[db] = None
keys = databases.keys()
keys.sort()
return keys
def drop_database(self, db_name):
assert(db_name != 'mysql')
c = self.connection.cursor()
qs = "DROP DATABASE %s" % db_name
c.execute(qs)
qs = "DELETE FROM %s WHERE Db='%s'" % (self.name, db_name)
c.execute(qs)
self.reload()
def populate(self):
c = self.connection.cursor()
qs = "SELECT %s FROM %s" % \
(join(self.fields, ', '), self.name)
c.execute(qs)
_db = {}
for r in c.fetchall():
d = {}
i = 0
for f in self.fields:
d[f] = r[i]
i = i + 1
self[(d['Db'], d['Host'], d['User'])] = d
_db[d['Db']] = None
self.database_list = _db.keys()
self.database_list.sort()
def reload(self):
c = self.connection.cursor()
c.execute("FLUSH PRIVILEGES")
self.clear()
self.populate()
class UserTable(UserDict):
"""
UserTable is a dictionary of 'user' table rows.
key = (Host, User) tuple
value = dictionary of row values
"""
def __init__(self, connection):
self.connection = connection
apply(UserDict.__init__, (self,))
self.name = 'user'
self.fields = [ 'Host',
'User',
'Password',
'Select_priv',
'Insert_priv',
'Update_priv',
'Delete_priv',
'Index_priv',
'Alter_priv',
'Create_priv',
'Drop_priv',
'Grant_priv',
'References_priv',
'Reload_priv',
'Shutdown_priv',
'Process_priv',
'File_priv', ]
self.populate()
def add_user(self, user, **kw):
assert(user != 'root')
if kw.has_key('Password'):
password = "PASSWORD('%s')" % password
else:
password = "''"
values = [ \
"'%s'" % kw.get('Host', '%'),
"'%s'" % user,
password,
"'%s'" % kw.get('Select_priv', 'Y'),
"'%s'" % kw.get('Insert_priv', 'Y'),
"'%s'" % kw.get('Update_priv', 'Y'),
"'%s'" % kw.get('Delete_priv', 'Y'),
"'%s'" % kw.get('Index_priv', 'Y'),
"'%s'" % kw.get('Alter_priv', 'N'),
"'%s'" % kw.get('Create_priv', 'Y'),
"'%s'" % kw.get('Drop_priv', 'Y'),
"'%s'" % kw.get('Grant_priv', 'N'),
"'%s'" % kw.get('References_priv', 'Y'),
"'%s'" % kw.get('Reload_priv', 'Y'),
"'%s'" % kw.get('Shutdown_priv', 'N'),
"'%s'" % kw.get('Process_priv', 'N'),
"'%s'" % kw.get('File_priv', 'N'),
]
qs = "INSERT INTO %s (%s) VALUES (%s)" % \
(self.name,
join(self.fields, ', '),
join(values, ', '))
c = self.connection.cursor()
c.execute(qs)
self.reload()
def del_user(self, user, host=''):
if host:
wc = "User='%s' AND Host='%s'" % (user, host)
else:
wc = "User='%s'" % user
qs = "DELETE FROM %s WHERE %s" % (self.name, wc)
c = self.connection.cursor()
c.execute(qs)
self.reload()
def mod_password(self, user, password):
c = self.connection.cursor()
qs = "UPDATE %s SET Password=PASSWORD('%s') WHERE user='%s'" % \
(self.name, password, user)
c.execute(qs)
c.execute("FLUSH PRIVILEGES")
def mod_user(self, user, **kw):
f = []
v = []
if kw.has_key('Host'):
host = kw['Host']
else:
host = ''
values = []
for k in kw.keys():
if k in self.fields:
if k == 'User':
assert(kw[k] == user)
values.append("%s='%s'" % (k, kw[k]))
if host:
wc = "User='%s' AND Host='%s'" % (user, host)
else:
wc = "User='%s'" % user
qs = "UPDATE %s SET %s WHERE %s" % \
(self.name,
join(values, ', '),
wc)
c = self.connection.cursor()
c.execute(qs)
self.reload()
def populate(self):
c = self.connection.cursor()
qs = "SELECT %s FROM %s" % \
(join(self.fields, ', '), self.name)
c.execute(qs)
_user = {}
for r in c.fetchall():
d = {}
i = 0
for f in self.fields:
d[f] = r[i]
i = i + 1
self[(d['Host'], d['User'])] = d
if d['User']:
_user[d['User']] = None
self.user_list = _user.keys()
self.user_list.sort()
def reload(self):
c = self.connection.cursor()
c.execute("FLUSH PRIVILEGES")
self.clear()
self.populate()
class HostTable(UserDict):
"""
HostTable is a dictionary of 'host' table rows.
key = (Host, Db) tuple
value = dictionary of row values
"""
def __init__(self, connection):
self.connection = connection
apply(UserDict.__init__, (self,))
self.name = 'host'
self.fields = [ 'Host',
'Db',
'Select_priv',
'Insert_priv',
'Update_priv',
'Delete_priv',
'Index_priv',
'Alter_priv',
'Create_priv',
'Drop_priv',
'Grant_priv', ]
self.populate()
def add_host(self, host, **kw):
values = [ \
host,
kw.get('Db', ''),
kw.get('Select_priv', 'Y'),
kw.get('Insert_priv', 'Y'),
kw.get('Update_priv', 'Y'),
kw.get('Delete_priv', 'Y'),
kw.get('Index_priv', 'Y'),
kw.get('Alter_priv', 'Y'),
kw.get('Create_priv', 'Y'),
kw.get('Drop_priv', 'N'),
kw.get('Grant_priv', 'N'),
kw.get('References_priv', 'Y'),
kw.get('Reload_priv', 'Y'),
kw.get('Shutdown_priv', 'N'),
kw.get('Process_priv', 'N'),
kw.get('File_priv', 'N'),
]
qs = "INSERT INTO %s (%s) VALUES (%s)" % \
(self.name,
join(self.fields, ', '),
join(map(lambda x: "'%s'" % x, values), ', '))
c = self.connection.cursor()
c.execute(qs)
self.reload()
def del_host(self, host, db=''):
assert(host != 'localhost')
pass #XXX self.reload()
def mod_host(self, host, **kw):
pass #XXX self.reload()
def mod_password(self, user, password):
self.userTable.mod_password(user, password)
def populate(self):
c = self.connection.cursor()
qs = "SELECT %s FROM %s" % \
(join(self.fields, ', '), self.name)
c.execute(qs)
_host = {}
for r in c.fetchall():
d = {}
i = 0
for f in self.fields:
d[f] = r[i]
i = i + 1
self[(d['Host'], d['Db'])] = d
_host[d['Host']] = None
self.host_list = _host.keys()
self.host_list.sort()
def reload(self):
c = self.connection.cursor()
c.execute("FLUSH PRIVILEGES")
self.clear()
self.populate()
class Server:
def __init__(self, **kw):
self.connection = apply(MySQLdb.connect, (), kw)
self.dbTable = DbTable(self.connection)
self.hostTable = HostTable(self.connection)
self.userTable = UserTable(self.connection)
def add_host(self, host, **kw):
apply(self.hostTable.add_host, (host,), kw)
def add_user(self, user, **kw):
apply(self.userTable.add_user, (user,), kw)
def create_database(self, db_name, **kw):
apply(self.dbTable.create_database, (db_name,), kw)
def del_host(self, host, db=''):
self.hostTable.del_host(host, db)
def del_user(self, user, host=''):
self.userTable.del_user(user, host)
def drop_database(self, db_name):
self.dbTable.drop_database(db_name)
def __get_password(self, user):
passwd1 = getpass("Enter MySQL user '%s' password: " % user)
passwd2 = getpass("Re-enter password: ")
if passwd1 == passwd2:
return passwd1
else:
return None
def mod_host(self, host, **kw):
apply(self.userTable.mod_host, (host,), kw)
def mod_user(self, user, **kw):
apply(self.userTable.mod_user, (user,), kw)
def process(self, operation, args=[]):
c = self.connection.cursor()
if len(args) < 1 and \
operation not in ('dblist', 'hostlist', 'userlist'):
print "error: argument[s] required for %s operation" %
operation
return
if operation == 'addhost':
print "operation: '%s' not yet implemented" % operation #XXX
elif operation == 'adduser':
self.add_user(args[0])
elif operation == 'dbcreate':
self.create_database(args[0])
elif operation == 'dbdrop':
assert(args[0] != 'mysql')
self.drop_database(args[0])
elif operation == 'dblist':
c.execute("SHOW DATABASES")
db_list = map(lambda x: x[0], c.fetchall())
db_list.sort()
for db_name in db_list:
print db_name
elif operation == 'hostlist':
for host in self.hostTable.host_list:
print host
elif operation == 'moduser':
print "operation: '%s' not yet implemented" % operation #XXX
elif operation == 'passwd':
if len(args) < 2:
while 1:
passwd = self.__get_password(args[0])
if passwd:
break
else:
passwd = args[1]
self.userTable.mod_password(args[0], passwd)
elif operation == 'rmhost':
print "operation: '%s' not yet implemented" % operation #XXX
elif operation == 'rmuser':
assert(args[0] != 'root')
self.del_user(args[0])
elif operation == 'tblist':
c.execute("SHOW TABLES FROM %s" % args[0])
table_list = map(lambda x: x[0], c.fetchall())
table_list.sort()
for tb_name in table_list:
print tb_name
elif operation == 'userlist':
for user in self.userTable.user_list:
print user
else:
print "error: invalid operation: %s" % operation
if __name__ == '__main__':
user = 'root'
host = 'localhost'
passwd = ''
usage = """
usage: %s [options] operation
options:
-u user default='%s'
-h host name or IP address, default='%s'
-p password
operation:
addhost [hostname]
adduser [username] add user
dbcreate [dbname] create a database with default privileges
dbdrop [dbname] drop a database (BE VERY CAREFUL)
dblist list databases
hostlist list hosts
moduser [username userpriv=X] modify user privileges (e.g.
Grant_priv=Y)
passwd [username [password]] change user password
rmhost [hostname] remove host
rmuser [username] remove user
tblist [dbname] list database tables
userlist list users
""" % (os.path.basename(sys.argv[0]), user, host)
optlist, args = getopt.getopt(sys.argv[1:], 'h:p:u:')
if len(sys.argv) < 2:
print usage; sys.exit(1)
argv = []
for _arg in sys.argv[1:]:
argv.append(_arg)
for (opt, optarg) in optlist:
if opt == '-u':
user = optarg
elif opt == '-h':
host = optarg
elif opt == '-p':
passwd = optarg
else:
print "error: invalid option: %s" % opt
print usage; sys.exit(1)
argv.remove(opt)
argv.remove(optarg)
if len(argv) < 1:
print usage
sys.exit(1)
if not passwd:
passwd = getpass("Enter MySQL user '%s' password: " % user)
server = Server(host=host, db='mysql', user=user, passwd=passwd)
server.process(argv[0], argv[1:])
More information about the Python-list
mailing list