[Python-checkins] cpython: Issue #18147: Add diagnostic functions to ssl.SSLContext().

christian.heimes python-checkins at python.org
Mon Jun 17 15:45:21 CEST 2013


http://hg.python.org/cpython/rev/38e759e4c9e6
changeset:   84185:38e759e4c9e6
parent:      84180:c484ca129288
user:        Christian Heimes <christian at cheimes.de>
date:        Mon Jun 17 15:44:12 2013 +0200
summary:
  Issue #18147: Add diagnostic functions to ssl.SSLContext().

get_ca_list() lists all loaded CA certificates and cert_store_stats() returns
amount of loaded X.509 certs, X.509 CA certs and CRLs.

files:
  Doc/library/ssl.rst  |   24 +++++
  Lib/test/test_ssl.py |   57 ++++++++++++
  Misc/NEWS            |    4 +
  Modules/_ssl.c       |  144 +++++++++++++++++++++++++++---
  4 files changed, 212 insertions(+), 17 deletions(-)


diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -791,6 +791,19 @@
 
 :class:`SSLContext` objects have the following methods and attributes:
 
+.. method:: SSLContext.cert_store_stats()
+
+   Get statistics about quantities of loaded X.509 certificates, count of
+   X.509 certificates flagged as CA certificates and certificate revocation
+   lists as dictionary.
+
+   Example for a context with one CA cert and one other cert::
+
+      >>> context.cert_store_stats()
+      {'crl': 0, 'x509_ca': 1, 'x509': 2}
+
+   .. versionadded:: 3.4
+
 .. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None)
 
    Load a private key and the corresponding certificate.  The *certfile*
@@ -837,6 +850,17 @@
    following an `OpenSSL specific layout
    <http://www.openssl.org/docs/ssl/SSL_CTX_load_verify_locations.html>`_.
 
+.. method:: SSLContext.get_ca_certs(binary_form=False)
+
+   Get a list of loaded "certification authority" (CA) certificates. If the
+   ``binary_form`` parameter is :const:`False` each list
+   entry is a dict like the output of :meth:`SSLSocket.getpeercert`. Otherwise
+   the method returns a list of DER-encoded certificates. The returned list
+   does not contain certificates from *capath* unless a certificate was
+   requested and loaded by a SSL connection.
+
+   ..versionadded:: 3.4
+
 .. method:: SSLContext.set_default_verify_paths()
 
    Load a set of default "certification authority" (CA) certificates from
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -680,6 +680,47 @@
         gc.collect()
         self.assertIs(wr(), None)
 
+    def test_cert_store_stats(self):
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertEqual(ctx.cert_store_stats(),
+            {'x509_ca': 0, 'crl': 0, 'x509': 0})
+        ctx.load_cert_chain(CERTFILE)
+        self.assertEqual(ctx.cert_store_stats(),
+            {'x509_ca': 0, 'crl': 0, 'x509': 0})
+        ctx.load_verify_locations(CERTFILE)
+        self.assertEqual(ctx.cert_store_stats(),
+            {'x509_ca': 0, 'crl': 0, 'x509': 1})
+        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
+        self.assertEqual(ctx.cert_store_stats(),
+            {'x509_ca': 1, 'crl': 0, 'x509': 2})
+
+    def test_get_ca_certs(self):
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertEqual(ctx.get_ca_certs(), [])
+        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
+        ctx.load_verify_locations(CERTFILE)
+        self.assertEqual(ctx.get_ca_certs(), [])
+        # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
+        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
+        self.assertEqual(ctx.get_ca_certs(),
+            [{'issuer': ((('organizationName', 'Root CA'),),
+                         (('organizationalUnitName', 'http://www.cacert.org'),),
+                         (('commonName', 'CA Cert Signing Authority'),),
+                         (('emailAddress', 'support at cacert.org'),)),
+              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
+              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
+              'serialNumber': '00',
+              'subject': ((('organizationName', 'Root CA'),),
+                          (('organizationalUnitName', 'http://www.cacert.org'),),
+                          (('commonName', 'CA Cert Signing Authority'),),
+                          (('emailAddress', 'support at cacert.org'),)),
+              'version': 3}])
+
+        with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
+            pem = f.read()
+        der = ssl.PEM_cert_to_DER_cert(pem)
+        self.assertEqual(ctx.get_ca_certs(True), [der])
+
 
 class SSLErrorTests(unittest.TestCase):
 
