Request for comments - concurrent ssh client

mk mrkafk at gmail.com
Wed Nov 4 15:00:38 CET 2009


Hello everyone,

Since I'm not happy with shmux or pssh, I wrote my own "concurrent ssh" 
program for parallel execution of SSH commands on multiple hosts. Before 
I release the program into the wild, I would like to hear (constructive) 
comments on what may be wrong with the program and/or how to fix it. 
(Note: the program requires the paramiko SSH client module.)

#!/usr/local/bin/python -W ignore::DeprecationWarning

import operator
import optparse
import os
import socket
import subprocess
import sys
import threading
import time

import paramiko

# Command-line interface definition.
# Numeric options (-o, -t, -r) declare an optparse type so that values given
# on the command line arrive as numbers rather than strings.  Options without
# an explicit default are left as None by optparse when not given.
usage = ("Usage: cssh [options] IP1 hostname2 IP3 hostname4 ...\n\n"
         "(IPs/hostnames on the commandline are actually optional, they can "
         "be specified in the file, see below.)")
op = optparse.OptionParser(usage=usage)

op.add_option('-c', '--cmd', dest='cmd',
              help="Command to run. Mutually exclusive with -s.")
op.add_option('-s', '--script', dest='script',
              help='Script file to run. Mutually exclusive with -c. '
                   'Script can have its own arguments, specify them in '
                   'doublequotes, like "script -arg arg".')
op.add_option('-i', '--script-dir', dest='scriptdir',
              help="The directory where script will be copied and executed. "
                   "Defaults to /tmp.")
op.add_option('-l', '--cleanup', dest='cleanup', action='store_true',
              help="Delete the script on remote hosts after executing it.")
op.add_option('-f', '--file', dest='file',
              help='File with hosts to use, one host per line. Concatenated '
                   'with list of hosts/IP addresses specified at the end of '
                   'the commandline. Optionally, in a line of the file you '
                   'can specify sequence: "Address/Hostname Username '
                   'Password SSH_Port" separated by spaces (additional '
                   'parameters can be specified on a subset of lines; where '
                   'not specified, relevant parameters take default values).')
op.add_option('-d', '--dir', dest='dir',
              help='Directory for storing standard output and standard error '
                   'of command. If specified, directory will be created, '
                   'with subdirs named IPs/hostnames and relevant files '
                   'stored in those subdirs.')
op.add_option('-u', '--username', dest='username',
              help="Username to specify for SSH. Defaults to 'root'.")
op.add_option('-p', '--password', dest='password',
              help="Password. Password is used first; if connection fails "
                   "using password, cssh uses SSH key (default or "
                   "specified).")
op.add_option('-o', '--port', dest='port', type='int',
              help="Default SSH port.")
op.add_option('-k', '--key', dest='key',
              help="SSH Key file. Defaults to '/root/.ssh/id_dsa'.")
op.add_option('-n', '--nokey', dest='nokey', action="store_true",
              help="Turns off using SSH key.")
op.add_option('-t', '--timeout', dest='timeout', type='float',
              help="SSH connection timeout. Defaults to 20 seconds.")
op.add_option('-m', '--monochromatic', dest='mono', action='store_true',
              help="Do not use colors while printing output.")
op.add_option('-r', '--maxthreads', dest='maxthreads', type='int',
              help="Maximum number of threads working concurrently. Default "
                   "is 100. Exceeding 200 is generally not recommended due "
                   "to potential exhaustion of address space (each thread "
                   "can use 10 MB of address space and 32-bit systems have a "
                   "maximum of 4GB of address space).")
op.add_option('-q', '--quiet', dest='quiet', action='store_true',
              help="Quiet. Do not print out summaries like IPs for which "
                   "communication succeeded or failed, etc.")

# add resource file?

# Parse the command line, then check option combinations.  Every problem is
# reported before exiting, so the user sees all mistakes in one run.
(opts, args) = op.parse_args()

failit = False

if opts.cmd is None and opts.script is None:
    print("You have to specify one of the following: command to run, "
          "using -c command or --cmd command, or script to run, using "
          "-s scriptfile or --script scriptfile.")
    print("")
    failit = True

if opts.cmd is not None and opts.script is not None:
    print("Options command (-c) and script (-s) are mutually exclusive. "
          "Specify either one.")
    print("")
    failit = True

if opts.cmd is None and opts.script is not None:
    try:
        # The first word of the -s value is the local script path; anything
        # after it is arguments for the script.  Opening the file is only a
        # readability check.  IndexError covers an empty/whitespace-only value.
        scriptpath = opts.script.split()[0]
        open(scriptpath, 'r').close()
    except (IOError, IndexError):
        print("Could not open script file %s." % opts.script)
        print("")
        failit = True

