summaryrefslogtreecommitdiff
path: root/test/test_coordinator.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 10:29:37 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 12:07:26 -0700
commit2c4b9cdf2cc18ccd40d2489e74766c6b702c2725 (patch)
tree72be396969e5f04509787f496219c0c8b298336b /test/test_coordinator.py
parent061cb4e83469166873912fca2aac62ca8376377f (diff)
downloadkafka-python-client_api_version.tar.gz
Add api_version config to KafkaClient, deprecate str in favor of tuplesclient_api_version
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r--test/test_coordinator.py27
1 files changed, 14 insertions, 13 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index bb62b7b..3435292 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -24,24 +24,24 @@ from kafka.util import WeakMethod
@pytest.fixture
-def coordinator(conn):
- return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(),
- 'consumer')
+def client(conn):
+ return KafkaClient(api_version=(0, 9))
+@pytest.fixture
+def coordinator(client):
+ return ConsumerCoordinator(client, SubscriptionState(), Metrics(),
+ 'consumer')
-def test_init(conn):
- cli = KafkaClient()
- coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(),
- 'consumer')
+def test_init(client, coordinator):
# metadata update on init
- assert cli.cluster._need_update is True
- assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners
+ assert client.cluster._need_update is True
+ assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
-def test_autocommit_enable_api_version(conn, api_version):
- coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+def test_autocommit_enable_api_version(client, api_version):
+ coordinator = ConsumerCoordinator(client, SubscriptionState(),
Metrics(), 'consumer',
enable_auto_commit=True,
group_id='foobar',
@@ -80,7 +80,7 @@ def test_group_protocols(coordinator):
]
-@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
+@pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_pattern_subscription(coordinator, api_version):
coordinator.config['api_version'] = api_version
coordinator._subscription.subscribe(pattern='foo')
@@ -360,7 +360,8 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
warn, exc):
mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
- coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+ client = KafkaClient(api_version=api_version)
+ coordinator = ConsumerCoordinator(client, SubscriptionState(),
Metrics(), 'consumer',
api_version=api_version,
enable_auto_commit=enable,