diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 10:29:37 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 12:07:26 -0700 |
commit | 2c4b9cdf2cc18ccd40d2489e74766c6b702c2725 (patch) | |
tree | 72be396969e5f04509787f496219c0c8b298336b /test | |
parent | 061cb4e83469166873912fca2aac62ca8376377f (diff) | |
download | kafka-python-client_api_version.tar.gz |
Add api_version config to KafkaClient, deprecate str in favor of tuplesclient_api_version
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client_async.py | 140 | ||||
-rw-r--r-- | test/test_consumer_group.py | 2 | ||||
-rw-r--r-- | test/test_coordinator.py | 27 | ||||
-rw-r--r-- | test/test_fetcher.py | 2 | ||||
-rw-r--r-- | test/test_sender.py | 2 |
5 files changed, 65 insertions, 108 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py index 06c2bf5..dfe11ea 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -19,6 +19,11 @@ from kafka.protocol.produce import ProduceRequest from kafka.structs import BrokerMetadata +@pytest.fixture +def cli(conn): + return KafkaClient(api_version=(0, 9)) + + @pytest.mark.parametrize("bootstrap,expected_hosts", [ (None, [('localhost', 9092, socket.AF_UNSPEC)]), ('foobar:1234', [('foobar', 1234, socket.AF_UNSPEC)]), @@ -29,9 +34,9 @@ from kafka.structs import BrokerMetadata def test_bootstrap_servers(mocker, bootstrap, expected_hosts): mocker.patch.object(KafkaClient, '_bootstrap') if bootstrap is None: - KafkaClient() + KafkaClient(api_version=(0, 9)) # pass api_version to skip auto version checks else: - KafkaClient(bootstrap_servers=bootstrap) + KafkaClient(bootstrap_servers=bootstrap, api_version=(0, 9)) # host order is randomized internally, so resort before testing (hosts,), _ = KafkaClient._bootstrap.call_args # pylint: disable=no-member @@ -40,7 +45,7 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts): def test_bootstrap_success(conn): conn.state = ConnectionStates.CONNECTED - cli = KafkaClient() + cli = KafkaClient(api_version=(0, 9)) args, kwargs = conn.call_args assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') @@ -53,7 +58,7 @@ def test_bootstrap_success(conn): def test_bootstrap_failure(conn): conn.state = ConnectionStates.DISCONNECTED - cli = KafkaClient() + cli = KafkaClient(api_version=(0, 9)) args, kwargs = conn.call_args assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') @@ -64,9 +69,7 @@ def test_bootstrap_failure(conn): assert cli.cluster.brokers() == set() -def test_can_connect(conn): - cli = KafkaClient() - +def test_can_connect(cli, conn): # Node is not in broker metadata - cant connect assert not cli._can_connect(2) @@ -86,8 +89,7 @@ def test_can_connect(conn): conn.blacked_out.return_value = True assert not cli._can_connect(0) -def test_maybe_connect(conn): - cli = KafkaClient() +def test_maybe_connect(cli, conn): try: # Node not in metadata, raises AssertionError cli._maybe_connect(2) @@ -104,8 +106,7 @@ def test_maybe_connect(conn): assert cli._conns[0] is conn -def test_conn_state_change(mocker, conn): - cli = KafkaClient() +def test_conn_state_change(mocker, cli, conn): sel = mocker.patch.object(cli, '_selector') node_id = 0 @@ -136,16 +137,14 @@ def test_conn_state_change(mocker, conn): assert node_id not in cli._connecting -def test_ready(mocker, conn): - cli = KafkaClient() +def test_ready(mocker, cli, conn): maybe_connect = mocker.patch.object(cli, '_maybe_connect') node_id = 1 cli.ready(node_id) maybe_connect.assert_called_with(node_id) -def test_is_ready(mocker, conn): - cli = KafkaClient() +def test_is_ready(mocker, cli, conn): cli._maybe_connect(0) cli._maybe_connect(1) @@ -179,8 +178,7 @@ def test_is_ready(mocker, conn): assert not cli.is_ready(0) -def test_close(mocker, conn): - cli = KafkaClient() +def test_close(mocker, cli, conn): mocker.patch.object(cli, '_selector') # bootstrap connection should have been closed @@ -201,9 +199,7 @@ def test_close(mocker, conn): assert conn.close.call_count == 4 -def test_is_disconnected(conn): - cli = KafkaClient() - +def test_is_disconnected(cli, conn): # False if not connected yet conn.state = ConnectionStates.DISCONNECTED assert not cli.is_disconnected(0) @@ -218,9 +214,7 @@ def test_is_disconnected(conn): assert not cli.is_disconnected(0) -def test_send(conn): - cli = KafkaClient() - +def test_send(cli, conn): # Send to unknown node => raises AssertionError try: cli.send(2, None) @@ -251,7 +245,7 @@ def test_poll(mocker): mocker.patch.object(KafkaClient, '_bootstrap') metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient() + cli = KafkaClient(api_version=(0, 9)) tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') # metadata timeout wins @@ -293,106 +287,68 @@ def test_set_topics(): pass -def test_maybe_refresh_metadata_ttl(mocker): +@pytest.fixture +def client(mocker): mocker.patch.object(KafkaClient, '_bootstrap') _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) + cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9)) tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') tasks.return_value = 9999999 ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 1234 - - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(1.234, sleep=True) - + ttl.return_value = 0 + return cli -def test_maybe_refresh_metadata_backoff(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) +def test_maybe_refresh_metadata_ttl(mocker, client): + client.cluster.ttl.return_value = 1234 - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(1.234, sleep=True) - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 +def test_maybe_refresh_metadata_backoff(mocker, client): now = time.time() t = mocker.patch('time.time') t.return_value = now - cli._last_no_node_available_ms = now * 1000 - - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(2.222, sleep=True) - - -def test_maybe_refresh_metadata_in_progress(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) - - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 - - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 - - cli._metadata_refresh_in_progress = True - - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(9999.999, sleep=True) + client._last_no_node_available_ms = now * 1000 + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(2.222, sleep=True) -def test_maybe_refresh_metadata_update(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) +def test_maybe_refresh_metadata_in_progress(mocker, client): + client._metadata_refresh_in_progress = True - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 - mocker.patch.object(cli, 'least_loaded_node', return_value='foobar') - mocker.patch.object(cli, '_can_send_request', return_value=True) - send = mocker.patch.object(cli, 'send') +def test_maybe_refresh_metadata_update(mocker, client): + mocker.patch.object(client, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client, '_can_send_request', return_value=True) + send = mocker.patch.object(client, 'send') - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(0, sleep=True) - assert cli._metadata_refresh_in_progress + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(0, sleep=True) + assert client._metadata_refresh_in_progress request = MetadataRequest[0]([]) send.assert_called_with('foobar', request) -def test_maybe_refresh_metadata_failure(mocker): - mocker.patch.object(KafkaClient, '_bootstrap') - _poll = mocker.patch.object(KafkaClient, '_poll') - - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) - - tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') - tasks.return_value = 9999999 - - ttl = mocker.patch.object(cli.cluster, 'ttl') - ttl.return_value = 0 - - mocker.patch.object(cli, 'least_loaded_node', return_value='foobar') +def test_maybe_refresh_metadata_failure(mocker, client): + mocker.patch.object(client, 'least_loaded_node', return_value='foobar') now = time.time() t = mocker.patch('time.time') t.return_value = now - cli.poll(timeout_ms=9999999, sleep=True) - _poll.assert_called_with(0, sleep=True) - assert cli._last_no_node_available_ms == now * 1000 - assert not cli._metadata_refresh_in_progress + client.poll(timeout_ms=9999999, sleep=True) + client._poll.assert_called_with(0, sleep=True) + assert client._last_no_node_available_ms == now * 1000 + assert not client._metadata_refresh_in_progress def test_schedule(): diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 04ed9bb..9fb057e 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -139,7 +139,7 @@ def test_paused(kafka_broker, topic): def test_heartbeat_timeout(conn, mocker): - mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9') + mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = (0, 9)) mocker.patch('time.time', return_value = 1234) consumer = KafkaConsumer('foobar') mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index bb62b7b..3435292 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -24,24 +24,24 @@ from kafka.util import WeakMethod @pytest.fixture -def coordinator(conn): - return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(), - 'consumer') +def client(conn): + return KafkaClient(api_version=(0, 9)) +@pytest.fixture +def coordinator(client): + return ConsumerCoordinator(client, SubscriptionState(), Metrics(), + 'consumer') -def test_init(conn): - cli = KafkaClient() - coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(), - 'consumer') +def test_init(client, coordinator): # metadata update on init - assert cli.cluster._need_update is True - assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners + assert client.cluster._need_update is True + assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_autocommit_enable_api_version(conn, api_version): - coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), +def test_autocommit_enable_api_version(client, api_version): + coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), 'consumer', enable_auto_commit=True, group_id='foobar', @@ -80,7 +80,7 @@ def test_group_protocols(coordinator): ] -@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)]) +@pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) def test_pattern_subscription(coordinator, api_version): coordinator.config['api_version'] = api_version coordinator._subscription.subscribe(pattern='foo') @@ -360,7 +360,8 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, warn, exc): mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') - coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), + client = KafkaClient(api_version=api_version) + coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), 'consumer', api_version=api_version, enable_auto_commit=enable, diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 7e529bc..1f1f7d3 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -15,7 +15,7 @@ from kafka.structs import TopicPartition, OffsetAndMetadata @pytest.fixture def client(mocker): - return mocker.Mock(spec=KafkaClient(bootstrap_servers=[])) + return mocker.Mock(spec=KafkaClient(bootstrap_servers=[], api_version=(0, 9))) @pytest.fixture diff --git a/test/test_sender.py b/test/test_sender.py index bb9068e..44105e2 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -18,7 +18,7 @@ from kafka.structs import TopicPartition, OffsetAndMetadata @pytest.fixture def client(mocker): - _cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=[])) + _cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=[], api_version=(0, 9))) _cli.cluster = mocker.Mock(spec=ClusterMetadata()) return _cli |