summary | refs | log | tree | commit | diff
path: root/kafka/coordinator/assignors
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@rd.io> 2015-12-28 15:16:55 -0800
committer: Dana Powers <dana.powers@rd.io> 2015-12-28 15:18:43 -0800
commit: 161d9ffcf8e879bf65c44ea55851c72ef0b80aa6 (patch)
tree: 9c7c081745f950d86649d8491fc6ccf8606e8a32 /kafka/coordinator/assignors
parent: baab076c7e70a721d958f588c4199acbaae41481 (diff)
download: kafka-python-161d9ffcf8e879bf65c44ea55851c72ef0b80aa6.tar.gz
ConsumerCoordinator (based on upstream Java client)
- Use RoundRobinPartitionAssignor by default
- Define AbstractPartitionAssignor for custom assignors
- metrics still TODO
Diffstat (limited to 'kafka/coordinator/assignors')
-rw-r--r-- kafka/coordinator/assignors/__init__.py 0
-rw-r--r-- kafka/coordinator/assignors/abstract.py 35
-rw-r--r-- kafka/coordinator/assignors/roundrobin.py 63
3 files changed, 98 insertions, 0 deletions
diff --git a/kafka/coordinator/assignors/__init__.py b/kafka/coordinator/assignors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/kafka/coordinator/assignors/__init__.py
diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py
new file mode 100644
index 0000000..ed09a6e
--- /dev/null
+++ b/kafka/coordinator/assignors/abstract.py
@@ -0,0 +1,35 @@
+import abc
+import logging
+
+log = logging.getLogger(__name__)
+
+
+class AbstractPartitionAssignor(object):
+    """Contract that partition assignor implementations are expected to meet.
+
+    Only the interface is declared here; there is no shared logic. No ABCMeta
+    metaclass is set in this block, so the abc decorators below record intent
+    but do not, by themselves, prevent instantiation.
+    """
+
+    @abc.abstractproperty
+    def name(self):
+        """String identifying the assignor."""
+
+    @abc.abstractmethod
+    def assign(self, cluster, members):
+        """Compute the group assignment from cluster metadata and subscriptions.
+
+        @param cluster: cluster metadata
+        @param members: {member_id: subscription}
+        @return {member_id: MemberAssignment}
+        """
+
+    @abc.abstractmethod
+    def metadata(self, topics):
+        """Build the ProtocolMetadata to be submitted via JoinGroupRequest."""
+
+    @abc.abstractmethod
+    def on_assignment(self, assignment):
+        """Hook that is handed an assignment; no behavior is defined here."""
+        # NOTE(review): presumably called once a member receives its assignment.
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
new file mode 100644
index 0000000..2927f3e
--- /dev/null
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -0,0 +1,63 @@
+import collections
+import itertools
+import logging
+
+import six
+
+from .abstract import AbstractPartitionAssignor
+from ...common import TopicPartition
+from ..consumer import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+
+log = logging.getLogger(__name__)
+
+
+class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
+    """Deal the sorted partitions of subscribed topics out to members in turn."""
+    name = 'roundrobin'
+    version = 0
+
+    @classmethod
+    def assign(cls, cluster, member_metadata):
+        """Assign every known partition to one member subscribed to its topic.
+
+        @param cluster: metadata object exposing partitions_for_topic(topic)
+        @param member_metadata: {member_id: metadata with a .subscription}
+        @return {member_id: ConsumerProtocolMemberAssignment}
+        """
+        all_topics = set()
+        for metadata in six.itervalues(member_metadata):
+            all_topics.update(metadata.subscription)
+
+        all_topic_partitions = []
+        for topic in all_topics:
+            partitions = cluster.partitions_for_topic(topic)
+            if partitions is None:
+                log.warning('No partition metadata for topic %s', topic)
+                continue
+            all_topic_partitions.extend(TopicPartition(topic, p) for p in partitions)
+        all_topic_partitions.sort()
+
+        # construct {member_id: {topic: [partition, ...]}}
+        assignment = collections.defaultdict(lambda: collections.defaultdict(list))
+
+        member_iter = itertools.cycle(sorted(member_metadata))
+        for partition in all_topic_partitions:
+            member_id = next(member_iter)
+            # Cannot spin forever: each topic came from some member's subscription
+            while partition.topic not in member_metadata[member_id].subscription:
+                member_id = next(member_iter)
+            assignment[member_id][partition.topic].append(partition.partition)
+
+        # Members that received nothing still get an (empty) assignment entry
+        return dict((member_id, ConsumerProtocolMemberAssignment(
+            cls.version, sorted(assignment[member_id].items()), b''))
+            for member_id in member_metadata)
+
+    @classmethod
+    def metadata(cls, topics):
+        """Return ConsumerProtocolMemberMetadata carrying the given topics."""
+        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
+
+    @classmethod
+    def on_assignment(cls, assignment):
+        """Intentionally a no-op; the assignment is ignored."""