diff options
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a393d7e..d63d052 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -4,19 +4,20 @@ import copy import collections import logging import time +import weakref import six from .base import BaseCoordinator +from .assignors.range import RangePartitionAssignor from .assignors.roundrobin import RoundRobinPartitionAssignor -from .protocol import ( - ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, - ConsumerProtocol) +from .protocol import ConsumerProtocol from ..common import OffsetAndMetadata, TopicPartition from ..future import Future from ..protocol.commit import ( OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, OffsetFetchRequest_v0, OffsetFetchRequest_v1) +from ..util import WeakMethod import kafka.common as Errors @@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator): 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': lambda offsets, response: True, - 'assignors': (RoundRobinPartitionAssignor,), + 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, @@ -54,7 +55,7 @@ class ConsumerCoordinator(BaseCoordinator): trigger custom actions when a commit request completes. assignors (list): List of objects to use to distribute partition ownership amongst consumer instances when group management is - used. Default: [RoundRobinPartitionAssignor] + used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -82,7 +83,7 @@ class ConsumerCoordinator(BaseCoordinator): self._partitions_per_topic = {} self._cluster = client.cluster self._cluster.request_update() - self._cluster.add_listener(self._handle_metadata_update) + self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) self._auto_commit_task = None if self.config['enable_auto_commit']: @@ -94,13 +95,18 @@ class ConsumerCoordinator(BaseCoordinator): log.warning('group_id is None: disabling auto-commit.') else: interval = self.config['auto_commit_interval_ms'] / 1000.0 - self._auto_commit_task = AutoCommitTask(self, interval) + self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) # metrics=None, # metric_group_prefix=None, # metric_tags=None, # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + def __del__(self): + if self._auto_commit_task: + self._auto_commit_task.disable() + self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) + def protocol_type(self): return ConsumerProtocol.PROTOCOL_TYPE |