diff options
Diffstat (limited to 'test/test_assignors.py')
-rw-r--r-- | test/test_assignors.py | 75 |
1 files changed, 33 insertions, 42 deletions
diff --git a/test/test_assignors.py b/test/test_assignors.py index 016ff8e..67e91e1 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker): del subscriptions['C1'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, []) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker): del subscriptions['C0'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata( + member_metadata[member] = StickyPartitionAssignor._metadata( topics, assignment[member].partitions() if member in assignment else [] ) @@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { @@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker): del subscriptions['C10'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker): subscriptions['C10'] = {'t'} member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata( + member_metadata[member] = StickyPartitionAssignor._metadata( topics, assignment[member].partitions() if member in assignment else [] ) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker): del subscriptions['C5'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) for i in range(50): member = 'C{}'.format(randint(1, n_consumers)) @@ -517,7 +517,7 @@ def test_new_subscription(mocker): subscriptions['C0'].add('t1') member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, []) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker): member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, member_assignments[member]) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member]) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -570,7 +570,7 @@ def test_stickiness(mocker): del subscriptions['C1'] member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): } member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) cluster = create_cluster(mocker, topics={}, topics_partitions={}) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker): member_metadata = {} for member, topics in six.iteritems(subscriptions): # assume both C1 and C2 have partition 1 assigned to them in generation 1 - member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) member_metadata = { - 'C1': build_metadata({'t'}, []), - 'C2': build_metadata({'t'}, []), - 'C3': build_metadata({'t'}, []), + 'C1': StickyPartitionAssignor._metadata({'t'}, []), + 'C2': StickyPartitionAssignor._metadata({'t'}, []), + 'C3': StickyPartitionAssignor._metadata({'t'}, []), } assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker): assert len(assignment1['C3'].assignment[0][1]) == 2 member_metadata = { - 'C1': build_metadata({'t'}, assignment1['C1'].partitions()), - 'C2': build_metadata({'t'}, assignment1['C2'].partitions()), + 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()), } assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker): assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { - 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2), - 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) member_metadata = { - 'C1': build_metadata({'t'}, []), - 'C2': build_metadata({'t'}, []), - 'C3': build_metadata({'t'}, []), + 'C1': StickyPartitionAssignor._metadata({'t'}, []), + 'C2': StickyPartitionAssignor._metadata({'t'}, []), + 'C3': StickyPartitionAssignor._metadata({'t'}, []), } assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker): assert len(assignment1['C3'].assignment[0][1]) == 2 member_metadata = { - 'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1), } assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker): assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { - 'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1), - 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2), - 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1), + 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1), + 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb } member_metadata = {} for member in six.iterkeys(member_assignments): - member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member]) + member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member]) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment) @@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb def make_member_metadata(subscriptions): member_metadata = {} for member, topics in six.iteritems(subscriptions): - member_metadata[member] = build_metadata(topics, []) + member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) return member_metadata -def build_metadata(topics, member_assignment_partitions, generation=-1): - partitions_by_topic = defaultdict(list) - for topic_partition in member_assignment_partitions: - partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation) - user_data = data.encode() - return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data) - - def assert_assignment(result_assignment, expected_assignment): assert result_assignment == expected_assignment assert set(result_assignment) == set(expected_assignment) |