diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-16 12:35:28 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-16 12:41:00 -0800 |
commit | c8be93b44bb0939dd512a72be578d42a4d7426b7 (patch) | |
tree | 4ae6f29b5934995fc4d678d6461fd347eb17285d /kafka/coordinator | |
parent | d5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff) | |
download | kafka-python-range_assignor.tar.gz |
Add RangePartitionAssignor (and use as default); add assignor tests [range_assignor]
Diffstat (limited to 'kafka/coordinator')
-rw-r--r-- | kafka/coordinator/assignors/range.py | 77 | ||||
-rw-r--r-- | kafka/coordinator/assignors/roundrobin.py | 18 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 5 | ||||
-rw-r--r-- | kafka/coordinator/protocol.py | 2 |
4 files changed, 98 insertions, 4 deletions
diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py new file mode 100644 index 0000000..e4a7e33 --- /dev/null +++ b/kafka/coordinator/assignors/range.py @@ -0,0 +1,77 @@ +import collections +import logging + +import six + +from .abstract import AbstractPartitionAssignor +from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment + +log = logging.getLogger(__name__) + + +class RangePartitionAssignor(AbstractPartitionAssignor): + """ + The range assignor works on a per-topic basis. For each topic, we lay out + the available partitions in numeric order and the consumers in + lexicographic order. We then divide the number of partitions by the total + number of consumers to determine the number of partitions to assign to each + consumer. If it does not evenly divide, then the first few consumers will + have one extra partition. + + For example, suppose there are two consumers C0 and C1, two topics t0 and + t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, + t0p2, t1p0, t1p1, and t1p2. 
+ + The assignment will be: + C0: [t0p0, t0p1, t1p0, t1p1] + C1: [t0p2, t1p2] + """ + name = 'range' + version = 0 + + @classmethod + def assign(cls, cluster, member_metadata): + consumers_per_topic = collections.defaultdict(list) + for member, metadata in six.iteritems(member_metadata): + for topic in metadata.subscription: + consumers_per_topic[topic].append(member) + + # construct {member_id: {topic: [partition, ...]}} + assignment = collections.defaultdict(dict) + + for topic, consumers_for_topic in six.iteritems(consumers_per_topic): + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + log.warning('No partition metadata for topic %s', topic) + continue + partitions = sorted(list(partitions)) + partitions_for_topic = len(partitions) + consumers_for_topic.sort() + + partitions_per_consumer = len(partitions) // len(consumers_for_topic) + consumers_with_extra = len(partitions) % len(consumers_for_topic) + + for i in range(len(consumers_for_topic)): + start = partitions_per_consumer * i + start += min(i, consumers_with_extra) + length = partitions_per_consumer + if not i + 1 > consumers_with_extra: + length += 1 + member = consumers_for_topic[i] + assignment[member][topic] = partitions[start:start+length] + + protocol_assignment = {} + for member_id in member_metadata: + protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( + cls.version, + sorted(assignment[member_id].items()), + b'') + return protocol_assignment + + @classmethod + def metadata(cls, topics): + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + + @classmethod + def on_assignment(cls, assignment): + pass diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index d7cd884..3fd3fd6 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -12,6 +12,22 @@ log = logging.getLogger(__name__) class RoundRobinPartitionAssignor(AbstractPartitionAssignor): + 
""" + The roundrobin assignor lays out all the available partitions and all the + available consumers. It then proceeds to do a roundrobin assignment from + partition to consumer. If the subscriptions of all consumer instances are + identical, then the partitions will be uniformly distributed. (i.e., the + partition ownership counts will be within a delta of exactly one across all + consumers.) + + For example, suppose there are two consumers C0 and C1, two topics t0 and + t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, + t0p2, t1p0, t1p1, and t1p2. + + The assignment will be: + C0: [t0p0, t0p2, t1p1] + C1: [t0p1, t1p0, t1p2] + """ name = 'roundrobin' version = 0 @@ -50,7 +66,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor): for member_id in member_metadata: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( cls.version, - assignment[member_id].items(), + sorted(assignment[member_id].items()), b'') return protocol_assignment diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a393d7e..515377a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -8,6 +8,7 @@ import time import six from .base import BaseCoordinator +from .assignors.range import RangePartitionAssignor from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ( ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, @@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator): 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': lambda offsets, response: True, - 'assignors': (RoundRobinPartitionAssignor,), + 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, @@ -54,7 +55,7 @@ class ConsumerCoordinator(BaseCoordinator): trigger custom actions when a commit request completes. 
assignors (list): List of objects to use to distribute partition ownership amongst consumer instances when group management is - used. Default: [RoundRobinPartitionAssignor] + used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py index 9af7225..9e37397 100644 --- a/kafka/coordinator/protocol.py +++ b/kafka/coordinator/protocol.py @@ -28,6 +28,6 @@ class ConsumerProtocolMemberAssignment(Struct): class ConsumerProtocol(object): PROTOCOL_TYPE = 'consumer' - ASSIGNMENT_STRATEGIES = ('roundrobin',) + ASSIGNMENT_STRATEGIES = ('range', 'roundrobin') METADATA = ConsumerProtocolMemberMetadata ASSIGNMENT = ConsumerProtocolMemberAssignment |