Sophie

Sophie

distrib > CentOS > 5 > x86_64 > by-pkgid > c7d5af816329c171c658e5facf128c02 > files > 405

m2crypto-0.16-9.el5.x86_64.rpm

#!/usr/bin/python

"""Unit tests for M2Crypto.SSL.

Copyright (c) 2000-2004 Ng Pheng Siong. All rights reserved."""

"""
TODO

Server tests:
- ???

Others:
- ssl_dispatcher
- SSLServer
- ForkingSSLServer
- ThreadingSSLServer
"""

import os, socket, string, sys, tempfile, thread, time, unittest
from M2Crypto import Rand, SSL, m2

# Host used by the tests to reach the openssl s_server child process.
srv_host = 'localhost'
# Port passed to s_server via '-accept'.  SSLClientTestCase.tearDown
# decrements this global after every test, so consecutive tests use
# different ports.
srv_port = 64000

def verify_cb_new_function(ok, store):
    """Verification callback using the new-style ``(ok, store)`` signature.

    The callback accepts the peer (returns 1) only when all of the
    following hold, and rejects it (returns 0) otherwise:

    - OpenSSL's own pre-verification result ``ok`` is false,
    - ``store.get_error()`` is one of the "untrusted certificate" error
      codes listed below,
    - application data is attached to the store context.

    The checks are written as explicit conditionals rather than ``assert``
    statements so that they are still enforced when Python runs with
    ``-O`` (which strips asserts and would otherwise make this callback
    accept everything).
    """
    if ok:
        return 0

    err = store.get_error()
    if err not in (m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
                   m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
                   m2.X509_V_ERR_CERT_UNTRUSTED,
                   m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE):
        return 0

    # Returning 0 instead of raising keeps exceptions from propagating
    # out of the callback, where the caller may see strange errors.
    if not m2.x509_store_ctx_get_app_data(store.ctx):
        return 0
    return 1

class VerifyCB:
    """Callable-instance form of verify_cb_new_function."""

    def __call__(self, ok, store):
        # Forward both arguments untouched and hand back the verdict.
        verdict = verify_cb_new_function(ok, store)
        return verdict


