summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_assignors.py58
-rw-r--r--test/test_coordinator.py18
2 files changed, 69 insertions, 7 deletions
diff --git a/test/test_assignors.py b/test/test_assignors.py
new file mode 100644
index 0000000..e2a1d4f
--- /dev/null
+++ b/test/test_assignors.py
@@ -0,0 +1,58 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import pytest
+
+from kafka.coordinator.assignors.range import RangePartitionAssignor
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.protocol import (
+ ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
+
+
+@pytest.fixture
+def cluster(mocker):
+ cluster = mocker.MagicMock()
+ cluster.partitions_for_topic.return_value = set([0, 1, 2])
+ return cluster
+
+
+def test_assignor_roundrobin(cluster):
+ assignor = RoundRobinPartitionAssignor
+
+ member_metadata = {
+ 'C0': assignor.metadata(set(['t0', 't1'])),
+ 'C1': assignor.metadata(set(['t0', 't1'])),
+ }
+
+ ret = assignor.assign(cluster, member_metadata)
+ expected = {
+ 'C0': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [0, 2]), ('t1', [1])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [1]), ('t1', [0, 2])], b'')
+ }
+ assert ret == expected
+ assert set(ret) == set(expected)
+ for member in ret:
+ assert ret[member].encode() == expected[member].encode()
+
+
+def test_assignor_range(cluster):
+ assignor = RangePartitionAssignor
+
+ member_metadata = {
+ 'C0': assignor.metadata(set(['t0', 't1'])),
+ 'C1': assignor.metadata(set(['t0', 't1'])),
+ }
+
+ ret = assignor.assign(cluster, member_metadata)
+ expected = {
+ 'C0': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [2]), ('t1', [2])], b'')
+ }
+ assert ret == expected
+ assert set(ret) == set(expected)
+ for member in ret:
+ assert ret[member].encode() == expected[member].encode()
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 80d2de2..bf48923 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -7,6 +7,7 @@ from kafka.client_async import KafkaClient
from kafka.common import TopicPartition, OffsetAndMetadata
from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
+from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
@@ -72,13 +73,16 @@ def test_group_protocols(coordinator):
assert False, 'Exception not raised when expected'
coordinator._subscription.subscribe(topics=['foobar'])
- assert coordinator.group_protocols() == [(
- 'roundrobin',
- ConsumerProtocolMemberMetadata(
+ assert coordinator.group_protocols() == [
+ ('range', ConsumerProtocolMemberMetadata(
+ RangePartitionAssignor.version,
+ ['foobar'],
+ b'')),
+ ('roundrobin', ConsumerProtocolMemberMetadata(
RoundRobinPartitionAssignor.version,
['foobar'],
- b'')
- )]
+ b'')),
+ ]
@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
@@ -113,8 +117,8 @@ def test_pattern_subscription(coordinator, api_version):
def test_lookup_assignor(coordinator):
- assignor = coordinator._lookup_assignor('roundrobin')
- assert assignor is RoundRobinPartitionAssignor
+ assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor
+ assert coordinator._lookup_assignor('range') is RangePartitionAssignor
assert coordinator._lookup_assignor('foobar') is None