Threads shutting down in Py 2.7 but not in Py 3.69(or 3.x)
Muralidhar N
muralidhar.n at lge.com
Wed Jun 9 13:25:35 EDT 2021
Dear All,
Threads shutting down in Py 2.7 but not in Py 3.69(or 3.x) while making SSH connection using Paramiko/PySSH module or Socket
Executing code qa-test-execute.py in Py 2.7 (Ubuntu 14.04.6 LTS)
Command 1 :
sudo python ./qa-test-execute.py
Output 1 :
2021-05-24 23:35:59,889[BaseCommandLine __init__ 139916740505872][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
Exception AssertionError: 'attempt to release recursive lock not owned by thread' in <bound method BaseCommandLine1.__del__ of <__main__.BaseCommandLine1 object at 0x7f40e79db110>> ignored
Command 2 :
sudo python ./qa-test-execute.py 2>&1 | tee 24052021_py27_execute_1.log
Output 2 :
2021-05-24 23:50:16,303[BaseCommandLine __init__ 139863250567440][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
Exception AssertionError: 'attempt to release recursive lock not owned by thread' in <bound method BaseCommandLine1.__del__ of <__main__.BaseCommandLine1 object at 0x7f34735e0110>> ignored
Executing code qa-test-execute.py in Py 3.69 ( or 3.x - Ubuntu 18.04.5 LTS)
Command 1 :
sudo python ./qa-test-execute-xxx.py
Output 1 :
2021-05-24 23:53:49,293[BaseCommandLine __init__ 139973197423840][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
2021-05-24 23:53:49,293[BaseCommandLine __init__ 139973197423840][DEBUG]: Closing internal ssh-client.
Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Command 2 :
sudo python3 ./qa-test-execute-xxx.py 2>&1 | tee test.log
Output 2 : Terminal hangs & CTRL C to return prompt
2021-05-24 23:56:31,646[BaseCommandLine __init__ 140516619855072][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
2021-05-24 23:56:31,646[BaseCommandLine __init__ 140516619855072][DEBUG]: Closing internal ssh-client.
^C
Behaviour of same code is different when executed in Py 2.7 & Py 3.69 ( or 3.x ).
Threads not terminating normally conflicting with paramiko or pyssh or socket & logging module
-------------- next part --------------
#!/usr/bin/env python3
import sys
import traceback
import logging
import logging.config
import time
import threading
import multiprocessing
import paramiko
########################################################################
def lock_acquire_with_timeout1(lock, timeout_seconds):
"""
Try to acquire lock without specified timeout
@param lock: threading lock to be acquired
@type lock: threading.Lock
@param timeout_seconds: maximal time to make lock acquire attempts
@type timeout_seconds: float
"""
begin_time = time.time()
while time.time() - begin_time < timeout_seconds:
if lambda: lock.acquire(False):
return True
else:
time.sleep(1.0)
return None
def call_with_timeout1(method_to_call, timeout_seconds):
"""
Good for potentially "freeze" methods calls. Executes passed method in
separate thread. Waits for control returns within timeout. If timeout
exceed - return control to callee thread. Separate execution thread will
still be active.
@param method_to_call: method te be called
@type method_to_call: function
@param timeout_seconds: maximal time to wait for method call finished
@type timeout_seconds: float
"""
stop_thread = threading.Barrier(2)
thread_name = threading._newname("{}-%d".format(__name__))
call_thread = threading.Thread(target=method_to_call, name=thread_name)
call_thread.daemon = True
call_thread.start()
print ("threading.activeCount() : %s",threading.activeCount())
print ("threading.currentThread() : %s", threading.currentThread())
print ("threading.enumerate() : %s", threading.enumerate() )
call_thread.join(timeout=timeout_seconds)
if call_thread.is_alive():
stop_thread.abort()
return not call_thread.is_alive()
def format_all_threads_stacks1():
"""
@return: formatted stacks for all running threads.
"""
stacktraces = []
for thread_id, stack in list(dict(list(sys._current_frames().items())).items()):
for thread1 in threading.enumerate():
if thread1.ident == thread_id:
stacktraces.append('Thread %s (daemon=%r) stacktrace: \n%s' %
(thread1.name, thread1.daemon, ''.join(traceback.format_stack(stack))))
else:
thread = None
return '\n'.join(stacktraces)
class SSHClient_noauth1(paramiko.SSHClient):
def _auth(self, username, *args):
self._transport.auth_none(username)
return
class BaseCommandLine1(object):
def __init__(self, connection_timeout=None):
self._connection_timeout = connection_timeout
self._connection_lock = multiprocessing.RLock()
self._ssh_client = None
self.logger = logging.getLogger('BaseCommandLine __init__ {}'.format(id(self)))
self.reset_connection1()
def __del__(self):
self.close1()
def _wait_for_connection1(self):
begin_time = time.time()
self.logger.debug("Will attempt to connect with device for %s seconds." % self._connection_timeout)
while time.time() - begin_time < self._connection_timeout:
try:
self._connect_ssh1()
self._ssh_client.get_transport().set_keepalive(5)
self.logger.debug('BaseCommandLine1 new SSH connection with {}:{} established.'.
format("10.221.42.29", "22"))
break
except Exception as e:
self.logger.debug('BaseCommandLine1 SSH-connection failed after %d seconds passed with exception: %s' %
(time.time() - begin_time, e))
self.logger.debug('BaseCommandLine1 Next attempt after {} seconds.'.format(5.0))
time.sleep(5.0)
else:
self.logger.debug('BaseCommandLine1 Failed to connect to {}:{} within {} seconds: {}'.format(
"10.221.42.29","22",self._connection_timeout,traceback.format_exc()))
def reset_connection1(self):
with self._connection_lock:
self.close1()
self.logger.debug('reset connection begin.')
self._wait_for_connection1()
def close1(self):
if not self._ssh_client:
return
close_timeout = 10.0
begin_time = time.time()
self.logger.debug("Attempt to close ssh-session within %s seconds." % close_timeout)
if lock_acquire_with_timeout1(self._connection_lock, close_timeout):
try:
if call_with_timeout1(self.__nonblocking_close1, timeout_seconds=close_timeout):
self.logger.debug("Successfully closed SSHClient within %d seconds." % (time.time() - begin_time))
else:
self.logger.warning("Failed to close SSHClient within %d seconds." % close_timeout)
self.logger.warning("All threads state:\n%s" % format_all_threads_stacks1())
finally:
self._connection_lock.release()
else:
self.logger.warning("Failed to acquire lock within %s timeout." % close_timeout)
self._ssh_client = None
def __nonblocking_close1(self):
"""
Non-blocking call to close SSH connection. May freeze on internal SSHClient.close().
"""
self.logger.debug("Closing internal ssh-client.")
if self._ssh_client:
self._ssh_client.close1()
self.logger.debug("Internal ssh-client closed.")
self._ssh_client = None
def _connect_ssh1(self, key_filename=None):
client = SSHClient_noauth1()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# try using specified password first
self.logger.debug("BaseCommandLine1 _connect_ssh1 - try using specified password first")
client.connect("10.221.42.29", port="22", username="root", password="", timeout=60.0,
banner_timeout=60.0, key_filename=None, allow_agent=False)
except paramiko.AuthenticationException as e:
# in case of authentication error try to connect without password
self.logger.debug("BaseCommandLine1_connect_ssh1 - in case of authentication error try to connect without password : %s" % str(e))
self._ssh_client = client
return client
def config_logging():
formatter = logging.Formatter("%(asctime)s[%(name)s]"
"[%(levelname)s]: %(message)s")
console_logger = logging.StreamHandler(sys.stdout)
console_logger.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(logging.NullHandler())
paramiko_logger = logging.getLogger('paramiko')
paramiko_logger.setLevel(logging.CRITICAL)
paramiko_logger.addHandler(console_logger)
console_logger.setLevel(logging.DEBUG)
root_logger.addHandler(console_logger)
def main(argv):
config_logging()
try:
result = BaseCommandLine1(300)
except Exception:
logging.debug("Exception from BaseCommandLine1")
logging.debug("Show all threads status before exit attempt:\n %s", format_all_threads_stacks1())
if __name__ == "__main__":
main(sys.argv)
-------------- next part --------------
#!/usr/bin/env python3
import sys
import traceback
import logging
import logging.config
import time
import threading
import multiprocessing
import socket
########################################################################
def lock_acquire_with_timeout1(lock, timeout_seconds):
"""
Try to acquire lock without specified timeout
@param lock: threading lock to be acquired
@type lock: threading.Lock
@param timeout_seconds: maximal time to make lock acquire attempts
@type timeout_seconds: float
"""
begin_time = time.time()
while time.time() - begin_time < timeout_seconds:
if lambda: lock.acquire(False):
return True
else:
time.sleep(1.0)
return None
def call_with_timeout1(method_to_call, timeout_seconds):
"""
Good for potentially "freeze" methods calls. Executes passed method in
separate thread. Waits for control returns within timeout. If timeout
exceed - return control to callee thread. Separate execution thread will
still be active.
@param method_to_call: method te be called
@type method_to_call: function
@param timeout_seconds: maximal time to wait for method call finished
@type timeout_seconds: float
"""
stop_thread = threading.Barrier(2)
thread_name = threading._newname("{}-%d".format(__name__))
call_thread = threading.Thread(target=method_to_call, name=thread_name)
call_thread.daemon = True
call_thread.start()
print ("threading.activeCount() : %s",threading.activeCount())
print ("threading.currentThread() : %s", threading.currentThread())
print ("threading.enumerate() : %s", threading.enumerate() )
call_thread.join(timeout=timeout_seconds)
if call_thread.is_alive():
stop_thread.abort()
return not call_thread.is_alive()
#return True
def format_all_threads_stacks1():
"""
@return: formatted stacks for all running threads.
"""
stacktraces = []
for thread_id, stack in list(dict(list(sys._current_frames().items())).items()):
for thread1 in threading.enumerate():
if thread1.ident == thread_id:
stacktraces.append('Thread %s (daemon=%r) stacktrace: \n%s' %
(thread1.name, thread1.daemon, ''.join(traceback.format_stack(stack))))
else:
thread = None
return '\n'.join(stacktraces)
class BaseCommandLine1(object):
def __init__(self, connection_timeout=None):
self._connection_timeout = connection_timeout
self._connection_lock = multiprocessing.RLock()
self._ssh_client = None
self.logger = logging.getLogger('BaseCommandLine __init__ {}'.format(id(self)))
self.reset_connection1()
def __del__(self):
self.close1()
def _wait_for_connection1(self):
begin_time = time.time()
self.logger.debug("Will attempt to connect with device for %s seconds." % self._connection_timeout)
while time.time() - begin_time < self._connection_timeout:
try:
self._connect_ssh1()
self.logger.debug('BaseCommandLine1 new SSH connection with {}:{} established.'.
format("10.221.42.29", "22"))
break
except Exception as e:
self.logger.debug('BaseCommandLine1 SSH-connection failed after %d seconds passed with exception: %s' %
(time.time() - begin_time, e))
self.logger.debug('BaseCommandLine1 Next attempt after {} seconds.'.format(5.0))
time.sleep(5.0)
else:
self.logger.debug('BaseCommandLine1 Failed to connect to {}:{} within {} seconds: {}'.format(
"10.221.42.29","22",self._connection_timeout,traceback.format_exc()))
def reset_connection1(self):
with self._connection_lock:
self.close1()
self.logger.debug('reset connection begin.')
self._wait_for_connection1()
def close1(self):
if not self._ssh_client:
return
close_timeout = 10.0
begin_time = time.time()
self.logger.debug("Attempt to close ssh-session within %s seconds." % close_timeout)
if lock_acquire_with_timeout1(self._connection_lock, close_timeout):
try:
if call_with_timeout1(self.__nonblocking_close1, timeout_seconds=close_timeout):
#if self.__nonblocking_close1():
self.logger.debug("Successfully closed SSHClient within %d seconds." % (time.time() - begin_time))
else:
self.logger.warning("Failed to close SSHClient within %d seconds." % close_timeout)
self.logger.warning("All threads state:\n%s" % format_all_threads_stacks1())
finally:
#self._connection_lock.release()
self.logger.debug('self._connection_lock release')
else:
self.logger.warning("Failed to acquire lock within %s timeout." % close_timeout)
self._ssh_client = None
def __nonblocking_close1(self):
"""
Non-blocking call to close SSH connection. May freeze on internal SSHClient.close().
"""
result = False
self.logger.debug("Closing internal ssh-client.")
if self._ssh_client:
socket_close_response = self._ssh_client.close()
self.logger.debug('self._ssh_client.close1 : %s' % socket_close_response)
result = True
self.logger.debug("Internal ssh-client closed.")
self._ssh_client = None
return result
def _connect_ssh1(self, key_filename=None):
s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("Socket Name : " , s1)
# get local machine name
host = '10.221.42.29'
port = 22
# connection to hostname on the port.
try:
s1.connect((host, port))
print("Connection Established", s1)
except Exception as e:
print("Connection error :", str(e))
self._ssh_client = s1
return s1
def config_logging():
formatter = logging.Formatter("%(asctime)s[%(name)s]"
"[%(levelname)s]: %(message)s")
console_logger = logging.StreamHandler(sys.stdout)
console_logger.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(logging.NullHandler())
paramiko_logger = logging.getLogger('paramiko')
paramiko_logger.setLevel(logging.CRITICAL)
paramiko_logger.addHandler(console_logger)
console_logger.setLevel(logging.DEBUG)
root_logger.addHandler(console_logger)
def main(argv):
config_logging()
try:
result = BaseCommandLine1(300)
except Exception:
logging.debug("Exception from BaseCommandLine1")
logging.debug("Show all threads status before exit attempt:\n %s", format_all_threads_stacks1())
if __name__ == "__main__":
main(sys.argv)
-------------- next part --------------
#!/usr/bin/env python3
import sys
import traceback
import logging
import logging.config
import time
import threading
import multiprocessing
import pyssh
########################################################################
def lock_acquire_with_timeout1(lock, timeout_seconds):
"""
Try to acquire lock without specified timeout
@param lock: threading lock to be acquired
@type lock: threading.Lock
@param timeout_seconds: maximal time to make lock acquire attempts
@type timeout_seconds: float
"""
begin_time = time.time()
while time.time() - begin_time < timeout_seconds:
if lambda: lock.acquire(False):
return True
else:
time.sleep(1.0)
return None
def call_with_timeout1(method_to_call, timeout_seconds):
"""
Good for potentially "freeze" methods calls. Executes passed method in
separate thread. Waits for control returns within timeout. If timeout
exceed - return control to callee thread. Separate execution thread will
still be active.
@param method_to_call: method te be called
@type method_to_call: function
@param timeout_seconds: maximal time to wait for method call finished
@type timeout_seconds: float
"""
stop_thread = threading.Barrier(2)
thread_name = threading._newname("{}-%d".format(__name__))
call_thread = threading.Thread(target=method_to_call, name=thread_name)
call_thread.daemon = True
call_thread.start()
print ("threading.activeCount() : %s",threading.activeCount())
print ("threading.currentThread() : %s", threading.currentThread())
print ("threading.enumerate() : %s", threading.enumerate() )
call_thread.join(timeout=timeout_seconds)
if call_thread.is_alive():
stop_thread.abort()
return not call_thread.is_alive()
def format_all_threads_stacks1():
"""
@return: formatted stacks for all running threads.
"""
stacktraces = []
for thread_id, stack in list(dict(list(sys._current_frames().items())).items()):
for thread1 in threading.enumerate():
if thread1.ident == thread_id:
stacktraces.append('Thread %s (daemon=%r) stacktrace: \n%s' %
(thread1.name, thread1.daemon, ''.join(traceback.format_stack(stack))))
else:
thread = None
return '\n'.join(stacktraces)
class BaseCommandLine1(object):
def __init__(self, connection_timeout=None):
self._connection_timeout = connection_timeout
self._connection_lock = multiprocessing.RLock()
self._ssh_client = None
self.logger = logging.getLogger('BaseCommandLine __init__ {}'.format(id(self)))
self.reset_connection1()
def __del__(self):
self.close1()
def _wait_for_connection1(self):
begin_time = time.time()
self.logger.debug("Will attempt to connect with device for %s seconds." % self._connection_timeout)
while time.time() - begin_time < self._connection_timeout:
try:
self._connect_ssh1()
self.logger.debug('BaseCommandLine1 new SSH connection with {}:{} established.'.
format("10.221.42.29", "22"))
break
except Exception as e:
self.logger.debug('BaseCommandLine1 SSH-connection failed after %d seconds passed with exception: %s' %
(time.time() - begin_time, e))
self.logger.debug('BaseCommandLine1 Next attempt after {} seconds.'.format(5.0))
time.sleep(5.0)
else:
self.logger.debug('BaseCommandLine1 Failed to connect to {}:{} within {} seconds: {}'.format(
"10.221.42.29","22",self._connection_timeout,traceback.format_exc()))
def reset_connection1(self):
with self._connection_lock:
self.close1()
self.logger.debug('reset connection begin.')
self._wait_for_connection1()
def close1(self):
if not self._ssh_client:
return
close_timeout = 10.0
begin_time = time.time()
self.logger.debug("Attempt to close ssh-session within %s seconds." % close_timeout)
if lock_acquire_with_timeout1(self._connection_lock, close_timeout):
try:
if call_with_timeout1(self.__nonblocking_close1, timeout_seconds=close_timeout):
self.logger.debug("Successfully closed SSHClient within %d seconds." % (time.time() - begin_time))
else:
self.logger.warning("Failed to close SSHClient within %d seconds." % close_timeout)
self.logger.warning("All threads state:\n%s" % format_all_threads_stacks1())
finally:
self._connection_lock.release()
else:
self.logger.warning("Failed to acquire lock within %s timeout." % close_timeout)
self._ssh_client = None
def __nonblocking_close1(self):
"""
Non-blocking call to close SSH connection. May freeze on internal SSHClient.close().
"""
self.logger.debug("Closing internal ssh-client.")
if self._ssh_client:
self._ssh_client.close1()
self.logger.debug("Internal ssh-client closed.")
self._ssh_client = None
def _connect_ssh1(self, key_filename=None):
try:
s = pyssh.new_session(hostname="10.221.42.29", port="22", username="root", password="")
r = s.execute("uname -a")
self.logger.debug("pyssh data : %s" % r.as_bytes())
except Exception as e:
self.logger.debug("Connection Error : %s" % str(e))
self._ssh_client = s
return s
def config_logging():
formatter = logging.Formatter("%(asctime)s[%(name)s]"
"[%(levelname)s]: %(message)s")
console_logger = logging.StreamHandler(sys.stdout)
console_logger.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(logging.NullHandler())
paramiko_logger = logging.getLogger('paramiko')
paramiko_logger.setLevel(logging.CRITICAL)
paramiko_logger.addHandler(console_logger)
console_logger.setLevel(logging.DEBUG)
root_logger.addHandler(console_logger)
def main(argv):
config_logging()
try:
result = BaseCommandLine1(300)
except Exception:
logging.debug("Exception from BaseCommandLine1")
logging.debug("Show all threads status before exit attempt:\n %s", format_all_threads_stacks1())
if __name__ == "__main__":
main(sys.argv)
More information about the Python-list
mailing list