summaryrefslogtreecommitdiff
path: root/kafka/coordinator/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r--kafka/coordinator/base.py78
1 files changed, 42 insertions, 36 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index fcf3901..3c7ea21 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -200,7 +200,7 @@ class BaseCoordinator(object):
self._client.poll()
continue
- future = self._send_group_metadata_request()
+ future = self._send_group_coordinator_request()
self._client.poll(future=future)
if future.failed():
@@ -233,7 +233,7 @@ class BaseCoordinator(object):
while self.need_rejoin():
self.ensure_coordinator_known()
- future = self._perform_group_join()
+ future = self._send_join_group_request()
self._client.poll(future=future)
if future.succeeded():
@@ -253,7 +253,7 @@ class BaseCoordinator(object):
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
- def _perform_group_join(self):
+ def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
This function handles both JoinGroup and SyncGroup, delegating to
@@ -268,7 +268,7 @@ class BaseCoordinator(object):
return Future().failure(e)
# send a join group request to the coordinator
- log.debug("(Re-)joining group %s", self.group_id)
+ log.info("(Re-)joining group %s", self.group_id)
request = JoinGroupRequest(
self.group_id,
self.config['session_timeout_ms'],
@@ -279,7 +279,7 @@ class BaseCoordinator(object):
for protocol, metadata in self.group_protocols()])
# create the request for the coordinator
- log.debug("Issuing request (%s) to coordinator %s", request, self.coordinator_id)
+ log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_join_group_response, future)
@@ -300,6 +300,8 @@ class BaseCoordinator(object):
def _handle_join_group_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
+ log.debug("Received successful JoinGroup response for group %s: %s",
+ self.group_id, response)
self.member_id = response.member_id
self.generation = response.generation_id
self.rejoin_needed = False
@@ -315,30 +317,31 @@ class BaseCoordinator(object):
self._on_join_follower().chain(future)
elif error_type is Errors.GroupLoadInProgressError:
- log.debug("Attempt to join group %s rejected since coordinator is"
- " loading the group.", self.group_id)
+ log.debug("Attempt to join group %s rejected since coordinator %s"
+ " is loading the group.", self.group_id, self.coordinator_id)
# backoff and retry
future.failure(error_type(response))
elif error_type is Errors.UnknownMemberIdError:
# reset the member id and retry immediately
error = error_type(self.member_id)
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
- log.info("Attempt to join group %s failed due to unknown member id,"
- " resetting and retrying.", self.group_id)
+ log.debug("Attempt to join group %s failed due to unknown member id",
+ self.group_id)
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
# re-discover the coordinator and retry with backoff
self.coordinator_dead()
- log.info("Attempt to join group %s failed due to obsolete "
- "coordinator information, retrying.", self.group_id)
+ log.debug("Attempt to join group %s failed due to obsolete "
+ "coordinator information: %s", self.group_id,
+ error_type.__name__)
future.failure(error_type())
elif error_type in (Errors.InconsistentGroupProtocolError,
Errors.InvalidSessionTimeoutError,
Errors.InvalidGroupIdError):
# log the error and re-throw the exception
error = error_type(response)
- log.error("Attempt to join group %s failed due to: %s",
+ log.error("Attempt to join group %s failed due to fatal error: %s",
self.group_id, error)
future.failure(error)
elif error_type is Errors.GroupAuthorizationFailedError:
@@ -356,8 +359,8 @@ class BaseCoordinator(object):
self.generation,
self.member_id,
{})
- log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
- request, self.coordinator_id)
+ log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
+ self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
def _on_join_leader(self, response):
@@ -386,8 +389,8 @@ class BaseCoordinator(object):
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in six.iteritems(group_assignment)])
- log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
- request, self.coordinator_id)
+ log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
+ self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
def _send_sync_group_request(self, request):
@@ -404,8 +407,8 @@ class BaseCoordinator(object):
def _handle_sync_group_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.debug("Received successful sync group response for group %s: %s",
- self.group_id, response)
+ log.info("Successfully joined group %s with generation %s",
+ self.group_id, self.generation)
#self.sensors.syncLatency.record(response.requestLatencyMs())
future.success(response.member_assignment)
return
@@ -415,21 +418,19 @@ class BaseCoordinator(object):
if error_type is Errors.GroupAuthorizationFailedError:
future.failure(error_type(self.group_id))
elif error_type is Errors.RebalanceInProgressError:
- log.info("SyncGroup for group %s failed due to coordinator"
- " rebalance, rejoining the group", self.group_id)
+ log.debug("SyncGroup for group %s failed due to coordinator"
+ " rebalance", self.group_id)
future.failure(error_type(self.group_id))
elif error_type in (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError):
error = error_type()
- log.info("SyncGroup for group %s failed due to %s,"
- " rejoining the group", self.group_id, error)
+ log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
error = error_type()
- log.info("SyncGroup for group %s failed due to %s, will find new"
- " coordinator and rejoin", self.group_id, error)
+ log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.coordinator_dead()
future.failure(error)
else:
@@ -437,7 +438,7 @@ class BaseCoordinator(object):
log.error("Unexpected error from SyncGroup: %s", error)
future.failure(error)
- def _send_group_metadata_request(self):
+ def _send_group_coordinator_request(self):
"""Discover the current coordinator for the group.
Returns:
@@ -447,7 +448,8 @@ class BaseCoordinator(object):
if node_id is None:
return Future().failure(Errors.NoBrokersAvailable())
- log.debug("Issuing group metadata request to broker %s", node_id)
+ log.debug("Sending group coordinator request for group %s to broker %s",
+ self.group_id, node_id)
request = GroupCoordinatorRequest(self.group_id)
future = Future()
_f = self._client.send(node_id, request)
@@ -456,7 +458,7 @@ class BaseCoordinator(object):
return future
def _handle_group_coordinator_response(self, future, response):
- log.debug("Group metadata response %s", response)
+ log.debug("Received group coordinator response %s", response)
if not self.coordinator_unknown():
# We already found the coordinator, so ignore the request
log.debug("Coordinator already known -- ignoring metadata response")
@@ -473,6 +475,8 @@ class BaseCoordinator(object):
return
self.coordinator_id = response.coordinator_id
+ log.info("Discovered coordinator %s for group %s",
+ self.coordinator_id, self.group_id)
self._client.ready(self.coordinator_id)
# start sending heartbeats only if we have a valid generation
@@ -495,8 +499,8 @@ class BaseCoordinator(object):
def coordinator_dead(self, error=None):
"""Mark the current coordinator as dead."""
if self.coordinator_id is not None:
- log.warning("Marking the coordinator dead (node %s): %s.",
- self.coordinator_id, error)
+ log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+ self.coordinator_id, self.group_id, error)
self.coordinator_id = None
def close(self):
@@ -542,22 +546,24 @@ class BaseCoordinator(object):
#self.sensors.heartbeat_latency.record(response.requestLatencyMs())
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("Heartbeat successful")
+ log.debug("Received successful heartbeat response for group %s",
+ self.group_id)
future.success(None)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
- log.warning("Heartbeat failed: coordinator is either not started or"
- " not valid; will refresh metadata and retry")
+ log.warning("Heartbeat failed for group %s: coordinator (node %s)"
+ " is either not started or not valid", self.group_id,
+ self.coordinator_id)
self.coordinator_dead()
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
- log.warning("Heartbeat: group is rebalancing; this consumer needs to"
- " re-join")
+ log.warning("Heartbeat failed for group %s because it is"
+ " rebalancing", self.group_id)
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
- log.warning("Heartbeat: generation id is not current; this consumer"
- " needs to re-join")
+ log.warning("Heartbeat failed for group %s: generation id is not "
+ " current.", self.group_id)
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError: