summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/abstract.py27
1 files changed, 13 insertions, 14 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index 78e8d74..c84475a 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -488,6 +488,8 @@ class AbstractCoordinator(object):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
+ log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id,
+ request.member_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_heartbeat_response, future)
@@ -502,33 +504,32 @@ class AbstractCoordinator(object):
future.success(None)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
- log.info("Attempt to heart beat failed since coordinator is either"
- " not started or not valid; marking it as dead.")
+ log.info("Heartbeat failed: coordinator is either not started or"
+ " not valid; will refresh metadata and retry")
self.coordinator_dead()
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
- log.info("Attempt to heart beat failed since the group is"
- " rebalancing; try to re-join group.")
+ log.info("Heartbeat failed: group is rebalancing; re-joining group")
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
- log.info("Attempt to heart beat failed since generation id"
- " is not legal; try to re-join group.")
+ log.info("Heartbeat failed: local generation id is not current;"
+ " re-joining group")
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
- log.info("Attempt to heart beat failed since member id"
- " is not valid; reset it and try to re-join group.")
+ log.info("Heartbeat failed: local member_id was not recognized;"
+ " resetting and re-joining group")
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
self.rejoin_needed = True
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
error = error_type(self.group_id)
- log.error("Attempt to heart beat failed authorization: %s", error)
+ log.error("Heartbeat failed: authorization error: %s", error)
future.failure(error)
else:
error = error_type()
- log.error("Unknown error in heart beat response: %s", error)
+ log.error("Heartbeat failed: Unhandled error: %s", error)
future.failure(error)
@@ -550,7 +551,6 @@ class HeartbeatTask(object):
self._client.schedule(self, time.time())
def __call__(self):
- log.debug("Running Heartbeat task")
if (self._coordinator.generation < 0 or
self._coordinator.need_rejoin() or
self._coordinator.coordinator_unknown()):
@@ -563,17 +563,16 @@ class HeartbeatTask(object):
if self._heartbeat.session_expired():
# we haven't received a successful heartbeat in one session interval
# so mark the coordinator dead
- log.error("Heartbeat session expired")
+ log.error("Heartbeat session expired - marking coordinator dead")
self._coordinator.coordinator_dead()
return
if not self._heartbeat.should_heartbeat():
# we don't need to heartbeat now, so reschedule for when we do
ttl = self._heartbeat.ttl()
- log.debug("Heartbeat unneeded now, retrying in %s", ttl)
+ log.debug("Heartbeat task unneeded now, retrying in %s", ttl)
self._client.schedule(self, time.time() + ttl)
else:
- log.debug("Sending HeartbeatRequest")
self._heartbeat.sent_heartbeat()
self._request_in_flight = True
future = self._coordinator._send_heartbeat_request()