diff options
Diffstat (limited to 'kafka/coordinator/abstract.py')
-rw-r--r-- | kafka/coordinator/abstract.py | 34 |
1 files changed, 17 insertions, 17 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index 03302a3..ea5cb97 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -1,4 +1,5 @@ import abc +import copy import logging import time @@ -44,22 +45,24 @@ class AbstractCoordinator(object): _on_join_complete(). """ - _session_timeout_ms = 30000 - _heartbeat_interval_ms = 3000 - _retry_backoff_ms = 100 + DEFAULT_CONFIG = { + 'session_timeout_ms': 30000, + 'heartbeat_interval_ms': 3000, + 'retry_backoff_ms': 100, + } - def __init__(self, client, group_id, **kwargs): + def __init__(self, client, group_id, **configs): if not client: raise Errors.IllegalStateError('a client is required to use' ' Group Coordinator') if not group_id: raise Errors.IllegalStateError('a group_id is required to use' ' Group Coordinator') - for config in ('session_timeout_ms', - 'heartbeat_interval_ms', - 'retry_backoff_ms'): - if config in kwargs: - setattr(self, '_' + config, kwargs.pop(config)) + + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] self._client = client self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID @@ -68,9 +71,7 @@ class AbstractCoordinator(object): self.coordinator_id = None self.rejoin_needed = True self.needs_join_prepare = True - self.heartbeat = Heartbeat( - session_timeout_ms=self._session_timeout_ms, - heartbeat_interval_ms=self._heartbeat_interval_ms) + self.heartbeat = Heartbeat(**self.config) self.heartbeat_task = HeartbeatTask(self) #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) @@ -222,7 +223,7 @@ class AbstractCoordinator(object): continue elif not future.retriable(): raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self._retry_backoff_ms / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000.0) def _perform_group_join(self): """Join the group and return the assignment for the next generation. @@ -242,7 +243,7 @@ class AbstractCoordinator(object): log.debug("(Re-)joining group %s", self.group_id) request = JoinGroupRequest( self.group_id, - self._session_timeout_ms, + self.config['session_timeout_ms'], self.member_id, self.protocol_type(), [(protocol, @@ -492,8 +493,7 @@ class AbstractCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" request = HeartbeatRequest(self.group_id, self.generation, self.member_id) - log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, - request.member_id) + log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future) @@ -594,7 +594,7 @@ class HeartbeatTask(object): def _handle_heartbeat_failure(self, e): log.debug("Heartbeat failed; retrying") self._request_in_flight = False - etd = time.time() + self._coordinator._retry_backoff_ms / 1000.0 + etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0 self._client.schedule(self, etd) |