[Python-checkins] cpython: Issue #12803: SSLContext.load_cert_chain() now accepts a password argument

antoine.pitrou python-checkins at python.org
Thu Aug 25 14:45:48 CEST 2011


http://hg.python.org/cpython/rev/cdc6c1b072a5
changeset:   72071:cdc6c1b072a5
parent:      72069:420f3f6d4f46
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Thu Aug 25 14:39:44 2011 +0200
summary:
  Issue #12803: SSLContext.load_cert_chain() now accepts a password argument
to be used if the private key is encrypted.  Patch by Adam Simpkins.

files:
  Doc/library/ssl.rst  |   18 +++-
  Lib/test/test_ssl.py |   57 ++++++++++
  Misc/ACKS            |    1 +
  Misc/NEWS            |    3 +
  Modules/_ssl.c       |  170 +++++++++++++++++++++++++++---
  5 files changed, 227 insertions(+), 22 deletions(-)


diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -553,7 +553,7 @@
 
 :class:`SSLContext` objects have the following methods and attributes:
 
-.. method:: SSLContext.load_cert_chain(certfile, keyfile=None)
+.. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None)
 
    Load a private key and the corresponding certificate.  The *certfile*
    string must be the path to a single file in PEM format containing the
@@ -564,9 +564,25 @@
    :ref:`ssl-certificates` for more information on how the certificate
    is stored in the *certfile*.
 
+   The *password* argument may be a function to call to get the password for
+   decrypting the private key.  It will only be called if the private key is
+   encrypted and a password is necessary.  It will be called with no arguments,
+   and it should return a string, bytes, or bytearray.  If the return value is
+   a string it will be encoded as UTF-8 before using it to decrypt the key.
+   Alternatively a string, bytes, or bytearray value may be supplied directly
+   as the *password* argument.  It will be ignored if the private key is not
+   encrypted and no password is needed.
+
+   If the *password* argument is not specified and a password is required,
+   OpenSSL's built-in password prompting mechanism will be used to
+   interactively prompt the user for a password.
+
    An :class:`SSLError` is raised if the private key doesn't
    match with the certificate.
 
+   .. versionchanged:: 3.3
+      New optional argument *password*.
+
 .. method:: SSLContext.load_verify_locations(cafile=None, capath=None)
 
    Load a set of "certification authority" (CA) certificates used to validate
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -42,6 +42,9 @@
 ONLYKEY = data_file("ssl_key.pem")
 BYTES_ONLYCERT = os.fsencode(ONLYCERT)
 BYTES_ONLYKEY = os.fsencode(ONLYKEY)
+CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
+ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
+KEY_PASSWORD = "somepass"
 CAPATH = data_file("capath")
 BYTES_CAPATH = os.fsencode(CAPATH)
 
@@ -430,6 +433,60 @@
         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
         with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
             ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
