summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_client_async.py68
1 files changed, 23 insertions, 45 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index a4dc9db..3588423 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -23,58 +23,34 @@ from kafka.structs import BrokerMetadata
@pytest.fixture
-def cli(conn):
- return KafkaClient(api_version=(0, 9))
-
-
-@pytest.mark.parametrize("bootstrap,expected_hosts", [
- (None, [('localhost', 9092, socket.AF_UNSPEC)]),
- ('foobar:1234', [('foobar', 1234, socket.AF_UNSPEC)]),
- ('fizzbuzz', [('fizzbuzz', 9092, socket.AF_UNSPEC)]),
- ('foo:12,bar:34', [('foo', 12, socket.AF_UNSPEC), ('bar', 34, socket.AF_UNSPEC)]),
- (['fizz:56', 'buzz'], [('fizz', 56, socket.AF_UNSPEC), ('buzz', 9092, socket.AF_UNSPEC)]),
-])
-def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
- mocker.patch.object(KafkaClient, '_bootstrap')
- if bootstrap is None:
- KafkaClient(api_version=(0, 9)) # pass api_version to skip auto version checks
- else:
- KafkaClient(bootstrap_servers=bootstrap, api_version=(0, 9))
-
- # host order is randomized internally, so resort before testing
- (hosts,), _ = KafkaClient._bootstrap.call_args # pylint: disable=no-member
- assert sorted(hosts) == sorted(expected_hosts)
+def cli(mocker, conn):
+ mocker.patch('kafka.cluster.dns_lookup',
+ return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))])
+ client = KafkaClient(api_version=(0, 9))
+ client.poll(future=client.cluster.request_update())
+ return client
-def test_bootstrap_success(conn):
+def test_bootstrap(mocker, conn):
conn.state = ConnectionStates.CONNECTED
+ mocker.patch('kafka.cluster.dns_lookup',
+ return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))])
cli = KafkaClient(api_version=(0, 9))
+ future = cli.cluster.request_update()
+ cli.poll(future=future)
+
+ assert future.succeeded()
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_UNSPEC)
kwargs.pop('state_change_callback')
kwargs.pop('node_id')
assert kwargs == cli.config
- conn.connect_blocking.assert_called_with()
- conn.send.assert_called_once_with(MetadataRequest[0]([]))
+ conn.send.assert_called_once_with(MetadataRequest[0]([]), blocking=False)
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
BrokerMetadata(1, 'bar', 34, None)])
-def test_bootstrap_failure(conn):
- conn.connect_blocking.return_value = False
- cli = KafkaClient(api_version=(0, 9))
- args, kwargs = conn.call_args
- assert args == ('localhost', 9092, socket.AF_UNSPEC)
- kwargs.pop('state_change_callback')
- kwargs.pop('node_id')
- assert kwargs == cli.config
- conn.connect_blocking.assert_called_with()
- conn.close.assert_called_with()
- assert cli._bootstrap_fails == 1
- assert cli.cluster.brokers() == set()
-
-
def test_can_connect(cli, conn):
# Node is not in broker metadata - can't connect
assert not cli._can_connect(2)
@@ -187,22 +163,26 @@ def test_is_ready(mocker, cli, conn):
def test_close(mocker, cli, conn):
mocker.patch.object(cli, '_selector')
- # bootstrap connection should have been closed
- assert conn.close.call_count == 1
+ call_count = conn.close.call_count
# Unknown node - silent
cli.close(2)
+ call_count += 0
+ assert conn.close.call_count == call_count
# Single node close
cli._maybe_connect(0)
- assert conn.close.call_count == 1
+ assert conn.close.call_count == call_count
cli.close(0)
- assert conn.close.call_count == 2
+ call_count += 1
+ assert conn.close.call_count == call_count
# All node close
cli._maybe_connect(1)
cli.close()
- assert conn.close.call_count == 4
+ # +3 close: node 0, node 1, node bootstrap
+ call_count += 3
+ assert conn.close.call_count == call_count
def test_is_disconnected(cli, conn):
@@ -249,7 +229,6 @@ def test_send(cli, conn):
def test_poll(mocker):
- mocker.patch.object(KafkaClient, '_bootstrap')
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(api_version=(0, 9))
@@ -309,7 +288,6 @@ def test_set_topics(mocker):
@pytest.fixture
def client(mocker):
- mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(request_timeout_ms=9999999,