diff options
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r-- | test/test_coordinator.py | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py index bb62b7b..3435292 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -24,24 +24,24 @@ from kafka.util import WeakMethod @pytest.fixture -def coordinator(conn): - return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(), - 'consumer') +def client(conn): + return KafkaClient(api_version=(0, 9)) +@pytest.fixture +def coordinator(client): + return ConsumerCoordinator(client, SubscriptionState(), Metrics(), + 'consumer') -def test_init(conn): - cli = KafkaClient() - coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(), - 'consumer') +def test_init(client, coordinator): # metadata update on init - assert cli.cluster._need_update is True - assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners + assert client.cluster._need_update is True + assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_autocommit_enable_api_version(conn, api_version): - coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), +def test_autocommit_enable_api_version(client, api_version): + coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), 'consumer', enable_auto_commit=True, group_id='foobar', @@ -80,7 +80,7 @@ def test_group_protocols(coordinator): ] -@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)]) +@pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) def test_pattern_subscription(coordinator, api_version): coordinator.config['api_version'] = api_version coordinator._subscription.subscribe(pattern='foo') @@ -360,7 +360,8 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, warn, exc): mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') - coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), + client = KafkaClient(api_version=api_version) + coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), 'consumer', api_version=api_version, enable_auto_commit=enable, |