if opts.file is None and args == []:
    print("You have to specify at least one of the following:")
    print(" - list of IPs/hostnames at the end of the command line "
          "(after all options)")
    print(" - list of IPs/hostnames stored in file specified after -f "
          "or --file option (like: -f hostnames.txt)")
    print(" You can also specify both sources. In that case IP/hostname "
          "lists will be concatenated.")
    print("")
    failit = True

if opts.password is None and opts.nokey:
    print("Since using key has been turned off using -n option, you "
          "have to specify password using -p password or --password password.")
    print("")
    failit = True

if opts.key is not None and opts.nokey:
    print("Options -n and -k keyfile are mutually exclusive. Specify "
          "either one.")
    print("")
    failit = True

if failit:
    # Invalid usage is an error: exit non-zero so calling scripts can tell.
    sys.exit(1)

# Fill in defaults for every option the user did not give (optparse leaves
# those as None).  'username' is included so that the default documented in
# the -u help text ('root') is actually applied.
option_defaults = (
    ('username', 'root'),
    ('scriptdir', '/tmp'),
    ('cleanup', False),
    ('key', '/root/.ssh/id_dsa'),
    ('port', 22),
    ('timeout', 20),
    ('mono', False),
    ('maxthreads', 100),
    ('quiet', False),
)
for option_name, option_default in option_defaults:
    if getattr(opts, option_name) is None:
        setattr(opts, option_name, option_default)

# -n switches key authentication off entirely, overriding the key default.
if opts.nokey:
    opts.key = None



# Terminal escape sequences used to colorize output; ENDC resets attributes.
HEADER = '\033[95m'
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
OKBLUE = '\033[94m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
ENDC = '\033[0m'

# In monochromatic mode every sequence becomes the empty string, so the
# printing code can concatenate them unconditionally.
if opts.mono:
    HEADER = BLACK = RED = GREEN = YELLOW = OKBLUE = BLUE = MAGENTA = \
        CYAN = WHITE = ENDC = ''

# Build the host list: hosts named on the command line first, followed by the
# lines of the optional hosts file.  Each entry ends up as a list of
# whitespace-separated fields (address, then optional username, password,
# port).
hosts = args[:]
fhosts = []
fname = opts.file

if fname is not None:
    try:
        with open(fname) as hostfile:
            fhosts = hostfile.readlines()
    except IOError as e:
        print("Error: " + str(e))
        sys.exit(1)

hosts.extend(fhosts)
# split() ignores surrounding whitespace, and dropping empty results removes
# blank and whitespace-only lines, which would otherwise yield an entry with
# no address field.
hosts = [fields
         for fields in (s.split() for s in hosts if s is not None)
         if fields]

if not hosts:
    print("Error: list of hosts is empty. Quitting")
    sys.exit(1)


