diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_assignors.py | 58 | ||||
-rw-r--r-- | test/test_coordinator.py | 18 |
2 files changed, 69 insertions, 7 deletions
diff --git a/test/test_assignors.py b/test/test_assignors.py new file mode 100644 index 0000000..e2a1d4f --- /dev/null +++ b/test/test_assignors.py @@ -0,0 +1,58 @@ +# pylint: skip-file +from __future__ import absolute_import + +import pytest + +from kafka.coordinator.assignors.range import RangePartitionAssignor +from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) + + +@pytest.fixture +def cluster(mocker): + cluster = mocker.MagicMock() + cluster.partitions_for_topic.return_value = set([0, 1, 2]) + return cluster + + +def test_assignor_roundrobin(cluster): + assignor = RoundRobinPartitionAssignor + + member_metadata = { + 'C0': assignor.metadata(set(['t0', 't1'])), + 'C1': assignor.metadata(set(['t0', 't1'])), + } + + ret = assignor.assign(cluster, member_metadata) + expected = { + 'C0': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [0, 2]), ('t1', [1])], b''), + 'C1': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [1]), ('t1', [0, 2])], b'') + } + assert ret == expected + assert set(ret) == set(expected) + for member in ret: + assert ret[member].encode() == expected[member].encode() + + +def test_assignor_range(cluster): + assignor = RangePartitionAssignor + + member_metadata = { + 'C0': assignor.metadata(set(['t0', 't1'])), + 'C1': assignor.metadata(set(['t0', 't1'])), + } + + ret = assignor.assign(cluster, member_metadata) + expected = { + 'C0': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''), + 'C1': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [2]), ('t1', [2])], b'') + } + assert ret == expected + assert set(ret) == set(expected) + for member in ret: + assert ret[member].encode() == expected[member].encode() diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 80d2de2..bf48923 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -7,6 +7,7 @@ from kafka.client_async import KafkaClient from kafka.common import TopicPartition, OffsetAndMetadata from kafka.consumer.subscription_state import ( SubscriptionState, ConsumerRebalanceListener) +from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.protocol import ( @@ -72,13 +73,16 @@ def test_group_protocols(coordinator): assert False, 'Exception not raised when expected' coordinator._subscription.subscribe(topics=['foobar']) - assert coordinator.group_protocols() == [( - 'roundrobin', - ConsumerProtocolMemberMetadata( + assert coordinator.group_protocols() == [ + ('range', ConsumerProtocolMemberMetadata( + RangePartitionAssignor.version, + ['foobar'], + b'')), + ('roundrobin', ConsumerProtocolMemberMetadata( RoundRobinPartitionAssignor.version, ['foobar'], - b'') - )] + b'')), + ] @pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)]) @@ -113,8 +117,8 @@ def test_pattern_subscription(coordinator, api_version): def test_lookup_assignor(coordinator): - assignor = coordinator._lookup_assignor('roundrobin') - assert assignor is RoundRobinPartitionAssignor + assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor + assert coordinator._lookup_assignor('range') is RangePartitionAssignor assert coordinator._lookup_assignor('foobar') is None |