# Sophie
#
# Sophie
#
# distrib > Fedora > 13 > i386 > by-pkgid > 552d72b401c5b4a5a4c52922e7b31f2c > files > 156
#
# python-eventlet-doc-0.9.12-1.fc13.noarch.rpm

import unittest
import socket as _original_sock
from eventlet import api
from eventlet.green import socket

class TestSocketErrors(unittest.TestCase):
    """Check the exceptions raised for refused connections and read timeouts.

    The module-level ``socket`` name is the green socket module; the timeout
    check is additionally run against the standard library module
    (``_original_sock``) so the two can be compared.
    """

    def test_connection_refused(self):
        """Connecting to a port with no listener raises ``socket.error``."""
        # Open and close a dummy server to find an unused port.
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            port = server.getsockname()[1]
        finally:
            server.close()
        del server

        s = socket.socket()
        try:
            try:
                s.connect(('127.0.0.1', port))
            except socket.error as ex:
                code, text = ex.args
                # "Connection refused" errno values: 111 on Linux, 61 on
                # BSD/macOS, 10061 (WSAECONNREFUSED) on Windows.
                self.assertTrue(code in [111, 61, 10061], (code, text))
                self.assertTrue('refused' in text.lower(), (code, text))
            else:
                self.fail("Shouldn't have connected")
        finally:
            # Release the client socket whether or not the assertions pass.
            s.close()

    def test_timeout_real_socket(self):
        """ Test underlying socket behavior to ensure correspondence
            between green sockets and the underlying socket module. """
        self.test_timeout(socket=_original_sock)

    def test_timeout(self, socket=socket):
        """ Test that the socket timeout exception works correctly.

        ``socket`` is the socket module to exercise; it defaults to the
        module-level ``socket`` name. The exception raised by a timed-out
        ``recv`` must be ``socket.timeout`` whose only argument is the
        string ``'timed out'``.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            port = server.getsockname()[1]

            s = socket.socket()
            try:
                s.connect(('127.0.0.1', port))

                cs, addr = server.accept()
                try:
                    # Nothing is ever sent on ``s``, so this read can only
                    # end by timing out.
                    cs.settimeout(1)
                    try:
                        cs.recv(1024)
                    except socket.timeout as ex:
                        self.assertTrue(hasattr(ex, 'args'))
                        self.assertEqual(len(ex.args), 1)
                        self.assertEqual(ex.args[0], 'timed out')
                    else:
                        self.fail("Should have timed out")
                finally:
                    cs.close()
            finally:
                s.close()
        finally:
            # Each socket is closed by its own ``finally`` so that a failure
            # in connect() or accept() does not leak the earlier ones.
            server.close()

# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()