diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-28 15:16:55 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-28 15:18:43 -0800 |
commit | 161d9ffcf8e879bf65c44ea55851c72ef0b80aa6 (patch) | |
tree | 9c7c081745f950d86649d8491fc6ccf8606e8a32 /kafka/coordinator/assignors | |
parent | baab076c7e70a721d958f588c4199acbaae41481 (diff) | |
download | kafka-python-161d9ffcf8e879bf65c44ea55851c72ef0b80aa6.tar.gz |
ConsumerCoordinator (based on upstream Java client)
- Use RoundRobinPartitionAssignor by default
- Define AbstractPartitionAssignor for custom assignors
- metrics still TODO
Diffstat (limited to 'kafka/coordinator/assignors')
-rw-r--r-- | kafka/coordinator/assignors/__init__.py | 0 | ||||
-rw-r--r-- | kafka/coordinator/assignors/abstract.py | 35 | ||||
-rw-r--r-- | kafka/coordinator/assignors/roundrobin.py | 63 |
3 files changed, 98 insertions, 0 deletions
diff --git a/kafka/coordinator/assignors/__init__.py b/kafka/coordinator/assignors/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/kafka/coordinator/assignors/__init__.py diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py new file mode 100644 index 0000000..ed09a6e --- /dev/null +++ b/kafka/coordinator/assignors/abstract.py @@ -0,0 +1,35 @@ +import abc +import logging + +log = logging.getLogger(__name__) + + +class AbstractPartitionAssignor(object): + """ + Abstract assignor implementation which does some common grunt work (in particular collecting + partition counts which are always needed in assignors). + """ + + @abc.abstractproperty + def name(self): + """.name should be a string identifying the assignor""" + pass + + @abc.abstractmethod + def assign(self, cluster, members): + """Perform group assignment given cluster metadata and member subscriptions + + @param cluster: cluster metadata + @param members: {member_id: subscription} + @return {member_id: MemberAssignment} + """ + pass + + @abc.abstractmethod + def metadata(self, topics): + """return ProtocolMetadata to be submitted via JoinGroupRequest""" + pass + + @abc.abstractmethod + def on_assignment(self, assignment): + pass diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py new file mode 100644 index 0000000..2927f3e --- /dev/null +++ b/kafka/coordinator/assignors/roundrobin.py @@ -0,0 +1,63 @@ +import collections +import itertools +import logging + +import six + +from .abstract import AbstractPartitionAssignor +from ...common import TopicPartition +from ..consumer import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment + +log = logging.getLogger(__name__) + + +class RoundRobinPartitionAssignor(AbstractPartitionAssignor): + name = 'roundrobin' + version = 0 + + @classmethod + def assign(cls, cluster, member_metadata): + all_topics = set() + for metadata in six.itervalues(member_metadata): + all_topics.update(metadata.subscription) + + all_topic_partitions = [] + for topic in all_topics: + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + log.warning('No partition metadata for topic %s', topic) + continue + for partition in partitions: + all_topic_partitions.append(TopicPartition(topic, partition)) + all_topic_partitions.sort() + + # construct {member_id: {topic: [partition, ...]}} + assignment = collections.defaultdict(lambda: collections.defaultdict(list)) + + member_iter = itertools.cycle(sorted(member_metadata.keys())) + for partition in all_topic_partitions: + member_id = member_iter.next() + + # Because we constructed all_topic_partitions from the set of + # member subscribed topics, we should be safe assuming that + # each topic in all_topic_partitions is in at least one member + # subscription; otherwise this could yield an infinite loop + while partition.topic not in member_metadata[member_id].subscription: + member_id = member_iter.next() + assignment[member_id][partition.topic].append(partition.partition) + + protocol_assignment = {} + for member_id in member_metadata: + protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( + cls.version, + assignment[member_id].items(), + b'') + return protocol_assignment + + @classmethod + def metadata(cls, topics): + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + + @classmethod + def on_assignment(cls, assignment): + pass |