class SSHThread(threading.Thread):
    """Thread that runs one command, or one uploaded script, on one host.

    The thread connects with paramiko, optionally copies a local script to
    the remote host, executes the command, and prints the result while
    holding the shared ``lock`` so output of different hosts is not
    interleaved.  When ``opts.dir`` is set, stdout/stderr are also written
    to ``<opts.dir>/<ip>/stdout`` and ``<opts.dir>/<ip>/stderr``.

    Attributes read by the dispatcher:
        finished:  True once the thread has nothing more to do.
        confailed: True unless the connection and script upload succeeded.
        conerror:  text of the last connection error, or '' if none.

    Relies on the module-level ``opts`` object and color constants.
    """

    def __init__(self, lock, cmd, ip, username, sshprivkey=None,
                 passw=None, port=22, script=None, scriptdir=None):
        """Store connection parameters and derive the remote script paths.

        Args:
            lock: lock shared by all threads; held while printing.
            cmd: command line to execute, or None to run ``script`` instead.
            ip: address or hostname to connect to.
            username: SSH user name.
            sshprivkey: private key file used when password login is not
                possible, or None to disable key login.
            passw: password tried first, or None.
            port: SSH port.
            script: "localpath [args...]" of a script to upload and run.
            scriptdir: remote directory the script is copied into.
        """
        threading.Thread.__init__(self)

        self.lock = lock
        self.cmd = cmd
        self.ip = ip
        self.username = username
        self.sshprivkey = sshprivkey
        self.passw = passw
        self.port = port
        self.conobj = None
        self.confailed = True
        self.conerror = ''
        self.finished = False
        if script is not None:
            stripped = script.strip()
            self.scriptpath = stripped.split()[0]
            self.scriptname = self.scriptpath.split('/')[-1]
            # Everything after the path (including the separating blank) is
            # passed to the script as its arguments.
            self.scriptargs = stripped[len(self.scriptpath):]
            self.scriptdir = scriptdir.strip().rstrip('/')
            self.rspath = self.scriptdir + '/' + self.scriptname

    def ping(self, lock, ip):
        """Send one ping to ``ip``; return (stdout, stderr) of /bin/ping.

        ``lock`` is accepted but not used.
        """
        subp = subprocess.Popen(['/bin/ping', '-c', '1', ip],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        return subp.communicate()

    def ssh_connect(self):
        """Open the SSH connection, trying the password first, then the key.

        On success ``self.conobj`` is the connected client.  On failure
        ``self.conobj`` is None, ``self.conerror`` holds the last error text
        and ``self.finished`` is set.
        """
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        attempts = []
        if self.passw is not None:
            attempts.append({'password': self.passw,
                             'allow_agent': False,
                             'look_for_keys': False})
        if self.sshprivkey is not None:
            attempts.append({'key_filename': self.sshprivkey})

        for extra in attempts:
            try:
                client.connect(self.ip, username=self.username,
                               port=self.port,
                               timeout=float(opts.timeout), **extra)
            except Exception as e:
                # Any failure means "try the next method"; keep the reason.
                self.conerror = str(e)
                continue
            self.conobj = client
            return

        client.close()
        self.conobj = None
        self.finished = True

    def execcmds(self):
        """Execute ``self.cmd`` remotely and return (stdout, stderr) as text.

        CRLF line ends are converted to LF.  If execution or reading fails,
        the error is appended to the returned stderr text instead of being
        raised.
        """
        out = err = ''
        try:
            _stdin, stdout, stderr = self.conobj.exec_command(self.cmd)
            out_lines = stdout.readlines()
            err_lines = stderr.readlines()
            out = ''.join([s.replace('\r\n', '\n') for s in out_lines])
            err = ''.join([s.replace('\r\n', '\n') for s in err_lines])
        except Exception as e:
            err += "cssh: executing command failed: %s\n" % e
        return (out, err)

    def sendscript(self):
        """Copy the local script to ``self.rspath`` via a remote ``scp -t``.

        Returns:
            '' on success, otherwise a text describing the error.
        """
        try:
            with open(self.scriptpath, 'rb') as fo:
                payload = fo.read()
        except IOError as e:
            return str(e)

        try:
            channel = self.conobj.get_transport().open_session()
        except paramiko.SSHException as e:
            return str(e)

        try:
            channel.exec_command('scp -t -v %s\n' % self.rspath)
            channel.sendall('C0755 %d 1\n' % len(payload))
            # Wait for the remote side to answer, but not forever: give up
            # after the configured connection timeout.
            deadline = time.time() + float(opts.timeout)
            while not channel.recv_ready():
                if time.time() > deadline:
                    return "Timed out waiting for remote scp to respond"
                time.sleep(0.1)
            # sendall keeps writing until the whole script is transmitted.
            channel.sendall(payload)
        except (paramiko.SSHException, socket.error) as e:
            return str(e)
        finally:
            channel.close()
        return ''

    def setcmdtoscript(self):
        """Make the uploaded script (plus its arguments) the command to run."""
        self.cmd = self.rspath + self.scriptargs

    def execcmdonscript(self, cmd):
        """Run a maintenance command; any output it produces is an error."""
        _stdin, stdout, stderr = self.conobj.exec_command(cmd)
        out_lines = stdout.readlines()
        err_lines = stderr.readlines()
        if out_lines or err_lines:
            message = " ".join([
                RED + "Host %s, Error while executing %s on script:"
                % (self.ip, cmd),
                "".join(err_lines),
                "".join(out_lines) + ENDC,
            ])
            with self.lock:
                print(message)

    def chmodscript(self):
        """Mark the uploaded script executable."""
        # just in case, as sometimes and on some operating systems the
        # execution flags on the script are not always set
        self.execcmdonscript('chmod 0755 %s' % self.rspath)

    def delscript(self):
        """Delete the uploaded script from the remote host."""
        self.execcmdonscript('rm -f %s' % self.rspath)

    def run(self):
        """Thread body; ``finished`` is set even if an error escapes."""
        try:
            self._run()
        finally:
            self.finished = True

    def _run(self):
        """Connect, run the command or script, and report the outcome."""
        self.ssh_connect()
        so, se = ('', '')
        res = ''
        if self.conobj is not None:
            used_script = self.cmd is None
            if used_script:
                res = self.sendscript()
                if res == '':
                    self.setcmdtoscript()
                    self.chmodscript()
            if res == '':
                so, se = self.execcmds()
            # Only a script that was uploaded can be cleaned up; in -c mode
            # there is no remote script path.
            if opts.cleanup and used_script:
                time.sleep(0.5)
                self.delscript()

        prefix = OKBLUE + "%-20s" % self.ip + ENDC + " : "
        with self.lock:
            if self.conobj is None:
                print(prefix + RED + "SSH connection failed" + ENDC)
                self.confailed = True
            elif res != '':
                print(prefix + RED + "Sending script failed: %s" % res + ENDC)
                self.confailed = True
            else:
                self.confailed = False
                print(prefix + OKBLUE + "SSH connection successful" + ENDC)
                print(so)
                if se != '':
                    print(MAGENTA + "command standard error output:" + ENDC)
                    print(se)
                if opts.dir is not None:
                    self._save_output(so, se)

    def _save_output(self, so, se):
        """Write stdout/stderr text below ``opts.dir``; report I/O errors."""
        path = os.path.join(opts.dir, self.ip)
        try:
            if not os.path.isdir(path):
                os.makedirs(path)
            for name, text in (('stdout', so), ('stderr', se)):
                with open(os.path.join(path, name), 'w') as of:
                    of.write(text)
        except (IOError, OSError) as e:
            print(RED + "Could not store output in %s: %s" % (path, e) + ENDC)

    def sshclose(self):
        """Close the SSH connection if one was opened."""
        if self.conobj is not None:
            self.conobj.close()


# Bookkeeping for the dispatcher: `queue` holds (ip, thread) pairs of threads
# that have been started, `thfinished` collects pairs that are done.
queue, thfinished = [], []

# One lock shared by all worker threads.
lock = threading.Lock()

def getparams(h):
    """Return (ip, username, password, port) for one host entry.

    Args:
        h: sequence of fields "address [username [password [port]]]".
           Missing fields fall back to the command-line options
           (opts.username, opts.password, opts.port); the port falls back
           to 22 when opts.port is not set either.

    Returns:
        Tuple (ip, username, passw, port).  ``port`` is None when the port
        value cannot be converted to an integer; an error line is printed
        in that case.
    """
    ip = h[0]
    username = h[1] if len(h) > 1 else opts.username
    passw = h[2] if len(h) > 2 else opts.password

    if len(h) > 3:
        rawport = h[3]
    else:
        # Honor the -o/--port option instead of a hard-coded 22.
        rawport = opts.port
        if rawport is None:
            rawport = 22

    try:
        port = int(rawport)
    except ValueError as e:
        print(RED + "%-20s" % ip + " : error converting port: "
              + str(e) + ENDC)
        port = None
    return (ip, username, passw, port)

def _reap_finished(queue, thfinished):
    """Move every finished thread from `queue` to `thfinished`.

    The SSH connection of each finished thread is closed and the thread is
    joined.  Iterates over a copy because entries are removed from `queue`
    during the loop.
    """
    for entry in queue[:]:
        th = entry[1]
        if th.finished:
            th.sshclose()
            th.join()
            thfinished.append(entry)
            queue.remove(entry)


# Number of threads allowed to run at once; always at least one so the
# dispatcher can make progress.
maxthreads = max(1, int(opts.maxthreads))

# Start one thread per host while staying below the thread limit.
while hosts:
    if len(queue) < maxthreads:
        h = hosts.pop()
        (ip, username, passw, port) = getparams(h)
        if port is not None:
            th = SSHThread(lock, opts.cmd, ip, username=username,
                           sshprivkey=opts.key, passw=passw, port=port,
                           script=opts.script, scriptdir=opts.scriptdir)
            queue.append((ip, th))
            th.daemon = True
            th.start()
        else:
            # Unusable parameters: record the host as failed, with no thread.
            thfinished.append((ip, None))
    else:
        time.sleep(1)
    _reap_finished(queue, thfinished)

# All hosts have been dispatched; wait for the remaining threads.
while queue:
    _reap_finished(queue, thfinished)
    if queue:
        time.sleep(1)

# Final summary: list the hosts by outcome unless -q/--quiet was given.
# An entry whose thread is None never got a thread and counts as failed.
if not opts.quiet:
    print("")
    print(OKBLUE + 'Communication SUCCEEDED for following IP addresses '
          '(SSH could open connection):' + ENDC)

    for ip, th in thfinished:
        if th is not None and not th.confailed:
            print(ip)

    print("")
    print(OKBLUE + 'Communication FAILED for following IP addresses '
          '(SSH could not open connection / error in parameters):' + ENDC)

    for ip, th in thfinished:
        if th is None or th.confailed:
            print(ip)








More information about the Python-list mailing list