summaryrefslogtreecommitdiff
path: root/test/test_assignors.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_assignors.py')
-rw-r--r--test/test_assignors.py75
1 files changed, 33 insertions, 42 deletions
diff --git a/test/test_assignors.py b/test/test_assignors.py
index 016ff8e..67e91e1 100644
--- a/test/test_assignors.py
+++ b/test/test_assignors.py
@@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, [])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker):
del subscriptions['C0']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(
+ member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)
@@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
@@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker):
del subscriptions['C10']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker):
subscriptions['C10'] = {'t'}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(
+ member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker):
del subscriptions['C5']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
@@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
for i in range(50):
member = 'C{}'.format(randint(1, n_consumers))
@@ -517,7 +517,7 @@ def test_new_subscription(mocker):
subscriptions['C0'].add('t1')
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, [])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, member_assignments[member])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member])
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -570,7 +570,7 @@ def test_stickiness(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())
cluster = create_cluster(mocker, topics={}, topics_partitions={})
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
# assume both C1 and C2 have partition 1 assigned to them in generation 1
- member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, assignment[member].partitions())
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
@@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
member_metadata = {
- 'C1': build_metadata({'t'}, []),
- 'C2': build_metadata({'t'}, []),
- 'C3': build_metadata({'t'}, []),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, []),
}
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2
member_metadata = {
- 'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
- 'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()),
}
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
member_metadata = {
- 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
- 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})
member_metadata = {
- 'C1': build_metadata({'t'}, []),
- 'C2': build_metadata({'t'}, []),
- 'C3': build_metadata({'t'}, []),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, []),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, []),
}
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2
member_metadata = {
- 'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1),
}
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
member_metadata = {
- 'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
- 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
- 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
+ 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1),
+ 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
+ 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
@@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
}
member_metadata = {}
for member in six.iterkeys(member_assignments):
- member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])
+ member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member])
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
@@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
def make_member_metadata(subscriptions):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
- member_metadata[member] = build_metadata(topics, [])
+ member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
return member_metadata
-def build_metadata(topics, member_assignment_partitions, generation=-1):
- partitions_by_topic = defaultdict(list)
- for topic_partition in member_assignment_partitions:
- partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
- data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
- user_data = data.encode()
- return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)
-
-
def assert_assignment(result_assignment, expected_assignment):
assert result_assignment == expected_assignment
assert set(result_assignment) == set(expected_assignment)