summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py20
1 files changed, 13 insertions, 7 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a393d7e..d63d052 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -4,19 +4,20 @@ import copy
import collections
import logging
import time
+import weakref
import six
from .base import BaseCoordinator
+from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
-from .protocol import (
- ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
- ConsumerProtocol)
+from .protocol import ConsumerProtocol
from ..common import OffsetAndMetadata, TopicPartition
from ..future import Future
from ..protocol.commit import (
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..util import WeakMethod
import kafka.common as Errors
@@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator):
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': lambda offsets, response: True,
- 'assignors': (RoundRobinPartitionAssignor,),
+ 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
@@ -54,7 +55,7 @@ class ConsumerCoordinator(BaseCoordinator):
trigger custom actions when a commit request completes.
assignors (list): List of objects to use to distribute partition
ownership amongst consumer instances when group management is
- used. Default: [RoundRobinPartitionAssignor]
+ used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
@@ -82,7 +83,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._partitions_per_topic = {}
self._cluster = client.cluster
self._cluster.request_update()
- self._cluster.add_listener(self._handle_metadata_update)
+ self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
self._auto_commit_task = None
if self.config['enable_auto_commit']:
@@ -94,13 +95,18 @@ class ConsumerCoordinator(BaseCoordinator):
log.warning('group_id is None: disabling auto-commit.')
else:
interval = self.config['auto_commit_interval_ms'] / 1000.0
- self._auto_commit_task = AutoCommitTask(self, interval)
+ self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
# metrics=None,
# metric_group_prefix=None,
# metric_tags=None,
# self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ def __del__(self):
+ if self._auto_commit_task:
+ self._auto_commit_task.disable()
+ self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+
def protocol_type(self):
return ConsumerProtocol.PROTOCOL_TYPE