diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 10:29:37 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 12:07:26 -0700 |
commit | 2c4b9cdf2cc18ccd40d2489e74766c6b702c2725 (patch) | |
tree | 72be396969e5f04509787f496219c0c8b298336b /test/test_client_async.py | |
parent | 061cb4e83469166873912fca2aac62ca8376377f (diff) | |
download | kafka-python-client_api_version.tar.gz |
Add api_version config to KafkaClient, deprecate str in favor of tuplesclient_api_version
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r-- | test/test_client_async.py | 140 |
1 files changed, 48 insertions, 92 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py index 06c2bf5..dfe11ea 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -19,6 +19,11 @@ from kafka.protocol.produce import ProduceRequest from kafka.structs import BrokerMetadata +@pytest.fixture +def cli(conn): + return KafkaClient(api_version=(0, 9)) + + @pytest.mark.parametrize("bootstrap,expected_hosts", [ (None, [('localhost', 9092, socket.AF_UNSPEC)]), ('foobar:1234', [('foobar', 1234, socket.AF_UNSPEC)]), @@ -29,9 +34,9 @@ from kafka.structs import BrokerMetadata def test_bootstrap_servers(mocker, bootstrap, expected_hosts): mocker.patch.object(KafkaClient, '_bootstrap') if bootstrap is None: - KafkaClient() + KafkaClient(api_version=(0, 9)) # pass api_version to skip auto version checks else: - KafkaClient(bootstrap_servers=bootstrap) + KafkaClient(bootstrap_servers=bootstrap, api_version=(0, 9)) # host order is randomized internally, so resort before testing (hosts,), _ = KafkaClient._bootstrap.call_args # pylint: disable=no-member @@ -40,7 +45,7 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts): def test_bootstrap_success(conn): conn.state = ConnectionStates.CONNECTED - cli = KafkaClient() + cli = KafkaClient(api_version=(0, 9)) args, kwargs = conn.call_args assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') @@ -53,7 +58,7 @@ def test_bootstrap_success(conn): def test_bootstrap_failure(conn): conn.state = ConnectionStates.DISCONNECTED - cli = KafkaClient() + cli = KafkaClient(api_version=(0, 9)) args, kwargs = conn.call_args assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') @@ -64,9 +69,7 @@ def test_bootstrap_failure(conn): assert cli.cluster.brokers() == set() -def test_can_connect(conn): - cli = KafkaClient() - +def test_can_connect(cli, conn): # Node is not in broker metadata - cant connect assert not cli._can_connect(2) @@ -86,8 +89,7 @@ def test_can_connect(conn): conn.blacked_out.return_value = True assert not cli._can_connect(0) -def test_maybe_connect(conn): - cli = KafkaClient() +def test_maybe_connect(cli, conn): try: # Node not in metadata, raises AssertionError cli._maybe_connect(2) @@ -104,8 +106,7 @@ def test_maybe_connect(conn): assert cli._conns[0] is conn -def test_conn_state_change(mocker, conn): - cli = KafkaClient() +def test_conn_state_change(mocker, cli, conn): sel = mocker.patch.object(cli, '_selector') node_id = 0 @@ -136,16 +137,14 @@ def test_conn_state_change(mocker, conn): assert node_id not in cli._connecting -def test_ready(mocker, conn): - cli = KafkaClient() +def test_ready(mocker, cli, conn): maybe_connect = mocker.patch.object(cli, '_maybe_connect') node_id = 1 cli.ready(node_id) maybe_connect.assert_called_with(node_id) -def test_is_ready(mocker, conn): - cli = KafkaClient() +def test_is_ready(mocker, cli, conn): cli._maybe_connect(0) cli._maybe_connect(1) @@ -179,8 +178,7 @@ def test_is_ready(mocker, conn): assert not cli.is_ready(0) -def test_close(mocker, conn): - cli = KafkaClient() +def test_close(mocker, cli, conn): mocker.patch.object(cli, '_selector') # bootstrap connection should have been closed @@ -201,9 +199,7 @@ def test_close(mocker, conn): assert conn.close.call_count == 4 -def test_is_disconnected(conn): - cli = KafkaClient() - +def test_is_disconnected(cli, conn): # False if not connected yet conn.state = ConnectionStates.DISCONNECTED assert not cli.is_disconnected(0) @@ -218,9 +214,7 @@ def test_is_disconnected(conn): assert not cli.is_disconnected(0) -def test_send(conn): - cli = KafkaClient() - +def test_send(cli, conn): # Send to unknown node => raises AssertionError try: cli.send(2, None) @@ -251,7 +245,7 @@ def test_poll(mocker): mocker.patch.object(KafkaClient, '_bootstrap') metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient() + cli = KafkaClient(api_version=(0, 9)) tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') # metadata timeout wins @@ -293,106 +287,68 @@ def test_set_topics(): pass -def test_maybe_refresh_metadata_ttl(mocker): +@pytest.fixture +def client(mocker): mocker.patch.object(KafkaClient, '_bootstrap') _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) + cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9)) tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') tasks.return_value = 9999999 ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 1234 - - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(1.234, sleep=True) - + ttl.return_value = 0 + return cli -def test_maybe_refresh_metadata_backoff(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) +def test_maybe_refresh_metadata_ttl(mocker, client): + client.cluster.ttl.return_value = 1234 - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(1.234, sleep=True) - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 +def test_maybe_refresh_metadata_backoff(mocker, client): now = time.time() t = mocker.patch('time.time') t.return_value = now - cli._last_no_node_available_ms = now * 1000 - - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(2.222, sleep=True) - - -def test_maybe_refresh_metadata_in_progress(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) - - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 - - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 - - cli._metadata_refresh_in_progress = True - - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(9999.999, sleep=True) + client._last_no_node_available_ms = now * 1000 + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(2.222, sleep=True) -def test_maybe_refresh_metadata_update(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) +def test_maybe_refresh_metadata_in_progress(mocker, client): + client._metadata_refresh_in_progress = True - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 - mocker.patch.object(cli, 'least_loaded_node', return_value='foobar') - mocker.patch.object(cli, '_can_send_request', return_value=True) - send = mocker.patch.object(cli, 'send') +def test_maybe_refresh_metadata_update(mocker, client): + mocker.patch.object(client, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client, '_can_send_request', return_value=True) + send = mocker.patch.object(client, 'send') - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(0, sleep=True) - assert cli._metadata_refresh_in_progress + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(0, sleep=True) + assert client._metadata_refresh_in_progress request = MetadataRequest[0]([]) send.assert_called_with('foobar', request) -def test_maybe_refresh_metadata_failure(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) - - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 - - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 - - mocker.patch.object(cli, 'least_loaded_node', return_value='foobar') +def test_maybe_refresh_metadata_failure(mocker, client): + mocker.patch.object(client, 'least_loaded_node', return_value='foobar') now = time.time() t = mocker.patch('time.time') t.return_value = now - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(0, sleep=True) - assert cli._last_no_node_available_ms == now * 1000 - assert not cli._metadata_refresh_in_progress + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(0, sleep=True) + assert client._last_no_node_available_ms == now * 1000 + assert not client._metadata_refresh_in_progress def test_schedule(): |