diff options
Diffstat (limited to 'kafka/coordinator/assignors')
-rw-r--r-- | kafka/coordinator/assignors/range.py | 77 | ||||
-rw-r--r-- | kafka/coordinator/assignors/roundrobin.py | 18 |
2 files changed, 94 insertions, 1 deletions
diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py new file mode 100644 index 0000000..e4a7e33 --- /dev/null +++ b/kafka/coordinator/assignors/range.py @@ -0,0 +1,77 @@ +import collections +import logging + +import six + +from .abstract import AbstractPartitionAssignor +from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment + +log = logging.getLogger(__name__) + + +class RangePartitionAssignor(AbstractPartitionAssignor): + """ + The range assignor works on a per-topic basis. For each topic, we lay out + the available partitions in numeric order and the consumers in + lexicographic order. We then divide the number of partitions by the total + number of consumers to determine the number of partitions to assign to each + consumer. If it does not evenly divide, then the first few consumers will + have one extra partition. + + For example, suppose there are two consumers C0 and C1, two topics t0 and + t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, + t0p2, t1p0, t1p1, and t1p2. + + The assignment will be: + C0: [t0p0, t0p1, t1p0, t1p1] + C1: [t0p2, t1p2] + """ + name = 'range' + version = 0 + + @classmethod + def assign(cls, cluster, member_metadata): + consumers_per_topic = collections.defaultdict(list) + for member, metadata in six.iteritems(member_metadata): + for topic in metadata.subscription: + consumers_per_topic[topic].append(member) + + # construct {member_id: {topic: [partition, ...]}} + assignment = collections.defaultdict(dict) + + for topic, consumers_for_topic in six.iteritems(consumers_per_topic): + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + log.warning('No partition metadata for topic %s', topic) + continue + partitions = sorted(list(partitions)) + partitions_for_topic = len(partitions) + consumers_for_topic.sort() + + partitions_per_consumer = len(partitions) // len(consumers_for_topic) + consumers_with_extra = len(partitions) % len(consumers_for_topic) + + for i in range(len(consumers_for_topic)): + start = partitions_per_consumer * i + start += min(i, consumers_with_extra) + length = partitions_per_consumer + if not i + 1 > consumers_with_extra: + length += 1 + member = consumers_for_topic[i] + assignment[member][topic] = partitions[start:start+length] + + protocol_assignment = {} + for member_id in member_metadata: + protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( + cls.version, + sorted(assignment[member_id].items()), + b'') + return protocol_assignment + + @classmethod + def metadata(cls, topics): + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + + @classmethod + def on_assignment(cls, assignment): + pass diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index d7cd884..3fd3fd6 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -12,6 +12,22 @@ log = logging.getLogger(__name__) class RoundRobinPartitionAssignor(AbstractPartitionAssignor): + """ + The roundrobin assignor lays out all the available partitions and all the + available consumers. It then proceeds to do a roundrobin assignment from + partition to consumer. If the subscriptions of all consumer instances are + identical, then the partitions will be uniformly distributed. (i.e., the + partition ownership counts will be within a delta of exactly one across all + consumers.) + + For example, suppose there are two consumers C0 and C1, two topics t0 and + t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, + t0p2, t1p0, t1p1, and t1p2. + + The assignment will be: + C0: [t0p0, t0p2, t1p1] + C1: [t0p1, t1p0, t1p2] + """ name = 'roundrobin' version = 0 @@ -50,7 +66,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor): for member_id in member_metadata: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( cls.version, - assignment[member_id].items(), + sorted(assignment[member_id].items()), b'') return protocol_assignment |