summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py58
1 files changed, 31 insertions, 27 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 211d1d0..dd3eea0 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -1,3 +1,4 @@
+import copy
import collections
import logging
import time
@@ -45,34 +46,36 @@ class ConsumerProtocol(object):
class ConsumerCoordinator(AbstractCoordinator):
"""This class manages the coordination process with the consumer coordinator."""
- _enable_auto_commit = True
- _auto_commit_interval_ms = 5000
- _default_offset_commit_callback = lambda offsets, error: True
- _assignors = ()
- #_heartbeat_interval_ms = 3000
- #_session_timeout_ms = 30000
- #_retry_backoff_ms = 100
-
- def __init__(self, client, group_id, subscription, **kwargs):
+ DEFAULT_CONFIG = {
+ 'enable_auto_commit': True,
+ 'auto_commit_interval_ms': 5000,
+ 'default_offset_commit_callback': lambda offsets, error: True,
+ 'assignors': (),
+ 'session_timeout_ms': 30000,
+ 'heartbeat_interval_ms': 3000,
+ 'retry_backoff_ms': 100,
+ }
+
+ def __init__(self, client, group_id, subscription, **configs):
"""Initialize the coordination manager."""
- super(ConsumerCoordinator, self).__init__(client, group_id, **kwargs)
- for config in ('enable_auto_commit', 'auto_commit_interval_ms',
- 'default_offset_commit_callback', 'assignors'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+ super(ConsumerCoordinator, self).__init__(client, group_id, **configs)
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self._cluster = client.cluster
self._subscription = subscription
self._partitions_per_topic = {}
self._auto_commit_task = None
- if not self._assignors:
+ if not self.config['assignors']:
raise Errors.IllegalStateError('Coordinator requires assignors')
self._cluster.request_update()
self._cluster.add_listener(self._handle_metadata_update)
- if self._enable_auto_commit:
- interval = self._auto_commit_interval_ms / 1000.0
+ if self.config['enable_auto_commit']:
+ interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(self, interval)
# metrics=None,
@@ -87,7 +90,7 @@ class ConsumerCoordinator(AbstractCoordinator):
"""Returns list of preferred (protocols, metadata)"""
topics = self._subscription.subscription
metadata_list = []
- for assignor in self._assignors:
+ for assignor in self.config['assignors']:
metadata = assignor.metadata(topics)
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
@@ -126,7 +129,7 @@ class ConsumerCoordinator(AbstractCoordinator):
return False
def _lookup_assignor(self, name):
- for assignor in self._assignors:
+ for assignor in self.config['assignors']:
if assignor.name == name:
return assignor
return None
@@ -152,7 +155,7 @@ class ConsumerCoordinator(AbstractCoordinator):
assignor.on_assignment(assignment)
# restart the autocommit task if needed
- if self._enable_auto_commit:
+ if self.config['enable_auto_commit']:
self._auto_commit_task.enable()
assigned = set(self._subscription.assigned_partitions())
@@ -258,7 +261,7 @@ class ConsumerCoordinator(AbstractCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self._retry_backoff_ms / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def ensure_partition_assignment(self):
"""Ensure that we have a valid partition assignment from the coordinator."""
@@ -283,10 +286,11 @@ class ConsumerCoordinator(AbstractCoordinator):
Returns:
Future: indicating whether the commit was successful or not
"""
+ if callback is None:
+ callback = self.config['default_offset_commit_callback']
self._subscription.needs_fetch_committed_offsets = True
future = self._send_offset_commit_request(offsets)
- cb = callback if callback else self._default_offset_commit_callback
- future.add_both(cb, offsets)
+ future.add_both(callback, offsets)
def commit_offsets_sync(self, offsets):
"""Commit specific offsets synchronously.
@@ -314,10 +318,10 @@ class ConsumerCoordinator(AbstractCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self._retry_backoff_ms / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def _maybe_auto_commit_offsets_sync(self):
- if self._enable_auto_commit:
+ if self.config['enable_auto_commit']:
# disable periodic commits prior to committing synchronously. note that they will
# be re-enabled after a rebalance completes
self._auto_commit_task.disable()
@@ -558,8 +562,8 @@ class AutoCommitTask(object):
if self._coordinator.coordinator_unknown():
log.debug("Cannot auto-commit offsets because the coordinator is"
" unknown, will retry after backoff")
- next_at = time.time() + self._coordinator._retry_backoff_ms / 1000.0
- self._client.schedule(self, next_at)
+ backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
+ self._client.schedule(self, time.time() + backoff)
return
self._request_in_flight = True