diff options
-rw-r--r-- | kafka/client_async.py | 4 | ||||
-rw-r--r-- | kafka/consumer/group.py | 6 | ||||
-rw-r--r-- | kafka/coordinator/assignors/range.py | 77 | ||||
-rw-r--r-- | kafka/coordinator/assignors/roundrobin.py | 18 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 12 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 20 | ||||
-rw-r--r-- | kafka/coordinator/protocol.py | 2 | ||||
-rw-r--r-- | kafka/util.py | 37 | ||||
-rw-r--r-- | test/test_assignors.py | 58 | ||||
-rw-r--r-- | test/test_coordinator.py | 21 |
10 files changed, 235 insertions, 20 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cb8152a..973ece0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -99,6 +99,10 @@ class KafkaClient(object): self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._wake_r, self._wake_w = os.pipe() + def __del__(self): + os.close(self._wake_r) + os.close(self._wake_w) + def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 30abe00..cf77df3 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -10,6 +10,7 @@ from kafka.client_async import KafkaClient from kafka.consumer.fetcher import Fetcher from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator +from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.protocol.offset import OffsetResetStrategy from kafka.version import __version__ @@ -98,7 +99,8 @@ class KafkaConsumer(six.Iterator): brokers or partitions. Default: 300000 partition_assignment_strategy (list): List of objects to use to distribute partition ownership amongst consumer instances when - group management is used. Default: [RoundRobinPartitionAssignor] + group management is used. + Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -148,7 +150,7 @@ class KafkaConsumer(six.Iterator): 'auto_commit_interval_ms': 5000, 'check_crcs': True, 'metadata_max_age_ms': 5 * 60 * 1000, - 'partition_assignment_strategy': (RoundRobinPartitionAssignor,), + 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, 'send_buffer_bytes': None, diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py new file mode 100644 index 0000000..e4a7e33 --- /dev/null +++ b/kafka/coordinator/assignors/range.py @@ -0,0 +1,77 @@ +import collections +import logging + +import six + +from .abstract import AbstractPartitionAssignor +from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment + +log = logging.getLogger(__name__) + + +class RangePartitionAssignor(AbstractPartitionAssignor): + """ + The range assignor works on a per-topic basis. For each topic, we lay out + the available partitions in numeric order and the consumers in + lexicographic order. We then divide the number of partitions by the total + number of consumers to determine the number of partitions to assign to each + consumer. If it does not evenly divide, then the first few consumers will + have one extra partition. + + For example, suppose there are two consumers C0 and C1, two topics t0 and + t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, + t0p2, t1p0, t1p1, and t1p2. + + The assignment will be: + C0: [t0p0, t0p1, t1p0, t1p1] + C1: [t0p2, t1p2] + """ + name = 'range' + version = 0 + + @classmethod + def assign(cls, cluster, member_metadata): + consumers_per_topic = collections.defaultdict(list) + for member, metadata in six.iteritems(member_metadata): + for topic in metadata.subscription: + consumers_per_topic[topic].append(member) + + # construct {member_id: {topic: [partition, ...]}} + assignment = collections.defaultdict(dict) + + for topic, consumers_for_topic in six.iteritems(consumers_per_topic): + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + log.warning('No partition metadata for topic %s', topic) + continue + partitions = sorted(list(partitions)) + partitions_for_topic = len(partitions) + consumers_for_topic.sort() + + partitions_per_consumer = len(partitions) // len(consumers_for_topic) + consumers_with_extra = len(partitions) % len(consumers_for_topic) + + for i in range(len(consumers_for_topic)): + start = partitions_per_consumer * i + start += min(i, consumers_with_extra) + length = partitions_per_consumer + if not i + 1 > consumers_with_extra: + length += 1 + member = consumers_for_topic[i] + assignment[member][topic] = partitions[start:start+length] + + protocol_assignment = {} + for member_id in member_metadata: + protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( + cls.version, + sorted(assignment[member_id].items()), + b'') + return protocol_assignment + + @classmethod + def metadata(cls, topics): + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + + @classmethod + def on_assignment(cls, assignment): + pass diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index d7cd884..3fd3fd6 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -12,6 +12,22 @@ log = logging.getLogger(__name__) class RoundRobinPartitionAssignor(AbstractPartitionAssignor): + """ + The roundrobin assignor lays out all the available partitions and all the + available consumers. It then proceeds to do a roundrobin assignment from + partition to consumer. If the subscriptions of all consumer instances are + identical, then the partitions will be uniformly distributed. (i.e., the + partition ownership counts will be within a delta of exactly one across all + consumers.) + + For example, suppose there are two consumers C0 and C1, two topics t0 and + t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, + t0p2, t1p0, t1p1, and t1p2. + + The assignment will be: + C0: [t0p0, t0p2, t1p1] + C1: [t0p1, t1p0, t1p2] + """ name = 'roundrobin' version = 0 @@ -50,7 +66,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor): for member_id in member_metadata: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( cls.version, - assignment[member_id].items(), + sorted(assignment[member_id].items()), b'') return protocol_assignment diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 6efdfd0..c49c38b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -2,6 +2,7 @@ import abc import copy import logging import time +import weakref import six @@ -85,9 +86,12 @@ class BaseCoordinator(object): self.rejoin_needed = True self.needs_join_prepare = True self.heartbeat = Heartbeat(**self.config) - self.heartbeat_task = HeartbeatTask(self) + self.heartbeat_task = HeartbeatTask(weakref.proxy(self)) #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + def __del__(self): + self.heartbeat_task.disable() + @abc.abstractmethod def protocol_type(self): """ @@ -572,6 +576,12 @@ class HeartbeatTask(object): self._client = coordinator._client self._request_in_flight = False + def disable(self): + try: + self._client.unschedule(self) + except KeyError: + pass + def reset(self): # start or restart the heartbeat task to be executed at the next chance self._heartbeat.reset_session_timeout() diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a393d7e..d63d052 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -4,19 +4,20 @@ import copy import collections import logging import time +import weakref import six from .base import BaseCoordinator +from .assignors.range import RangePartitionAssignor from .assignors.roundrobin import RoundRobinPartitionAssignor -from .protocol import ( - ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, - ConsumerProtocol) +from .protocol import ConsumerProtocol from ..common import OffsetAndMetadata, TopicPartition from ..future import Future from ..protocol.commit import ( OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, OffsetFetchRequest_v0, OffsetFetchRequest_v1) +from ..util import WeakMethod import kafka.common as Errors @@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator): 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': lambda offsets, response: True, - 'assignors': (RoundRobinPartitionAssignor,), + 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, @@ -54,7 +55,7 @@ class ConsumerCoordinator(BaseCoordinator): trigger custom actions when a commit request completes. assignors (list): List of objects to use to distribute partition ownership amongst consumer instances when group management is - used. Default: [RoundRobinPartitionAssignor] + used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -82,7 +83,7 @@ class ConsumerCoordinator(BaseCoordinator): self._partitions_per_topic = {} self._cluster = client.cluster self._cluster.request_update() - self._cluster.add_listener(self._handle_metadata_update) + self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) self._auto_commit_task = None if self.config['enable_auto_commit']: @@ -94,13 +95,18 @@ class ConsumerCoordinator(BaseCoordinator): log.warning('group_id is None: disabling auto-commit.') else: interval = self.config['auto_commit_interval_ms'] / 1000.0 - self._auto_commit_task = AutoCommitTask(self, interval) + self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) # metrics=None, # metric_group_prefix=None, # metric_tags=None, # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + def __del__(self): + if self._auto_commit_task: + self._auto_commit_task.disable() + self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) + def protocol_type(self): return ConsumerProtocol.PROTOCOL_TYPE diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py index 9af7225..9e37397 100644 --- a/kafka/coordinator/protocol.py +++ b/kafka/coordinator/protocol.py @@ -28,6 +28,6 @@ class ConsumerProtocolMemberAssignment(Struct): class ConsumerProtocol(object): PROTOCOL_TYPE = 'consumer' - ASSIGNMENT_STRATEGIES = ('roundrobin',) + ASSIGNMENT_STRATEGIES = ('range', 'roundrobin') METADATA = ConsumerProtocolMemberMetadata ASSIGNMENT = ConsumerProtocolMemberAssignment diff --git a/kafka/util.py b/kafka/util.py index c6e77fa..7a11910 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -3,6 +3,7 @@ import collections import struct import sys from threading import Thread, Event +import weakref import six @@ -151,3 +152,39 @@ class ReentrantTimer(object): def __del__(self): self.stop() + + +class WeakMethod(object): + """ + Callable that weakly references a method and the object it is bound to. It + is based on http://stackoverflow.com/a/24287465. + + Arguments: + + object_dot_method: A bound instance method (i.e. 'object.method'). + """ + def __init__(self, object_dot_method): + try: + self.target = weakref.ref(object_dot_method.__self__) + except AttributeError: + self.target = weakref.ref(object_dot_method.im_self) + self._target_id = id(self.target()) + try: + self.method = weakref.ref(object_dot_method.__func__) + except AttributeError: + self.method = weakref.ref(object_dot_method.im_func) + self._method_id = id(self.method()) + + def __call__(self, *args, **kwargs): + """ + Calls the method on target with args and kwargs. + """ + return self.method()(self.target(), *args, **kwargs) + + def __hash__(self): + return hash(self.target) ^ hash(self.method) + + def __eq__(self, other): + if not isinstance(other, WeakMethod): + return False + return self._target_id == other._target_id and self._method_id == other._method_id diff --git a/test/test_assignors.py b/test/test_assignors.py new file mode 100644 index 0000000..e2a1d4f --- /dev/null +++ b/test/test_assignors.py @@ -0,0 +1,58 @@ +# pylint: skip-file +from __future__ import absolute_import + +import pytest + +from kafka.coordinator.assignors.range import RangePartitionAssignor +from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) + + +@pytest.fixture +def cluster(mocker): + cluster = mocker.MagicMock() + cluster.partitions_for_topic.return_value = set([0, 1, 2]) + return cluster + + +def test_assignor_roundrobin(cluster): + assignor = RoundRobinPartitionAssignor + + member_metadata = { + 'C0': assignor.metadata(set(['t0', 't1'])), + 'C1': assignor.metadata(set(['t0', 't1'])), + } + + ret = assignor.assign(cluster, member_metadata) + expected = { + 'C0': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [0, 2]), ('t1', [1])], b''), + 'C1': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [1]), ('t1', [0, 2])], b'') + } + assert ret == expected + assert set(ret) == set(expected) + for member in ret: + assert ret[member].encode() == expected[member].encode() + + +def test_assignor_range(cluster): + assignor = RangePartitionAssignor + + member_metadata = { + 'C0': assignor.metadata(set(['t0', 't1'])), + 'C1': assignor.metadata(set(['t0', 't1'])), + } + + ret = assignor.assign(cluster, member_metadata) + expected = { + 'C0': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''), + 'C1': ConsumerProtocolMemberAssignment( + assignor.version, [('t0', [2]), ('t1', [2])], b'') + } + assert ret == expected + assert set(ret) == set(expected) + for member in ret: + assert ret[member].encode() == expected[member].encode() diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 80d2de2..e0906c7 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -7,6 +7,7 @@ from kafka.client_async import KafkaClient from kafka.common import TopicPartition, OffsetAndMetadata from kafka.consumer.subscription_state import ( SubscriptionState, ConsumerRebalanceListener) +from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.protocol import ( @@ -18,6 +19,7 @@ from kafka.protocol.commit import ( OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1, OffsetFetchResponse) from kafka.protocol.metadata import MetadataResponse +from kafka.util import WeakMethod import kafka.common as Errors @@ -45,7 +47,7 @@ def test_init(conn): # metadata update on init assert cli.cluster._need_update is True - assert coordinator._handle_metadata_update in cli.cluster._listeners + assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) @@ -72,13 +74,16 @@ def test_group_protocols(coordinator): assert False, 'Exception not raised when expected' coordinator._subscription.subscribe(topics=['foobar']) - assert coordinator.group_protocols() == [( - 'roundrobin', - ConsumerProtocolMemberMetadata( + assert coordinator.group_protocols() == [ + ('range', ConsumerProtocolMemberMetadata( + RangePartitionAssignor.version, + ['foobar'], + b'')), + ('roundrobin', ConsumerProtocolMemberMetadata( RoundRobinPartitionAssignor.version, ['foobar'], - b'') - )] + b'')), + ] @pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)]) @@ -113,8 +118,8 @@ def test_pattern_subscription(coordinator, api_version): def test_lookup_assignor(coordinator): - assignor = coordinator._lookup_assignor('roundrobin') - assert assignor is RoundRobinPartitionAssignor + assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor + assert coordinator._lookup_assignor('range') is RangePartitionAssignor assert coordinator._lookup_assignor('foobar') is None |