diff options
-rw-r--r-- | kafka/coordinator/abstract.py | 27 |
1 files changed, 13 insertions, 14 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index 78e8d74..c84475a 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -488,6 +488,8 @@ class AbstractCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" request = HeartbeatRequest(self.group_id, self.generation, self.member_id) + log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, + request.member_id) future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future) @@ -502,33 +504,32 @@ class AbstractCoordinator(object): future.success(None) elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): - log.info("Attempt to heart beat failed since coordinator is either" - " not started or not valid; marking it as dead.") + log.info("Heartbeat failed: coordinator is either not started or" + " not valid; will refresh metadata and retry") self.coordinator_dead() future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: - log.info("Attempt to heart beat failed since the group is" - " rebalancing; try to re-join group.") + log.info("Heartbeat failed: group is rebalancing; re-joining group") self.rejoin_needed = True future.failure(error_type()) elif error_type is Errors.IllegalGenerationError: - log.info("Attempt to heart beat failed since generation id" - " is not legal; try to re-join group.") + log.info("Heartbeat failed: local generation id is not current;" + " re-joining group") self.rejoin_needed = True future.failure(error_type()) elif error_type is Errors.UnknownMemberIdError: - log.info("Attempt to heart beat failed since member id" - " is not valid; reset it and try to re-join group.") + log.info("Heartbeat failed: local member_id was not recognized;" + " resetting and re-joining group") self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID self.rejoin_needed = True future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: error = error_type(self.group_id) - log.error("Attempt to heart beat failed authorization: %s", error) + log.error("Heartbeat failed: authorization error: %s", error) future.failure(error) else: error = error_type() - log.error("Unknown error in heart beat response: %s", error) + log.error("Heartbeat failed: Unhandled error: %s", error) future.failure(error) @@ -550,7 +551,6 @@ class HeartbeatTask(object): self._client.schedule(self, time.time()) def __call__(self): - log.debug("Running Heartbeat task") if (self._coordinator.generation < 0 or self._coordinator.need_rejoin() or self._coordinator.coordinator_unknown()): @@ -563,17 +563,16 @@ class HeartbeatTask(object): if self._heartbeat.session_expired(): # we haven't received a successful heartbeat in one session interval # so mark the coordinator dead - log.error("Heartbeat session expired") + log.error("Heartbeat session expired - marking coordinator dead") self._coordinator.coordinator_dead() return if not self._heartbeat.should_heartbeat(): # we don't need to heartbeat now, so reschedule for when we do ttl = self._heartbeat.ttl() - log.debug("Heartbeat unneeded now, retrying in %s", ttl) + log.debug("Heartbeat task unneeded now, retrying in %s", ttl) self._client.schedule(self, time.time() + ttl) else: - log.debug("Sending HeartbeatRequest") self._heartbeat.sent_heartbeat() self._request_in_flight = True future = self._coordinator._send_heartbeat_request() |