diff options
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index e4ebcb0..66d7e6c 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -15,7 +15,7 @@ from ..metrics import AnonMeasurable from ..metrics.stats import Avg, Count, Max, Rate from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest from ..protocol.group import (HeartbeatRequest, JoinGroupRequest, - LeaveGroupRequest, SyncGroupRequest) + LeaveGroupRequest, SyncGroupRequest) log = logging.getLogger('kafka.coordinator') @@ -220,7 +220,7 @@ class BaseCoordinator(object): metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) else: - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type def need_rejoin(self): """Check whether the group should be rejoined (e.g. if metadata changes) @@ -270,7 +270,7 @@ class BaseCoordinator(object): Errors.IllegalGenerationError)): continue elif not future.retriable(): - raise exception # pylint: disable-msg=raising-bad-type + raise exception # pylint: disable-msg=raising-bad-type time.sleep(self.config['retry_backoff_ms'] / 1000) def _send_join_group_request(self): @@ -428,7 +428,7 @@ class BaseCoordinator(object): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.info("Successfully joined group %s with generation %s", - self.group_id, self.generation) + self.group_id, self.generation) self.sensors.sync_latency.record((time.time() - send_time) * 1000) future.success(response.member_assignment) return @@ -554,7 +554,7 @@ class BaseCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id) - log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member + log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future, time.time()) @@ -627,7 +627,7 @@ class HeartbeatTask(object): def __call__(self): if (self._coordinator.generation < 0 or - self._coordinator.need_rejoin()): + self._coordinator.need_rejoin()): # no need to send the heartbeat we're not using auto-assignment # or if we are awaiting a rebalance log.info("Skipping heartbeat: no auto-assignment" |