+        # Password protected key and cert
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
+        ctx.load_cert_chain(CERTFILE_PROTECTED,
+                            password=bytearray(KEY_PASSWORD.encode()))
+        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
+        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
+        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
+                            bytearray(KEY_PASSWORD.encode()))
+        with self.assertRaisesRegex(TypeError, "should be a string"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
+        with self.assertRaises(ssl.SSLError):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
+        with self.assertRaisesRegex(ValueError, "cannot be longer"):
+            # openssl has a fixed limit on the password buffer.
+            # PEM_BUFSIZE is generally set to 1kb.
+            # Return a string larger than this.
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
+        # Password callback
+        def getpass_unicode():
+            return KEY_PASSWORD
+        def getpass_bytes():
+            return KEY_PASSWORD.encode()
+        def getpass_bytearray():
+            return bytearray(KEY_PASSWORD.encode())
+        def getpass_badpass():
+            return "badpass"
+        def getpass_huge():
+            return b'a' * (1024 * 1024)
+        def getpass_bad_type():
+            return 9
+        def getpass_exception():
+            raise Exception('getpass error')
+        class GetPassCallable:
+            def __call__(self):
+                return KEY_PASSWORD
+            def getpass(self):
+                return KEY_PASSWORD
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
+        ctx.load_cert_chain(CERTFILE_PROTECTED,
+                            password=GetPassCallable().getpass)
+        with self.assertRaises(ssl.SSLError):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
+        with self.assertRaisesRegex(ValueError, "cannot be longer"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
+        with self.assertRaisesRegex(TypeError, "must return a string"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
+        with self.assertRaisesRegex(Exception, "getpass error"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
+        # Make sure the password function isn't called if it isn't needed
+        ctx.load_cert_chain(CERTFILE, password=getpass_exception)
 
     def test_load_verify_locations(self):
         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
diff --git a/Misc/ACKS b/Misc/ACKS
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -876,6 +876,7 @@
 Paul Sijben
 Kirill Simonov
 Nathan Paul Simons
+Adam Simpkins
 Janne Sinkkonen
 George Sipe
 J. Sipprell
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -268,6 +268,9 @@
 Library
 -------
 
+- Issue #12803: SSLContext.load_cert_chain() now accepts a password argument
+  to be used if the private key is encrypted.  Patch by Adam Simpkins.
+
 - Issue #11657: Fix sending file descriptors over 255 over a multiprocessing
   Pipe.
 
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -18,16 +18,21 @@
 
 #ifdef WITH_THREAD
 #include "pythread.h"
+#define PySSL_BEGIN_ALLOW_THREADS_S(save) \
+    do { if (_ssl_locks_count>0) { (save) = PyEval_SaveThread(); } } while (0)
+#define PySSL_END_ALLOW_THREADS_S(save) \
+    do { if (_ssl_locks_count>0) { PyEval_RestoreThread(save); } } while (0)
 #define PySSL_BEGIN_ALLOW_THREADS { \
             PyThreadState *_save = NULL;  \
-            if (_ssl_locks_count>0) {_save = PyEval_SaveThread();}
-#define PySSL_BLOCK_THREADS     if (_ssl_locks_count>0){PyEval_RestoreThread(_save)};
-#define PySSL_UNBLOCK_THREADS   if (_ssl_locks_count>0){_save = PyEval_SaveThread()};
-#define PySSL_END_ALLOW_THREADS if (_ssl_locks_count>0){PyEval_RestoreThread(_save);} \
-         }
+            PySSL_BEGIN_ALLOW_THREADS_S(_save);
+#define PySSL_BLOCK_THREADS     PySSL_END_ALLOW_THREADS_S(_save);
+#define PySSL_UNBLOCK_THREADS   PySSL_BEGIN_ALLOW_THREADS_S(_save);
+#define PySSL_END_ALLOW_THREADS PySSL_END_ALLOW_THREADS_S(_save); }
 
 #else   /* no WITH_THREAD */
 
+#define PySSL_BEGIN_ALLOW_THREADS_S(save)
+#define PySSL_END_ALLOW_THREADS_S(save)
 #define PySSL_BEGIN_ALLOW_THREADS
 #define PySSL_BLOCK_THREADS
 #define PySSL_UNBLOCK_THREADS
@@ -1635,19 +1640,118 @@
     return 0;
 }
 
+typedef struct {
+    PyThreadState *thread_state;
+    PyObject *callable;
+    char *password;
+    Py_ssize_t size;
+    int error;
+} _PySSLPasswordInfo;
+
+static int
+_pwinfo_set(_PySSLPasswordInfo *pw_info, PyObject* password,
+            const char *bad_type_error)
+{
+    /* Set the password and size fields of a _PySSLPasswordInfo struct
+       from a unicode, bytes, or byte array object.
+       The password field will be dynamically allocated and must be freed
+       by the caller */
+    PyObject *password_bytes = NULL;
+    const char *data = NULL;
+    Py_ssize_t size;
+
+    if (PyUnicode_Check(password)) {
+        password_bytes = PyUnicode_AsEncodedString(password, NULL, NULL);
+        if (!password_bytes) {
+            goto error;
+        }
+        data = PyBytes_AS_STRING(password_bytes);
+        size = PyBytes_GET_SIZE(password_bytes);
+    } else if (PyBytes_Check(password)) {
+        data = PyBytes_AS_STRING(password);
+        size = PyBytes_GET_SIZE(password);
+    } else if (PyByteArray_Check(password)) {
+        data = PyByteArray_AS_STRING(password);
+        size = PyByteArray_GET_SIZE(password);
+    } else {
+        PyErr_SetString(PyExc_TypeError, bad_type_error);
+        goto error;
+    }
+
+    free(pw_info->password);
+    pw_info->password = malloc(size);
+    if (!pw_info->password) {
+        PyErr_SetString(PyExc_MemoryError,
+                        "unable to allocate password buffer");
+        goto error;
+    }
+    memcpy(pw_info->password, data, size);
+    pw_info->size = size;
+
+    Py_XDECREF(password_bytes);
+    return 1;
+
+error:
+    Py_XDECREF(password_bytes);
+    return 0;
+}
+
+static int
+_password_callback(char *buf, int size, int rwflag, void *userdata)
+{
+    _PySSLPasswordInfo *pw_info = (_PySSLPasswordInfo*) userdata;
+    PyObject *fn_ret = NULL;
+
+    PySSL_END_ALLOW_THREADS_S(pw_info->thread_state);
+
+    if (pw_info->callable) {
+        fn_ret = PyObject_CallFunctionObjArgs(pw_info->callable, NULL);
+        if (!fn_ret) {
+            /* TODO: It would be nice to move _ctypes_add_traceback() into the
+               core python API, so we could use it to add a frame here */
+            goto error;
+        }
+
+        if (!_pwinfo_set(pw_info, fn_ret,
+                         "password callback must return a string")) {
+            goto error;
+        }
+        Py_CLEAR(fn_ret);
+    }
+
+    if (pw_info->size > size) {
+        PyErr_Format(PyExc_ValueError,
+                     "password cannot be longer than %d bytes", size);
+        goto error;
+    }
+
+    PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state);
+    memcpy(buf, pw_info->password, pw_info->size);
+    return pw_info->size;
+
+error:
+    Py_XDECREF(fn_ret);
+    PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state);
+    pw_info->error = 1;
+    return -1;
+}
+
 static PyObject *
 load_cert_chain(PySSLContext *self, PyObject *args, PyObject *kwds)
 {
-    char *kwlist[] = {"certfile", "keyfile", NULL};
-    PyObject *certfile, *keyfile = NULL;
+    char *kwlist[] = {"certfile", "keyfile", "password", NULL};
+    PyObject *certfile, *keyfile = NULL, *password = NULL;
     PyObject *certfile_bytes = NULL, *keyfile_bytes = NULL;
+    pem_password_cb *orig_passwd_cb = self->ctx->default_passwd_callback;
+    void *orig_passwd_userdata = self->ctx->default_passwd_callback_userdata;
+    _PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 };
     int r;
 
     errno = 0;
     ERR_clear_error();
     if (!PyArg_ParseTupleAndKeywords(args, kwds,
-        "O|O:load_cert_chain", kwlist,
-        &certfile, &keyfile))
+        "O|OO:load_cert_chain", kwlist,
+        &certfile, &keyfile, &password))
         return NULL;
     if (keyfile == Py_None)
         keyfile = NULL;
@@ -1661,12 +1765,26 @@
                         "keyfile should be a valid filesystem path");
         goto error;
     }
-    PySSL_BEGIN_ALLOW_THREADS
+    if (password && password != Py_None) {
+        if (PyCallable_Check(password)) {
+            pw_info.callable = password;
+        } else if (!_pwinfo_set(&pw_info, password,
+                                "password should be a string or callable")) {
+            goto error;
+        }
+        SSL_CTX_set_default_passwd_cb(self->ctx, _password_callback);
+        SSL_CTX_set_default_passwd_cb_userdata(self->ctx, &pw_info);
+    }
+    PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state);
     r = SSL_CTX_use_certificate_chain_file(self->ctx,
         PyBytes_AS_STRING(certfile_bytes));
-    PySSL_END_ALLOW_THREADS
+    PySSL_END_ALLOW_THREADS_S(pw_info.thread_state);
     if (r != 1) {
-        if (errno != 0) {
+        if (pw_info.error) {
+            ERR_clear_error();
+            /* the password callback has already set the error information */
+        }
+        else if (errno != 0) {
             ERR_clear_error();
             PyErr_SetFromErrno(PyExc_IOError);
         }
@@ -1675,33 +1793,43 @@
         }
         goto error;
     }
-    PySSL_BEGIN_ALLOW_THREADS
+    PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state);
     r = SSL_CTX_use_PrivateKey_file(self->ctx,
         PyBytes_AS_STRING(keyfile ? keyfile_bytes : certfile_bytes),
         SSL_FILETYPE_PEM);
-    PySSL_END_ALLOW_THREADS
-    Py_XDECREF(keyfile_bytes);
-    Py_XDECREF(certfile_bytes);
+    PySSL_END_ALLOW_THREADS_S(pw_info.thread_state);
+    Py_CLEAR(keyfile_bytes);
+    Py_CLEAR(certfile_bytes);
     if (r != 1) {
-        if (errno != 0) {
+        if (pw_info.error) {
+            ERR_clear_error();
+            /* the password callback has already set the error information */
+        }
+        else if (errno != 0) {
             ERR_clear_error();
             PyErr_SetFromErrno(PyExc_IOError);
         }
         else {
             _setSSLError(NULL, 0, __FILE__, __LINE__);
         }
-        return NULL;
+        goto error;
     }
-    PySSL_BEGIN_ALLOW_THREADS
+    PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state);
     r = SSL_CTX_check_private_key(self->ctx);
-    PySSL_END_ALLOW_THREADS
+    PySSL_END_ALLOW_THREADS_S(pw_info.thread_state);
     if (r != 1) {
         _setSSLError(NULL, 0, __FILE__, __LINE__);
-        return NULL;
+        goto error;
     }
+    SSL_CTX_set_default_passwd_cb(self->ctx, orig_passwd_cb);
+    SSL_CTX_set_default_passwd_cb_userdata(self->ctx, orig_passwd_userdata);
+    free(pw_info.password);
     Py_RETURN_NONE;
 
 error:
+    SSL_CTX_set_default_passwd_cb(self->ctx, orig_passwd_cb);
+    SSL_CTX_set_default_passwd_cb_userdata(self->ctx, orig_passwd_userdata);
+    free(pw_info.password);
     Py_XDECREF(keyfile_bytes);
     Py_XDECREF(certfile_bytes);
     return NULL;

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list