summaryrefslogtreecommitdiff
path: root/kafka/coordinator/abstract.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/abstract.py')
-rw-r--r--kafka/coordinator/abstract.py34
1 files changed, 17 insertions, 17 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index 03302a3..ea5cb97 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -1,4 +1,5 @@
import abc
+import copy
import logging
import time
@@ -44,22 +45,24 @@ class AbstractCoordinator(object):
_on_join_complete().
"""
- _session_timeout_ms = 30000
- _heartbeat_interval_ms = 3000
- _retry_backoff_ms = 100
+ DEFAULT_CONFIG = {
+ 'session_timeout_ms': 30000,
+ 'heartbeat_interval_ms': 3000,
+ 'retry_backoff_ms': 100,
+ }
- def __init__(self, client, group_id, **kwargs):
+ def __init__(self, client, group_id, **configs):
if not client:
raise Errors.IllegalStateError('a client is required to use'
' Group Coordinator')
if not group_id:
raise Errors.IllegalStateError('a group_id is required to use'
' Group Coordinator')
- for config in ('session_timeout_ms',
- 'heartbeat_interval_ms',
- 'retry_backoff_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self._client = client
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
@@ -68,9 +71,7 @@ class AbstractCoordinator(object):
self.coordinator_id = None
self.rejoin_needed = True
self.needs_join_prepare = True
- self.heartbeat = Heartbeat(
- session_timeout_ms=self._session_timeout_ms,
- heartbeat_interval_ms=self._heartbeat_interval_ms)
+ self.heartbeat = Heartbeat(**self.config)
self.heartbeat_task = HeartbeatTask(self)
#self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
@@ -222,7 +223,7 @@ class AbstractCoordinator(object):
continue
elif not future.retriable():
raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self._retry_backoff_ms / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def _perform_group_join(self):
"""Join the group and return the assignment for the next generation.
@@ -242,7 +243,7 @@ class AbstractCoordinator(object):
log.debug("(Re-)joining group %s", self.group_id)
request = JoinGroupRequest(
self.group_id,
- self._session_timeout_ms,
+ self.config['session_timeout_ms'],
self.member_id,
self.protocol_type(),
[(protocol,
@@ -492,8 +493,7 @@ class AbstractCoordinator(object):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
- log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id,
- request.member_id)
+ log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_heartbeat_response, future)
@@ -594,7 +594,7 @@ class HeartbeatTask(object):
def _handle_heartbeat_failure(self, e):
log.debug("Heartbeat failed; retrying")
self._request_in_flight = False
- etd = time.time() + self._coordinator._retry_backoff_ms / 1000.0
+ etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, etd)