class SSLClientTestCase(unittest.TestCase):

    def start_server(self, args):
        """Fork and exec ``openssl`` with ``args`` as its argument vector.

        Returns the child's pid in the parent, after sleeping half a
        second to give the server time to start.  The child never returns
        from this method.
        """
        pid = os.fork()
        if pid == 0:
            try:
                os.execvp('openssl', args)
            finally:
                # execvp only comes back (by raising) when the exec
                # failed.  Exit the child immediately; otherwise it would
                # carry on as a second copy of the test process.
                os._exit(127)
        time.sleep(0.5)
        return pid

    def stop_server(self, pid):
        """Send signal 1 to the server child ``pid`` and reap it."""
        hangup = 1  # signal number 1 (SIGHUP on POSIX systems)
        os.kill(pid, hangup)
        os.waitpid(pid, 0)

    def http_get(self, s):
        """Send a minimal HTTP/1.0 GET over ``s`` and return the reply.

        Reads 4096-byte chunks until ``recv`` returns an empty result or
        raises SSL.SSLError, then returns everything read so far.
        """
        s.send('GET / HTTP/1.0\n\n')
        pieces = []
        while True:
            try:
                piece = s.recv(4096)
            except SSL.SSLError:
                # s_server throws an 'unexpected eof'...
                break
            if not piece:
                break
            pieces.append(piece)
        return ''.join(pieces)

    def setUp(self):
        """Snapshot the module-level host/port and build the s_server args."""
        host, port = srv_host, srv_port
        self.srv_host = host
        self.srv_port = port
        self.srv_addr = (host, port)
        self.srv_url = 'https://%s:%s/' % (host, port)
        self.args = ['s_server', '-quiet', '-www',
                     #'-cert', 'server.pem', Implicitly using this
                     '-accept', str(port)]

    def tearDown(self):
        """Step the module-level port down by one for the next test."""
        global srv_port
        srv_port -= 1

    def test_no_connection(self):
        # Only constructs a Connection on a fresh Context; no server is
        # started and nothing is connected.
        context = SSL.Context()
        conn = SSL.Connection(context)

    def test_server_simple(self):
        # An unknown protocol name must be rejected by SSL.Context; a
        # default context must be able to fetch the server's page.
        server_pid = self.start_server(self.args)
        try:
            self.assertRaises(ValueError, SSL.Context, 'tlsv5')
            context = SSL.Context()
            conn = SSL.Connection(context)
            conn.connect(self.srv_addr)
            page = self.http_get(conn)
            conn.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    def test_tls1_nok(self):
        self.args.append('-no_tls1')
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context('tlsv1')
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                self.failUnlessEqual(e[0], 'wrong version number')
            s.close()
        finally:
            self.stop_server(pid)

    def test_tls1_ok(self):
        # Server started with -tls1, client context 'tlsv1': the page
        # fetch must succeed.
        self.args.append('-tls1')
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context('tlsv1')
            conn = SSL.Connection(context)
            conn.connect(self.srv_addr)
            page = self.http_get(conn)
            conn.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    def test_sslv23_no_v2(self):
        # With TLSv1 disabled on the server, an 'sslv23' client context
        # must end up reporting 'SSLv3' as the connection version.
        self.args.append('-no_tls1')
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context('sslv23')
            conn = SSL.Connection(context)
            conn.connect(self.srv_addr)
            negotiated = conn.get_version()
            self.failUnlessEqual(negotiated, 'SSLv3')
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_sslv23_no_v2_no_service(self):
        # Server started with both -no_tls1 and -no_ssl3; a default
        # 'sslv23' client context must fail to connect.
        self.args.extend(['-no_tls1', '-no_ssl3'])
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context('sslv23')
            conn = SSL.Connection(context)
            self.assertRaises(SSL.SSLError, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_sslv23_weak_crypto(self):
        # Same server as the no-service test, but the client context is
        # created with weak_crypto=1; the connection must report 'SSLv2'.
        self.args.extend(['-no_tls1', '-no_ssl3'])
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context('sslv23', weak_crypto=1)
            conn = SSL.Connection(context)
            conn.connect(self.srv_addr)
            negotiated = conn.get_version()
            self.failUnlessEqual(negotiated, 'SSLv2')
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_cipher_mismatch(self):
        self.args = self.args + ['-cipher', 'EXP-RC4-MD5']
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            s = SSL.Connection(ctx)
            s.set_cipher_list('EXP-RC2-CBC-MD5')
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
            s.close()
        finally:
            self.stop_server(pid)
        
    def test_no_such_cipher(self):
        self.args = self.args + ['-cipher', 'EXP-RC4-MD5']
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            s = SSL.Connection(ctx)
            s.set_cipher_list('EXP-RC2-MD5')
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                self.failUnlessEqual(e[0], 'no ciphers available')
            s.close()
        finally:
            self.stop_server(pid)
        
    def test_no_weak_cipher(self):
        self.args = self.args + ['-cipher', 'EXP']
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
            s.close()
        finally:
            self.stop_server(pid)
        
    def test_use_weak_cipher(self):
        # Server restricted to the 'EXP' cipher set; a client context
        # created with weak_crypto=1 must be able to fetch the page.
        self.args.extend(['-cipher', 'EXP'])
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context(weak_crypto=1)
            conn = SSL.Connection(context)
            conn.connect(self.srv_addr)
            page = self.http_get(conn)
            conn.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)
        
    def test_cipher_ok(self):
        # Both ends pinned to EXP-RC4-MD5: the fetch must succeed and the
        # connection's cipher stack / cipher list must report that cipher.
        self.args.extend(['-cipher', 'EXP-RC4-MD5'])
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context()
            conn = SSL.Connection(context)
            conn.set_cipher_list('EXP-RC4-MD5')
            conn.connect(self.srv_addr)
            page = self.http_get(conn)

            stack = conn.get_ciphers()
            assert stack[0].name() == 'EXP-RC4-MD5', stack[0].name()
            self.assertRaises(IndexError, stack.__getitem__, 2)
            # For some reason there are 2 entries in the stack
            #assert len(stack) == 1, len(stack)
            assert conn.get_cipher_list() == 'EXP-RC4-MD5', conn.get_cipher_list()

            # Test Cipher_Stack iterator
            seen = 0
            for entry in stack:
                seen = seen + 1
                assert entry.name() == 'EXP-RC4-MD5', '"%s"' % entry.name()
            # For some reason there are 2 entries in the stack
            #assert seen == 1, seen

            conn.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)
        
    def verify_cb_new(self, ok, store):
        """Bound-method form of verify_cb_new_function."""
        verdict = verify_cb_new_function(ok, store)
        return verdict

    def test_verify_cb_new(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
                           self.verify_cb_new)
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_cb_new_class(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
                           VerifyCB())
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_cb_new_function(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
                           verify_cb_new_function)
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_cb_lambda(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
                           lambda ok, store: 1)
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def verify_cb_exception(self, ok, store):
        """Verification callback that always raises instead of answering."""
        raise Exception('We should fail verification')

    def test_verify_cb_exception(self):
        # A callback that raises must make connect() fail with SSLError.
        server_pid = self.start_server(self.args)
        try:
            mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
            context = SSL.Context()
            context.set_verify(mode, 9, self.verify_cb_exception)
            conn = SSL.Connection(context)
            self.assertRaises(SSL.SSLError, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_verify_cb_not_callable(self):
        # Passing the integer 1 as the callback must raise TypeError from
        # set_verify itself; no server is needed.
        mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
        context = SSL.Context()
        self.assertRaises(TypeError, context.set_verify, mode, 9, 1)

    def test_verify_cb_wrong_callable(self):
        # A one-argument lambda is accepted by set_verify, but connect()
        # must then fail with SSLError.
        server_pid = self.start_server(self.args)
        try:
            mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
            context = SSL.Context()
            context.set_verify(mode, 9, lambda _: '')
            conn = SSL.Connection(context)
            self.assertRaises(SSL.SSLError, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):
        """Verification callback using the old five-argument signature.

        Returns 1 only when ``ok`` is false, ``err`` is one of the
        "untrusted certificate" codes below, the SSL context has a
        certificate store and the certificate can be rendered as PEM;
        returns 0 otherwise.  ``depth`` is not used.

        The checks are explicit conditionals rather than ``assert``
        statements so that they are still enforced under ``python -O``.
        """
        from M2Crypto import X509

        if ok:
            return 0
        if err not in (m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
                       m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
                       m2.X509_V_ERR_CERT_UNTRUSTED,
                       m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE):
            return 0
        # Returning 0 instead of raising keeps exceptions from propagating
        # out of the callback, where the caller may see strange errors.
        if not m2.ssl_ctx_get_cert_store(ctx_ptr):
            return 0
        if not X509.X509(x509_ptr).as_pem():
            return 0
        return 1

    def test_verify_cb_old(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
                           self.verify_cb_old)
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_allow_unknown_old(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
                           SSL.cb.ssl_verify_callback)
            ctx.set_allow_unknown_ca(1)
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_allow_unknown_new(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
                           SSL.cb.ssl_verify_callback_allow_unknown_ca)
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_cert(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
            ctx.load_verify_locations('ca.pem')
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_cert_fail(self):
        # Peer verification with 'server.pem' loaded as verify location;
        # connect() must fail with SSLError.
        server_pid = self.start_server(self.args)
        try:
            mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
            context = SSL.Context()
            context.set_verify(mode, 9)
            context.load_verify_locations('server.pem')
            conn = SSL.Connection(context)
            self.assertRaises(SSL.SSLError, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_verify_cert_mutual_auth(self):
        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])        
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
            ctx.load_verify_locations('ca.pem')
            ctx.load_cert('x509.pem')
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_cert_mutual_auth_servernbio(self):
        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
            ctx.load_verify_locations('ca.pem')
            ctx.load_cert('x509.pem')
            s = SSL.Connection(ctx)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_verify_cert_mutual_auth_fail(self):
        # Server started with '-Verify 2' but the client loads no
        # certificate; connect() must fail with SSLError.
        self.args += ['-Verify', '2', '-CAfile', 'ca.pem']
        server_pid = self.start_server(self.args)
        try:
            mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
            context = SSL.Context()
            context.set_verify(mode, 9)
            context.load_verify_locations('ca.pem')
            conn = SSL.Connection(context)
            self.assertRaises(SSL.SSLError, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_verify_nocert_fail(self):
        # Server started with '-nocert' while the client demands a peer
        # certificate; connect() must fail with SSLError.
        self.args += ['-nocert']
        server_pid = self.start_server(self.args)
        try:
            mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
            context = SSL.Context()
            context.set_verify(mode, 9)
            context.load_verify_locations('ca.pem')
            conn = SSL.Connection(context)
            self.assertRaises(SSL.SSLError, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_HTTPSConnection(self):
        # Fetch '/' through M2Crypto.httpslib.HTTPSConnection.
        server_pid = self.start_server(self.args)
        try:
            from M2Crypto import httpslib
            client = httpslib.HTTPSConnection(self.srv_host, self.srv_port)
            client.request('GET', '/')
            response = client.getresponse()
            page = response.read()
            client.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    def test_HTTPS(self):
        # Fetch '/' through the M2Crypto.httpslib.HTTPS interface and
        # require a 200 reply.
        server_pid = self.start_server(self.args)
        try:
            from M2Crypto import httpslib
            client = httpslib.HTTPS(self.srv_host, self.srv_port)
            client.putrequest('GET', '/')
            for mime_type in ('text/html', 'text/plain'):
                client.putheader('Accept', mime_type)
            client.endheaders()
            status, reason, headers = client.getreply()
            assert status == 200, status
            body_file = client.getfile()
            page = body_file.read()
            client.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    def test_urllib(self):
        # Fetch the server URL through M2Crypto.m2urllib.FancyURLopener.
        server_pid = self.start_server(self.args)
        try:
            from M2Crypto import m2urllib
            opener = m2urllib.FancyURLopener()
            opener.addheader('Connection', 'close')
            # self.srv_url is the same 'https://host:port/' string, built
            # in setUp from the module-level host and port.
            handle = opener.open(self.srv_url)
            page = handle.read()
            handle.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    # XXX Don't actually know how to use m2urllib safely!
    #def test_urllib_safe_context(self):
    #def test_urllib_safe_context_fail(self):

    def test_urllib2(self):
        # Fetch the server URL through an M2Crypto.m2urllib2 opener built
        # without an explicit SSL context.
        server_pid = self.start_server(self.args)
        try:
            from M2Crypto import m2urllib2
            opener = m2urllib2.build_opener()
            opener.addheaders = [('Connection', 'close')]
            # self.srv_url is the same 'https://host:port/' string, built
            # in setUp from the module-level host and port.
            handle = opener.open(self.srv_url)
            page = handle.read()
            handle.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    def test_urllib2_secure_context(self):
        # m2urllib2 opener built with a verifying context that trusts
        # 'ca.pem'; the fetch must succeed.
        server_pid = self.start_server(self.args)
        try:
            mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
            context = SSL.Context()
            context.set_verify(mode, 9)
            context.load_verify_locations('ca.pem')

            from M2Crypto import m2urllib2
            opener = m2urllib2.build_opener(context)
            opener.addheaders = [('Connection', 'close')]
            # self.srv_url is the same 'https://host:port/' string, built
            # in setUp from the module-level host and port.
            handle = opener.open(self.srv_url)
            page = handle.read()
            handle.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    def test_urllib2_secure_context_fail(self):
        # m2urllib2 opener built with a verifying context that loads
        # 'server.pem' as verify location; open() must raise SSLError.
        server_pid = self.start_server(self.args)
        try:
            mode = SSL.verify_peer | SSL.verify_fail_if_no_peer_cert
            context = SSL.Context()
            context.set_verify(mode, 9)
            context.load_verify_locations('server.pem')

            from M2Crypto import m2urllib2
            opener = m2urllib2.build_opener(context)
            opener.addheaders = [('Connection', 'close')]
            # self.srv_url is the same 'https://host:port/' string, built
            # in setUp from the module-level host and port.
            self.assertRaises(SSL.SSLError, opener.open, self.srv_url)
        finally:
            self.stop_server(server_pid)

    def test_blocking0(self):
        # With setblocking(0) the test expects connect() to raise (any
        # Exception subclass is accepted).
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context()
            conn = SSL.Connection(context)
            conn.setblocking(0)
            self.assertRaises(Exception, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_blocking1(self):
        pid = self.start_server(self.args)
        try:
            ctx = SSL.Context()
            s = SSL.Connection(ctx)
            s.setblocking(1)
            try:
                s.connect(self.srv_addr)
            except SSL.SSLError, e:
                assert 0, e
            data = self.http_get(s)
            s.close()
        finally:
            self.stop_server(pid)
        self.failIf(string.find(data, 's_server -quiet -www') == -1)

    def test_timeout(self):
        # A vanishingly small socket timeout must make connect() raise
        # SSL.SSLTimeoutError.
        server_pid = self.start_server(self.args)
        try:
            context = SSL.Context()
            conn = SSL.Connection(context)
            # Just a really small number so we can timeout
            tiny_timeout = 0.000000000000000000000000000001
            conn.settimeout(tiny_timeout)
            self.assertRaises(SSL.SSLTimeoutError, conn.connect, self.srv_addr)
            conn.close()
        finally:
            self.stop_server(server_pid)

    def test_makefile_timeout(self):
        # httpslib uses makefile to read the response; with a generous
        # 100 second timeout on the socket the fetch must still succeed.
        server_pid = self.start_server(self.args)
        try:
            from M2Crypto import httpslib
            client = httpslib.HTTPS(self.srv_host, self.srv_port)
            client.putrequest('GET', '/')
            for mime_type in ('text/html', 'text/plain'):
                client.putheader('Accept', mime_type)
            client.endheaders()
            client._conn.sock.settimeout(100)
            status, reason, headers = client.getreply()
            assert status == 200, status
            body_file = client.getfile()
            page = body_file.read()
            client.close()
        finally:
            self.stop_server(server_pid)
        self.failUnless('s_server -quiet -www' in page)

    def test_makefile_timeout_fires(self):
        # With a tiny timeout set on the socket after the request has
        # been sent, getreply() must raise socket.timeout.
        server_pid = self.start_server(self.args)
        try:
            from M2Crypto import httpslib
            client = httpslib.HTTPS(self.srv_host, self.srv_port)
            client.putrequest('GET', '/')
            for mime_type in ('text/html', 'text/plain'):
                client.putheader('Accept', mime_type)
            client.endheaders()
            client._conn.sock.settimeout(0.0000000001)
            self.assertRaises(socket.timeout, client.getreply)
            client.close()
        finally:
            self.stop_server(server_pid)

    def test_twisted_wrapper(self):
        # Fetches the server's page through
        # M2Crypto.SSL.TwistedProtocolWrapper.connectSSL and checks the
        # collected response text.  Returns early (with a warning) when
        # the twisted imports fail.
        #
        # LEAK ALERT!
        # If this method is commented out, the leak detection code does
        # not find any leaks.
        #
        
        # Test only when twisted and ZopeInterfaces are present
        try:
            from twisted.internet.protocol import ClientFactory
            from twisted.protocols.basic import LineReceiver
            from twisted.internet import reactor
            import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
        except ImportError:
            import warnings
            warnings.warn('Skipping twisted wrapper test because twisted not found')
            return
        
        # Protocol: sends the GET request once connected and appends
        # every received line to the module-level twisted_data string.
        class EchoClient(LineReceiver):
            def connectionMade(self):
                self.sendLine('GET / HTTP/1.0\n\n')

            def lineReceived(self, line):
                global twisted_data
                twisted_data += line

        # Factory: stops the reactor when the connection fails or is
        # lost, which is what lets reactor.run() below return.
        class EchoClientFactory(ClientFactory):
            protocol = EchoClient
        
            def clientConnectionFailed(self, connector, reason):
                reactor.stop()
                assert 0, reason
        
            def clientConnectionLost(self, connector, reason):
                reactor.stop()
                
        pid = self.start_server(self.args)

        # Minimal context factory handing out a default SSL.Context.
        class ContextFactory:
            def getContext(self):
                return SSL.Context()

        try:
            # Reset the accumulator before the client starts receiving.
            global twisted_data
            twisted_data = ''
            
            contextFactory = ContextFactory()
            factory = EchoClientFactory()
            wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)
            reactor.run() # This will block until reactor.stop() is called
        finally:
            self.stop_server(pid)
        self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)


# Accumulator for the lines received by the twisted client in
# SSLClientTestCase.test_twisted_wrapper (which resets and fills it).
twisted_data = ''

class CheckerTestCase(unittest.TestCase):
    def test_checker(self):
        # A Checker pinned to the test host and a fixed certificate hash
        # must accept 'server.pem' for that host and raise WrongHost for
        # 'example.com'.
        from M2Crypto.SSL import Checker
        from M2Crypto import X509

        expected_hash = '9594D272A975F58F4430511D15B4B7FF3D778113'
        check = Checker.Checker(host=srv_host, peerCertHash=expected_hash)
        cert = X509.load_cert('server.pem')
        assert check(cert, srv_host)
        self.assertRaises(Checker.WrongHost, check, cert, 'example.com')

        # Also run the Checker module's doctests (the result returned by
        # testmod is not inspected here).
        import doctest
        doctest.testmod(Checker)
        
    
class ContextTestCase(unittest.TestCase):
    def test_ctx_load_verify_locations(self):
        # Calling load_verify_locations with (None, None) must trip an
        # AssertionError.
        context = SSL.Context()
        loader = context.load_verify_locations
        self.assertRaises(AssertionError, loader, None, None)

def suite():
    """Return a TestSuite holding every test case class in this module."""
    collected = unittest.TestSuite()
    for case in (CheckerTestCase, ContextTestCase, SSLClientTestCase):
        collected.addTest(unittest.makeSuite(case))
    return collected
    

def zap_servers():
    """Best-effort cleanup: send signal 1 to leftover s_server processes.

    Reads the output of ``ps`` and signals every process whose fifth
    whitespace-separated field is exactly ``s_server`` (the first field
    is taken as the pid).

    The ``ps`` output is read straight from a pipe, so no temporary file
    (and no race-prone ``tempfile.mktemp`` name) is involved.  Lines with
    fewer than five fields are skipped, and a process that has already
    exited or cannot be signalled is ignored rather than aborting the
    cleanup.
    """
    s = 's_server'
    ps_output = os.popen('ps')
    try:
        lines = ps_output.readlines()
    finally:
        ps_output.close()

    for ps in lines:
        chunk = ps.split()
        if len(chunk) < 5:
            continue
        pid, cmd = chunk[0], chunk[4]
        if cmd != s:
            continue
        try:
            os.kill(int(pid), 1)
        except (ValueError, OSError):
            # Not a numeric pid, or the process is already gone / not
            # ours to signal; nothing more to do for this entry.
            pass


if __name__ == '__main__':
    # Set to a true value to enable the gc-based leak report below.
    report_leaks = 0
    
    if report_leaks:
        import gc
        gc.enable()
        # All DEBUG_LEAK flags except DEBUG_SAVEALL.
        gc.set_debug(gc.DEBUG_LEAK & ~gc.DEBUG_SAVEALL)
    
    try:
        # Load the random pool file before running the suite and save it
        # back afterwards.
        # NOTE(review): -1 presumably means "read the whole file" -- confirm
        # against M2Crypto.Rand.load_file.
        Rand.load_file('../randpool.dat', -1) 
        unittest.TextTestRunner().run(suite())
        Rand.save_file('../randpool.dat')
    finally:
        # Always try to clean up stray s_server processes, even when the
        # run above raised.
        zap_servers()

    if report_leaks:
        # alltests is a sibling module that is not part of this file.
        import alltests
        alltests.dump_garbage()