diff options
Diffstat (limited to 'test/test_conn.py')
| -rw-r--r-- | test/test_conn.py | 318 | 
1 files changed, 79 insertions, 239 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index f0ef8fb..d394f74 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,242 +1,82 @@ +# pylint: skip-file +from __future__ import absolute_import + +from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET  import socket -import struct -from threading import Thread - -import mock -from . import unittest - -from kafka.common import ConnectionError -from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS - -class ConnTest(unittest.TestCase): -    def setUp(self): - -        self.config = { -            'host': 'localhost', -            'port': 9090, -            'request_id': 0, -            'payload': b'test data', -            'payload2': b'another packet' -        } - -        # Mocking socket.create_connection will cause _sock to always be a -        # MagicMock() -        patcher = mock.patch('socket.create_connection', spec=True) -        self.MockCreateConn = patcher.start() -        self.addCleanup(patcher.stop) - -        # Also mock socket.sendall() to appear successful -        self.MockCreateConn().sendall.return_value = None - -        # And mock socket.recv() to return two payloads, then '', then raise -        # Note that this currently ignores the num_bytes parameter to sock.recv() -        payload_size = len(self.config['payload']) -        payload2_size = len(self.config['payload2']) -        self.MockCreateConn().recv.side_effect = [ -            struct.pack('>i', payload_size), -            struct.pack('>%ds' % payload_size, self.config['payload']), -            struct.pack('>i', payload2_size), -            struct.pack('>%ds' % payload2_size, self.config['payload2']), -            b'' -        ] - -        # Create a connection object -        self.conn = KafkaConnection(self.config['host'], self.config['port']) - -        # Reset any mock counts caused by __init__ -        self.MockCreateConn.reset_mock() - -    def test_collect_hosts__happy_path(self): -        hosts = "localhost:1234,localhost" -        results = collect_hosts(hosts) - -        self.assertEqual(set(results), set([ -            ('localhost', 1234, socket.AF_INET), -            ('localhost', 9092, socket.AF_INET), -        ])) - -    def test_collect_hosts__ipv6(self): -        hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234" -        results = collect_hosts(hosts) - -        self.assertEqual(set(results), set([ -            ('localhost', 1234, socket.AF_INET6), -            ('2001:1000:2000::1', 9092, socket.AF_INET6), -            ('2001:1000:2000::1', 1234, socket.AF_INET6), -        ])) - -    def test_collect_hosts__string_list(self): -        hosts = [ -            'localhost:1234', -            'localhost', -            '[localhost]', -            '2001::1', -            '[2001::1]:1234', -        ] - -        results = collect_hosts(hosts) - -        self.assertEqual(set(results), set([ -            ('localhost', 1234, socket.AF_INET), -            ('localhost', 9092, socket.AF_INET), -            ('localhost', 9092, socket.AF_INET6), -            ('2001::1', 9092, socket.AF_INET6), -            ('2001::1', 1234, socket.AF_INET6), -        ])) - -    def test_collect_hosts__with_spaces(self): -        hosts = "localhost:1234, localhost" -        results = collect_hosts(hosts) - -        self.assertEqual(set(results), set([ -            ('localhost', 1234, socket.AF_INET), -            ('localhost', 9092, socket.AF_INET), -        ])) - - -    def test_send(self): -        self.conn.send(self.config['request_id'], self.config['payload']) -        self.conn._sock.sendall.assert_called_with(self.config['payload']) - -    def test_init_creates_socket_connection(self): -        KafkaConnection(self.config['host'], self.config['port']) -        self.MockCreateConn.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS) - -    def test_init_failure_raises_connection_error(self): - -        def raise_error(*args): -            raise socket.error - -        assert socket.create_connection is self.MockCreateConn -        socket.create_connection.side_effect=raise_error -        with self.assertRaises(ConnectionError): -            KafkaConnection(self.config['host'], self.config['port']) - -    def test_send__reconnects_on_dirty_conn(self): - -        # Dirty the connection -        try: -            self.conn._raise_connection_error() -        except ConnectionError: -            pass - -        # Now test that sending attempts to reconnect -        self.assertEqual(self.MockCreateConn.call_count, 0) -        self.conn.send(self.config['request_id'], self.config['payload']) -        self.assertEqual(self.MockCreateConn.call_count, 1) - -    def test_send__failure_sets_dirty_connection(self): - -        def raise_error(*args): -            raise socket.error - -        assert isinstance(self.conn._sock, mock.Mock) -        self.conn._sock.sendall.side_effect=raise_error -        try: -            self.conn.send(self.config['request_id'], self.config['payload']) -        except ConnectionError: -            self.assertIsNone(self.conn._sock) - -    def test_recv(self): - -        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) - -    def test_recv__reconnects_on_dirty_conn(self): - -        # Dirty the connection -        try: -            self.conn._raise_connection_error() -        except ConnectionError: -            pass - -        # Now test that recv'ing attempts to reconnect -        self.assertEqual(self.MockCreateConn.call_count, 0) -        self.conn.recv(self.config['request_id']) -        self.assertEqual(self.MockCreateConn.call_count, 1) - -    def test_recv__failure_sets_dirty_connection(self): - -        def raise_error(*args): -            raise socket.error - -        # test that recv'ing attempts to reconnect -        assert isinstance(self.conn._sock, mock.Mock) -        self.conn._sock.recv.side_effect=raise_error -        try: -            self.conn.recv(self.config['request_id']) -        except ConnectionError: -            self.assertIsNone(self.conn._sock) - -    def test_recv__doesnt_consume_extra_data_in_stream(self): - -        # Here just test that each call to recv will return a single payload -        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) -        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2']) - -    def test_get_connected_socket(self): -        s = self.conn.get_connected_socket() - -        self.assertEqual(s, self.MockCreateConn()) - -    def test_get_connected_socket_on_dirty_conn(self): -        # Dirty the connection -        try: -            self.conn._raise_connection_error() -        except ConnectionError: -            pass - -        # Test that get_connected_socket tries to connect -        self.assertEqual(self.MockCreateConn.call_count, 0) -        self.conn.get_connected_socket() -        self.assertEqual(self.MockCreateConn.call_count, 1) +import time + +import pytest + +from kafka.conn import BrokerConnection, ConnectionStates + + +@pytest.fixture +def socket(mocker): +    socket = mocker.MagicMock() +    socket.connect_ex.return_value = 0 +    mocker.patch('socket.socket', return_value=socket) +    return socket + + +@pytest.fixture +def conn(socket): +    conn = BrokerConnection('localhost', 9092, socket.AF_INET) +    return conn + + +@pytest.mark.parametrize("states", [ +  (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),), +  (([EALREADY, EALREADY], ConnectionStates.CONNECTING),), +  (([0], ConnectionStates.CONNECTED),), +  (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), +   ([ECONNRESET], ConnectionStates.DISCONNECTED)), +  (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), +   ([EALREADY], ConnectionStates.CONNECTING), +   ([EISCONN], ConnectionStates.CONNECTED)), +]) +def test_connect(socket, conn, states): +    assert conn.state is ConnectionStates.DISCONNECTED + +    for errno, state in states: +        socket.connect_ex.side_effect = errno +        conn.connect() +        assert conn.state is state + + +def test_connect_timeout(socket, conn): +    assert conn.state is ConnectionStates.DISCONNECTED + +    # Initial connect returns EINPROGRESS +    # immediate inline connect returns EALREADY +    # second explicit connect returns EALREADY +    # third explicit connect returns EALREADY and times out via last_attempt +    socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY] +    conn.connect() +    assert conn.state is ConnectionStates.CONNECTING +    conn.connect() +    assert conn.state is ConnectionStates.CONNECTING +    conn.last_attempt = 0 +    conn.connect() +    assert conn.state is ConnectionStates.DISCONNECTED + + +def test_blacked_out(conn): +    assert not conn.blacked_out() +    conn.last_attempt = time.time() +    assert conn.blacked_out() + + +def test_connected(conn): +    assert not conn.connected() +    conn.state = ConnectionStates.CONNECTED +    assert conn.connected() + -    def test_close__object_is_reusable(self): +def test_connecting(conn): +    assert not conn.connecting() +    conn.state = ConnectionStates.CONNECTING +    assert conn.connecting() +    conn.state = ConnectionStates.CONNECTED +    assert not conn.connecting() -        # test that sending to a closed connection -        # will re-connect and send data to the socket -        self.conn.close() -        self.conn.send(self.config['request_id'], self.config['payload']) -        self.assertEqual(self.MockCreateConn.call_count, 1) -        self.conn._sock.sendall.assert_called_with(self.config['payload']) - - -class TestKafkaConnection(unittest.TestCase): -    @mock.patch('socket.create_connection') -    def test_copy(self, socket): -        """KafkaConnection copies work as expected""" - -        conn = KafkaConnection('kafka', 9092) -        self.assertEqual(socket.call_count, 1) - -        copy = conn.copy() -        self.assertEqual(socket.call_count, 1) -        self.assertEqual(copy.host, 'kafka') -        self.assertEqual(copy.port, 9092) -        self.assertEqual(copy._sock, None) - -        copy.reinit() -        self.assertEqual(socket.call_count, 2) -        self.assertNotEqual(copy._sock, None) - -    @mock.patch('socket.create_connection') -    def test_copy_thread(self, socket): -        """KafkaConnection copies work in other threads""" - -        err = [] -        copy = KafkaConnection('kafka', 9092).copy() - -        def thread_func(err, copy): -            try: -                self.assertEqual(copy.host, 'kafka') -                self.assertEqual(copy.port, 9092) -                self.assertNotEqual(copy._sock, None) -            except Exception as e: -                err.append(e) -            else: -                err.append(None) -        thread = Thread(target=thread_func, args=(err, copy)) -        thread.start() -        thread.join() - -        self.assertEqual(err, [None]) -        self.assertEqual(socket.call_count, 2) +# TODO: test_send, test_recv, test_can_send_more, test_close  | 