@@ -995,6 +1036,22 @@
             finally:
                 s.close()
 
+    def test_get_ca_certs_capath(self):
+        # capath certs are loaded on request
+        with support.transient_internet("svn.python.org"):
+            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+            ctx.verify_mode = ssl.CERT_REQUIRED
+            ctx.load_verify_locations(capath=CAPATH)
+            self.assertEqual(ctx.get_ca_certs(), [])
+            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+            s.connect(("svn.python.org", 443))
+            try:
+                cert = s.getpeercert()
+                self.assertTrue(cert)
+            finally:
+                s.close()
+            self.assertEqual(len(ctx.get_ca_certs()), 1)
+
 
 try:
     import threading
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -123,6 +123,10 @@
 Library
 -------
 
+- Issue #18147: Add diagnostic functions to ssl.SSLContext(). get_ca_list()
+  lists all loaded CA certificates and cert_store_stats() returns amount of
+  loaded X.509 certs, X.509 CA certs and CRLs.
+
 - Issue #18076: Introduce importlib.util.decode_source().
 
 - importlib.abc.SourceLoader.get_source() no longer changes SyntaxError or
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -1023,6 +1023,24 @@
     return NULL;
 }
 
+static PyObject *
+_certificate_to_der(X509 *certificate)
+{
+    unsigned char *bytes_buf = NULL;
+    int len;
+    PyObject *retval;
+
+    bytes_buf = NULL;
+    len = i2d_X509(certificate, &bytes_buf);
+    if (len < 0) {
+        _setSSLError(NULL, 0, __FILE__, __LINE__);
+        return NULL;
+    }
+    /* this is actually an immutable bytes sequence */
+    retval = PyBytes_FromStringAndSize((const char *) bytes_buf, len);
+    OPENSSL_free(bytes_buf);
+    return retval;
+}
 
 static PyObject *
 PySSL_test_decode_certificate (PyObject *mod, PyObject *args) {
@@ -1068,8 +1086,6 @@
 static PyObject *
 PySSL_peercert(PySSLSocket *self, PyObject *args)
 {
-    PyObject *retval = NULL;
-    int len;
     int verification;
     int binary_mode = 0;
 
@@ -1081,21 +1097,7 @@
 
     if (binary_mode) {
         /* return cert in DER-encoded format */
-
-        unsigned char *bytes_buf = NULL;
-
-        bytes_buf = NULL;
-        len = i2d_X509(self->peer_cert, &bytes_buf);
-        if (len < 0) {
-            PySSL_SetError(self, len, __FILE__, __LINE__);
-            return NULL;
-        }
-        /* this is actually an immutable bytes sequence */
-        retval = PyBytes_FromStringAndSize
-          ((const char *) bytes_buf, len);
-        OPENSSL_free(bytes_buf);
-        return retval;
-
+        return _certificate_to_der(self->peer_cert);
     } else {
         verification = SSL_CTX_get_verify_mode(SSL_get_SSL_CTX(self->ssl));
         if ((verification & SSL_VERIFY_PEER) == 0)
@@ -2555,6 +2557,110 @@
 #endif
 }
 
+PyDoc_STRVAR(PySSL_get_stats_doc,
+"cert_store_stats() -> {'crl': int, 'x509_ca': int, 'x509': int}\n\
+\n\
+Returns quantities of loaded X.509 certificates. X.509 certificates with a\n\
+CA extension and certificate revocation lists inside the context's cert\n\
+store.\n\
+NOTE: Certificates in a capath directory aren't loaded unless they have\n\
+been used at least once.");
+
+static PyObject *
+cert_store_stats(PySSLContext *self)
+{
+    X509_STORE *store;
+    X509_OBJECT *obj;
+    int x509 = 0, crl = 0, pkey = 0, ca = 0, i;
+
+    store = SSL_CTX_get_cert_store(self->ctx);
+    for (i = 0; i < sk_X509_OBJECT_num(store->objs); i++) {
+        obj = sk_X509_OBJECT_value(store->objs, i);
+        switch (obj->type) {
+            case X509_LU_X509:
+                x509++;
+                if (X509_check_ca(obj->data.x509)) {
+                    ca++;
+                }
+                break;
+            case X509_LU_CRL:
+                crl++;
+                break;
+            case X509_LU_PKEY:
+                pkey++;
+                break;
+            default:
+                /* Ignore X509_LU_FAIL, X509_LU_RETRY, X509_LU_PKEY.
+                 * As far as I can tell they are internal states and never
+                 * stored in a cert store */
+                break;
+        }
+    }
+    return Py_BuildValue("{sisisi}", "x509", x509, "crl", crl,
+        "x509_ca", ca);
+}
+
+PyDoc_STRVAR(PySSL_get_ca_certs_doc,
+"get_ca_certs([der=False]) -> list of loaded certificate\n\
+\n\
+Returns a list of dicts with information of loaded CA certs. If the\n\
+optional argument is True, returns a DER-encoded copy of the CA certificate.\n\
+NOTE: Certificates in a capath directory aren't loaded unless they have\n\
+been used at least once.");
+
+static PyObject *
+get_ca_certs(PySSLContext *self, PyObject *args)
+{
+    X509_STORE *store;
+    PyObject *ci = NULL, *rlist = NULL;
+    int i;
+    int binary_mode = 0;
+
+    if (!PyArg_ParseTuple(args, "|p:get_ca_certs", &binary_mode)) {
+        return NULL;
+    }
+
+    if ((rlist = PyList_New(0)) == NULL) {
+        return NULL;
+    }
+
+    store = SSL_CTX_get_cert_store(self->ctx);
+    for (i = 0; i < sk_X509_OBJECT_num(store->objs); i++) {
+        X509_OBJECT *obj;
+        X509 *cert;
+
+        obj = sk_X509_OBJECT_value(store->objs, i);
+        if (obj->type != X509_LU_X509) {
+            /* not a x509 cert */
+            continue;
+        }
+        /* CA for any purpose */
+        cert = obj->data.x509;
+        if (!X509_check_ca(cert)) {
+            continue;
+        }
+        if (binary_mode) {
+            ci = _certificate_to_der(cert);
+        } else {
+            ci = _decode_certificate(cert);
+        }
+        if (ci == NULL) {
+            goto error;
+        }
+        if (PyList_Append(rlist, ci) == -1) {
+            goto error;
+        }
+        Py_CLEAR(ci);
+    }
+    return rlist;
+
+  error:
+    Py_XDECREF(ci);
+    Py_XDECREF(rlist);
+    return NULL;
+}
+
+
 static PyGetSetDef context_getsetlist[] = {
     {"options", (getter) get_options,
                 (setter) set_options, NULL},
@@ -2586,6 +2692,10 @@
 #endif
     {"set_servername_callback", (PyCFunction) set_servername_callback,
                     METH_VARARGS, PySSL_set_servername_callback_doc},
+    {"cert_store_stats", (PyCFunction) cert_store_stats,
+                    METH_NOARGS, PySSL_get_stats_doc},
+    {"get_ca_certs", (PyCFunction) get_ca_certs,
+                    METH_VARARGS, PySSL_get_ca_certs_doc},
     {NULL, NULL}        /* sentinel */
 };
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list