diff options
-rw-r--r-- | kafka/coordinator/base.py | 14 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 4 |
2 files changed, 9 insertions, 9 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5f60aa3..22dffb4 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -190,7 +190,7 @@ class BaseCoordinator(object): return True if self._client.is_disconnected(self.coordinator_id): - self.coordinator_dead() + self.coordinator_dead('Node Disconnected') return True return False @@ -311,7 +311,7 @@ class BaseCoordinator(object): # unless the error is caused by internal client pipelining if not isinstance(error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests)): - self.coordinator_dead() + self.coordinator_dead(error) future.failure(error) def _handle_join_group_response(self, future, send_time, response): @@ -348,7 +348,7 @@ class BaseCoordinator(object): elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): # re-discover the coordinator and retry with backoff - self.coordinator_dead() + self.coordinator_dead(error_type()) log.debug("Attempt to join group %s failed due to obsolete " "coordinator information: %s", self.group_id, error_type.__name__) @@ -448,7 +448,7 @@ class BaseCoordinator(object): Errors.NotCoordinatorForGroupError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.coordinator_dead() + self.coordinator_dead(error) future.failure(error) else: error = error_type() @@ -513,7 +513,7 @@ class BaseCoordinator(object): error) future.failure(error) - def coordinator_dead(self, error=None): + def coordinator_dead(self, error): """Mark the current coordinator as dead.""" if self.coordinator_id is not None: log.warning("Marking the coordinator dead (node %s) for group %s: %s.", @@ -571,7 +571,7 @@ class BaseCoordinator(object): log.warning("Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", self.group_id, self.coordinator_id) - self.coordinator_dead() + self.coordinator_dead(error_type()) future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: log.warning("Heartbeat failed for group %s because it is" @@ -640,7 +640,7 @@ class HeartbeatTask(object): # we haven't received a successful heartbeat in one session interval # so mark the coordinator dead log.error("Heartbeat session expired - marking coordinator dead") - self._coordinator.coordinator_dead() + self._coordinator.coordinator_dead('Heartbeat session expired') return if not self._heartbeat.should_heartbeat(): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a600cb4..fac8144 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -525,7 +525,7 @@ class ConsumerCoordinator(BaseCoordinator): Errors.RequestTimedOutError): log.debug("OffsetCommit for group %s failed: %s", self.group_id, error_type.__name__) - self.coordinator_dead() + self.coordinator_dead(error_type()) future.failure(error_type(self.group_id)) return elif error_type in (Errors.UnknownMemberIdError, @@ -630,7 +630,7 @@ class ConsumerCoordinator(BaseCoordinator): future.failure(error) elif error_type is Errors.NotCoordinatorForGroupError: # re-discover the coordinator and retry - self.coordinator_dead() + self.coordinator_dead(error_type()) future.failure(error) elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): |