diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client_async.py | 68 |
1 files changed, 23 insertions, 45 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py index a4dc9db..3588423 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -23,58 +23,34 @@ from kafka.structs import BrokerMetadata @pytest.fixture -def cli(conn): - return KafkaClient(api_version=(0, 9)) - - -@pytest.mark.parametrize("bootstrap,expected_hosts", [ - (None, [('localhost', 9092, socket.AF_UNSPEC)]), - ('foobar:1234', [('foobar', 1234, socket.AF_UNSPEC)]), - ('fizzbuzz', [('fizzbuzz', 9092, socket.AF_UNSPEC)]), - ('foo:12,bar:34', [('foo', 12, socket.AF_UNSPEC), ('bar', 34, socket.AF_UNSPEC)]), - (['fizz:56', 'buzz'], [('fizz', 56, socket.AF_UNSPEC), ('buzz', 9092, socket.AF_UNSPEC)]), -]) -def test_bootstrap_servers(mocker, bootstrap, expected_hosts): - mocker.patch.object(KafkaClient, '_bootstrap') - if bootstrap is None: - KafkaClient(api_version=(0, 9)) # pass api_version to skip auto version checks - else: - KafkaClient(bootstrap_servers=bootstrap, api_version=(0, 9)) - - # host order is randomized internally, so resort before testing - (hosts,), _ = KafkaClient._bootstrap.call_args # pylint: disable=no-member - assert sorted(hosts) == sorted(expected_hosts) +def cli(mocker, conn): + mocker.patch('kafka.cluster.dns_lookup', + return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))]) + client = KafkaClient(api_version=(0, 9)) + client.poll(future=client.cluster.request_update()) + return client -def test_bootstrap_success(conn): +def test_bootstrap(mocker, conn): conn.state = ConnectionStates.CONNECTED + mocker.patch('kafka.cluster.dns_lookup', + return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))]) cli = KafkaClient(api_version=(0, 9)) + future = cli.cluster.request_update() + cli.poll(future=future) + + assert future.succeeded() args, kwargs = conn.call_args assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') kwargs.pop('node_id') assert kwargs == cli.config - conn.connect_blocking.assert_called_with() - conn.send.assert_called_once_with(MetadataRequest[0]([])) + conn.send.assert_called_once_with(MetadataRequest[0]([]), blocking=False) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None), BrokerMetadata(1, 'bar', 34, None)]) -def test_bootstrap_failure(conn): - conn.connect_blocking.return_value = False - cli = KafkaClient(api_version=(0, 9)) - args, kwargs = conn.call_args - assert args == ('localhost', 9092, socket.AF_UNSPEC) - kwargs.pop('state_change_callback') - kwargs.pop('node_id') - assert kwargs == cli.config - conn.connect_blocking.assert_called_with() - conn.close.assert_called_with() - assert cli._bootstrap_fails == 1 - assert cli.cluster.brokers() == set() - - def test_can_connect(cli, conn): # Node is not in broker metadata - can't connect assert not cli._can_connect(2) @@ -187,22 +163,26 @@ def test_is_ready(mocker, cli, conn): def test_close(mocker, cli, conn): mocker.patch.object(cli, '_selector') - # bootstrap connection should have been closed - assert conn.close.call_count == 1 + call_count = conn.close.call_count # Unknown node - silent cli.close(2) + call_count += 0 + assert conn.close.call_count == call_count # Single node close cli._maybe_connect(0) - assert conn.close.call_count == 1 + assert conn.close.call_count == call_count cli.close(0) - assert conn.close.call_count == 2 + call_count += 1 + assert conn.close.call_count == call_count # All node close cli._maybe_connect(1) cli.close() - assert conn.close.call_count == 4 + # +3 close: node 0, node 1, node bootstrap + call_count += 3 + assert conn.close.call_count == call_count def test_is_disconnected(cli, conn): @@ -249,7 +229,6 @@ def test_send(cli, conn): def test_poll(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') _poll = mocker.patch.object(KafkaClient, '_poll') cli = KafkaClient(api_version=(0, 9)) @@ -309,7 +288,6 @@ def test_set_topics(mocker): @pytest.fixture def client(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') _poll = mocker.patch.object(KafkaClient, '_poll') cli = KafkaClient(request_timeout_ms=9999999, |