
Dirk Loss <lists@dirk-loss.de> writes:
Jean-Paul Calderone wrote: (...)
Another possible solution might be to do your verification using the SSL context object.
Could you elaborate on this? I think I am already using the SSL context object to do the verification:
Not sure if it helps, but here's some old code of mine where I experimented with the echo SSL examples to add symmetric certificate checking. Just checked and it seems ok with Python 2.5.1 and Twisted 2.5.0 (pyOpenSSL 0.6). It uses direct SSL context objects rather than the Twisted wrapper versions. To be honest, at the time it was because I was still feeling my way around the SSL support and found using the direct context easier, but I believe you do have full access to the certificate in the context's _verify method. Returning 0/False from _verify rather than just propagating ok can reject the handshake. (Note that _verify can be called multiple times during the sequence as well as in cases where ok is already 0 I believe). There are a bunch of debugging prints still in the code where I was seeing what sort of stuff was available to the context factory/verification. -- David echoserv_ssl.py: --------------- # Twisted, the Framework of Your Internet # Copyright (C) 2001 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA from OpenSSL import SSL, crypto class ServerContextFactory: def _verify(self, connection, x509, errnum, errdepth, ok): print '_verify (ok=%d):' % ok print ' subject:', x509.get_subject() print ' issuer:', x509.get_issuer() print ' errnum %s, errdepth %d' % (errnum, errdepth) return False # ok def getContext(self): """Create an SSL context. This is a sample implementation that loads a certificate from a file called 'server.pem'.""" ctx = SSL.Context(SSL.SSLv23_METHOD) ctx.use_certificate_file('server.pem') ctx.use_privatekey_file('server.pem') print 'Context additions' ctx.load_client_ca('ca/all-cas.cert') ctx.load_verify_locations('ca/ca.cert') ctx.set_verify(SSL.VERIFY_PEER|SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self._verify) print 'verify depth:', ctx.get_verify_depth() ctx.set_verify_depth(10) print 'verify depth:', ctx.get_verify_depth() return ctx import echoserv class MyProtocol(echoserv.Echo): def connectionMade(self): print 'connectionMade', self.transport.getPeerCertificate() return echoserv.Echo.connectionMade(self) def dataReceived(self, data): print 'dataReceived', self.transport.getPeerCertificate() return echoserv.Echo.dataReceived(self, data) if __name__ == '__main__': import echoserv, sys from twisted.internet.protocol import Factory from twisted.internet import ssl, reactor from twisted.python import log log.startLogging(sys.stdout) factory = Factory() factory.protocol = MyProtocol reactor.listenSSL(9000, factory, ServerContextFactory()) reactor.run() echoclient_ssl.py: ----------------- # Twisted, the Framework of Your Internet # Copyright (C) 2001 Matthew W. Lefkowitz # # This library is free software; you can redistribute it and/or # modify it under the terms of version 2.1 of the GNU Lesser General Public # License as published by the Free Software Foundation. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA from OpenSSL import SSL import sys from twisted.internet.protocol import ClientFactory from twisted.protocols.basic import LineReceiver from twisted.internet import ssl, reactor import inspect class ClientContextFactory(ssl.ClientContextFactory): def _verify(self, connection, x509, errnum, errdepth, ok): print '_verify (ok=%d):' % ok print ' subject:', x509.get_subject() print ' issuer:', x509.get_issuer() print ' errnum %s, errdepth %d' % (errnum, errdepth) return ok def getContext(self): ctx = ssl.ClientContextFactory.getContext(self) ctx.use_certificate_file('client.pem') ctx.use_privatekey_file('client.pem') ctx.load_verify_locations('ca/ca.cert') ctx.set_verify(SSL.VERIFY_PEER|SSL.VERIFY_FAIL_IF_NO_PEER_CERT, self._verify) return ctx class EchoClient(LineReceiver): end="Bye-bye!" def connectionMade(self): self.sendLine("Hello, world!") self.sendLine("What a fine day it is.") self.sendLine(self.end) def connectionLost(self, reason): print 'connection lost (protocol)' def lineReceived(self, line): x509 = self.transport.getPeerCertificate() methods = [x for x in dir(x509) if callable(getattr(x509,x)) and not (x.startswith('set_') or x.startswith('add_') or x.startswith('gmtime_') or x in ('sign','digest'))] for m in methods: print m, getattr(x509,m)() print "receive:", line if line==self.end: self.transport.loseConnection() class EchoClientFactory(ClientFactory): protocol = EchoClient def clientConnectionFailed(self, connector, reason): print 'connection failed:', reason.getErrorMessage() reactor.stop() def clientConnectionLost(self, connector, reason): print 'connection lost:', reason.getErrorMessage() reactor.stop() def main(): if len(sys.argv) > 1: host = sys.argv[1] else: host = 'localhost' factory = EchoClientFactory() reactor.connectSSL(host, 9000, factory, ClientContextFactory()) reactor.run() if __name__ == '__main__': main()