diff options
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 211d1d0..dd3eea0 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -1,3 +1,4 @@ +import copy import collections import logging import time @@ -45,34 +46,36 @@ class ConsumerProtocol(object): class ConsumerCoordinator(AbstractCoordinator): """This class manages the coordination process with the consumer coordinator.""" - _enable_auto_commit = True - _auto_commit_interval_ms = 5000 - _default_offset_commit_callback = lambda offsets, error: True - _assignors = () - #_heartbeat_interval_ms = 3000 - #_session_timeout_ms = 30000 - #_retry_backoff_ms = 100 - - def __init__(self, client, group_id, subscription, **kwargs): + DEFAULT_CONFIG = { + 'enable_auto_commit': True, + 'auto_commit_interval_ms': 5000, + 'default_offset_commit_callback': lambda offsets, error: True, + 'assignors': (), + 'session_timeout_ms': 30000, + 'heartbeat_interval_ms': 3000, + 'retry_backoff_ms': 100, + } + + def __init__(self, client, group_id, subscription, **configs): """Initialize the coordination manager.""" - super(ConsumerCoordinator, self).__init__(client, group_id, **kwargs) - for config in ('enable_auto_commit', 'auto_commit_interval_ms', - 'default_offset_commit_callback', 'assignors'): - if config in kwargs: - setattr(self, '_' + config, kwargs.pop(config)) + super(ConsumerCoordinator, self).__init__(client, group_id, **configs) + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] self._cluster = client.cluster self._subscription = subscription self._partitions_per_topic = {} self._auto_commit_task = None - if not self._assignors: + if not self.config['assignors']: raise Errors.IllegalStateError('Coordinator requires assignors') self._cluster.request_update() self._cluster.add_listener(self._handle_metadata_update) - if self._enable_auto_commit: - interval = self._auto_commit_interval_ms / 1000.0 + if self.config['enable_auto_commit']: + interval = self.config['auto_commit_interval_ms'] / 1000.0 self._auto_commit_task = AutoCommitTask(self, interval) # metrics=None, @@ -87,7 +90,7 @@ class ConsumerCoordinator(AbstractCoordinator): """Returns list of preferred (protocols, metadata)""" topics = self._subscription.subscription metadata_list = [] - for assignor in self._assignors: + for assignor in self.config['assignors']: metadata = assignor.metadata(topics) group_protocol = (assignor.name, metadata) metadata_list.append(group_protocol) @@ -126,7 +129,7 @@ class ConsumerCoordinator(AbstractCoordinator): return False def _lookup_assignor(self, name): - for assignor in self._assignors: + for assignor in self.config['assignors']: if assignor.name == name: return assignor return None @@ -152,7 +155,7 @@ class ConsumerCoordinator(AbstractCoordinator): assignor.on_assignment(assignment) # restart the autocommit task if needed - if self._enable_auto_commit: + if self.config['enable_auto_commit']: self._auto_commit_task.enable() assigned = set(self._subscription.assigned_partitions()) @@ -258,7 +261,7 @@ class ConsumerCoordinator(AbstractCoordinator): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self._retry_backoff_ms / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000.0) def ensure_partition_assignment(self): """Ensure that we have a valid partition assignment from the coordinator.""" @@ -283,10 +286,11 @@ class ConsumerCoordinator(AbstractCoordinator): Returns: Future: indicating whether the commit was successful or not """ + if callback is None: + callback = self.config['default_offset_commit_callback'] self._subscription.needs_fetch_committed_offsets = True future = self._send_offset_commit_request(offsets) - cb = callback if callback else self._default_offset_commit_callback - future.add_both(cb, offsets) + future.add_both(callback, offsets) def commit_offsets_sync(self, offsets): """Commit specific offsets synchronously. @@ -314,10 +318,10 @@ class ConsumerCoordinator(AbstractCoordinator): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self._retry_backoff_ms / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000.0) def _maybe_auto_commit_offsets_sync(self): - if self._enable_auto_commit: + if self.config['enable_auto_commit']: # disable periodic commits prior to committing synchronously. note that they will # be re-enabled after a rebalance completes self._auto_commit_task.disable() @@ -558,8 +562,8 @@ class AutoCommitTask(object): if self._coordinator.coordinator_unknown(): log.debug("Cannot auto-commit offsets because the coordinator is" " unknown, will retry after backoff") - next_at = time.time() + self._coordinator._retry_backoff_ms / 1000.0 - self._client.schedule(self, next_at) + backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0 + self._client.schedule(self, time.time() + backoff) return self._request_in_flight = True |