summaryrefslogtreecommitdiff
path: root/kafka/coordinator
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-16 12:35:28 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-16 12:41:00 -0800
commitc8be93b44bb0939dd512a72be578d42a4d7426b7 (patch)
tree4ae6f29b5934995fc4d678d6461fd347eb17285d /kafka/coordinator
parentd5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff)
downloadkafka-python-range_assignor.tar.gz
Add RangePartitionAssignor (and use as default); add assignor testsrange_assignor
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--kafka/coordinator/assignors/range.py77
-rw-r--r--kafka/coordinator/assignors/roundrobin.py18
-rw-r--r--kafka/coordinator/consumer.py5
-rw-r--r--kafka/coordinator/protocol.py2
4 files changed, 98 insertions, 4 deletions
diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py
new file mode 100644
index 0000000..e4a7e33
--- /dev/null
+++ b/kafka/coordinator/assignors/range.py
@@ -0,0 +1,77 @@
+import collections
+import logging
+
+import six
+
+from .abstract import AbstractPartitionAssignor
+from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+
+log = logging.getLogger(__name__)
+
+
+class RangePartitionAssignor(AbstractPartitionAssignor):
+ """
+ The range assignor works on a per-topic basis. For each topic, we lay out
+ the available partitions in numeric order and the consumers in
+ lexicographic order. We then divide the number of partitions by the total
+ number of consumers to determine the number of partitions to assign to each
+ consumer. If it does not evenly divide, then the first few consumers will
+ have one extra partition.
+
+ For example, suppose there are two consumers C0 and C1, two topics t0 and
+ t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
+ t0p2, t1p0, t1p1, and t1p2.
+
+ The assignment will be:
+ C0: [t0p0, t0p1, t1p0, t1p1]
+ C1: [t0p2, t1p2]
+ """
+ name = 'range'
+ version = 0
+
+ @classmethod
+ def assign(cls, cluster, member_metadata):
+ consumers_per_topic = collections.defaultdict(list)
+ for member, metadata in six.iteritems(member_metadata):
+ for topic in metadata.subscription:
+ consumers_per_topic[topic].append(member)
+
+ # construct {member_id: {topic: [partition, ...]}}
+ assignment = collections.defaultdict(dict)
+
+ for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
+ partitions = cluster.partitions_for_topic(topic)
+ if partitions is None:
+ log.warning('No partition metadata for topic %s', topic)
+ continue
+ partitions = sorted(list(partitions))
+ partitions_for_topic = len(partitions)
+ consumers_for_topic.sort()
+
+ partitions_per_consumer = len(partitions) // len(consumers_for_topic)
+ consumers_with_extra = len(partitions) % len(consumers_for_topic)
+
+ for i in range(len(consumers_for_topic)):
+ start = partitions_per_consumer * i
+ start += min(i, consumers_with_extra)
+ length = partitions_per_consumer
+ if not i + 1 > consumers_with_extra:
+ length += 1
+ member = consumers_for_topic[i]
+ assignment[member][topic] = partitions[start:start+length]
+
+ protocol_assignment = {}
+ for member_id in member_metadata:
+ protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
+ cls.version,
+ sorted(assignment[member_id].items()),
+ b'')
+ return protocol_assignment
+
+ @classmethod
+ def metadata(cls, topics):
+ return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
+
+ @classmethod
+ def on_assignment(cls, assignment):
+ pass
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
index d7cd884..3fd3fd6 100644
--- a/kafka/coordinator/assignors/roundrobin.py
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -12,6 +12,22 @@ log = logging.getLogger(__name__)
class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
+ """
+ The roundrobin assignor lays out all the available partitions and all the
+ available consumers. It then proceeds to do a roundrobin assignment from
+ partition to consumer. If the subscriptions of all consumer instances are
+ identical, then the partitions will be uniformly distributed. (i.e., the
+ partition ownership counts will be within a delta of exactly one across all
+ consumers.)
+
+ For example, suppose there are two consumers C0 and C1, two topics t0 and
+ t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
+ t0p2, t1p0, t1p1, and t1p2.
+
+ The assignment will be:
+ C0: [t0p0, t0p2, t1p1]
+ C1: [t0p1, t1p0, t1p2]
+ """
name = 'roundrobin'
version = 0
@@ -50,7 +66,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
for member_id in member_metadata:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
cls.version,
- assignment[member_id].items(),
+ sorted(assignment[member_id].items()),
b'')
return protocol_assignment
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a393d7e..515377a 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -8,6 +8,7 @@ import time
import six
from .base import BaseCoordinator
+from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
@@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator):
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, response: True,
- 'assignors': (RoundRobinPartitionAssignor,),
+ 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
@@ -54,7 +55,7 @@ class ConsumerCoordinator(BaseCoordinator):
trigger custom actions when a commit request completes.
assignors (list): List of objects to use to distribute partition
ownership amongst consumer instances when group management is
- used. Default: [RoundRobinPartitionAssignor]
+ used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
index 9af7225..9e37397 100644
--- a/kafka/coordinator/protocol.py
+++ b/kafka/coordinator/protocol.py
@@ -28,6 +28,6 @@ class ConsumerProtocolMemberAssignment(Struct):
class ConsumerProtocol(object):
PROTOCOL_TYPE = 'consumer'
- ASSIGNMENT_STRATEGIES = ('roundrobin',)
+ ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
METADATA = ConsumerProtocolMemberMetadata
ASSIGNMENT = ConsumerProtocolMemberAssignment