diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 10:29:37 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 12:07:26 -0700 |
commit | 2c4b9cdf2cc18ccd40d2489e74766c6b702c2725 (patch) | |
tree | 72be396969e5f04509787f496219c0c8b298336b /test/test_coordinator.py | |
parent | 061cb4e83469166873912fca2aac62ca8376377f (diff) | |
download | kafka-python-client_api_version.tar.gz |
Add api_version config to KafkaClient, deprecate str in favor of tuplesclient_api_version
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r-- | test/test_coordinator.py | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py index bb62b7b..3435292 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -24,24 +24,24 @@ from kafka.util import WeakMethod @pytest.fixture -def coordinator(conn): - return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(), - 'consumer') +def client(conn): + return KafkaClient(api_version=(0, 9)) +@pytest.fixture +def coordinator(client): + return ConsumerCoordinator(client, SubscriptionState(), Metrics(), + 'consumer') -def test_init(conn): - cli = KafkaClient() - coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(), - 'consumer') +def test_init(client, coordinator): # metadata update on init - assert cli.cluster._need_update is True - assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners + assert client.cluster._need_update is True + assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_autocommit_enable_api_version(conn, api_version): - coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), +def test_autocommit_enable_api_version(client, api_version): + coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), 'consumer', enable_auto_commit=True, group_id='foobar', @@ -80,7 +80,7 @@ def test_group_protocols(coordinator): ] -@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)]) +@pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) def test_pattern_subscription(coordinator, api_version): coordinator.config['api_version'] = api_version coordinator._subscription.subscribe(pattern='foo') @@ -360,7 +360,8 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, warn, exc): mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') - coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), + client = KafkaClient(api_version=api_version) + coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), 'consumer', api_version=api_version, enable_auto_commit=enable, |