diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-29 17:34:17 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-29 17:34:17 -0800 |
commit | 6e20e0bb52143955e49a3edca77153b5aba58148 (patch) | |
tree | 104a4be7ce475ef49b9d501702b0367930891d13 | |
parent | 39e7562b31a7058bdeeb6d61abbbbd7627799546 (diff) | |
download | kafka-python-6e20e0bb52143955e49a3edca77153b5aba58148.tar.gz |
Improve OffsetCommit error logging
Avoid printing full errors because they currently include
long descriptions that are generally duplicative of our local
error message.
-rw-r--r-- | kafka/coordinator/consumer.py | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 474c0e0..67b4b6d 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -321,9 +321,18 @@ class ConsumerCoordinator(AbstractCoordinator): try: self.commit_offsets_sync(self._subscription.all_consumed_offsets()) + + # The three main group membership errors are known and should not + # require a stacktrace -- just a warning + except (Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError): + log.warning("Offset commit failed: group membership out of date" + " This is likely to cause duplicate message" + " delivery.") except Exception: - # consistent with async auto-commit failures, we do not propagate the exception - log.exception("Auto offset commit failed") + log.exception("Offset commit failed: This is likely to cause" + " duplicate message delivery") def _send_offset_commit_request(self, offsets): """Commit offsets for the specified list of topics and partitions. @@ -388,7 +397,8 @@ class ConsumerCoordinator(AbstractCoordinator): if self._subscription.is_assigned(tp): self._subscription.assignment[tp].committed = offset.offset elif error_type is Errors.GroupAuthorizationFailedError: - log.error("Unauthorized to commit for group %s", self.group_id) + log.error("OffsetCommit failed for group %s - %s", + self.group_id, error_type.__name__) future.failure(error_type(self.group_id)) return elif error_type is Errors.TopicAuthorizationFailedError: @@ -396,48 +406,48 @@ class ConsumerCoordinator(AbstractCoordinator): elif error_type in (Errors.OffsetMetadataTooLargeError, Errors.InvalidCommitOffsetSizeError): # raise the error to the user - error = error_type() - log.info("Offset commit for group %s failed on partition" - " %s due to %s will retry", self.group_id, tp, error) - future.failure(error) + log.info("OffsetCommit failed for group %s on partition %s" + " due to %s, will retry", self.group_id, tp, + error_type.__name__) + future.failure(error_type()) return elif error_type is Errors.GroupLoadInProgressError: # just retry - error = error_type(self.group_id) - log.info("Offset commit for group %s failed due to %s," - " will retry", self.group_id, error) - future.failure(error) + log.info("OffsetCommit failed for group %s because group is" + " initializing (%s), will retry", self.group_id, + error_type.__name__) + future.failure(error_type(self.group_id)) return elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError, Errors.RequestTimedOutError): - error = error_type(self.group_id) - log.info("Offset commit for group %s failed due to %s," - " will find new coordinator and retry", - self.group_id, error) + log.info("OffsetCommit failed for group %s due to a" + " coordinator error (%s), will find new coordinator" + " and retry", self.group_id, error_type.__name__) self.coordinator_dead() - future.failure(error) + future.failure(error_type(self.group_id)) return elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError, Errors.RebalanceInProgressError): # need to re-join group error = error_type(self.group_id) - log.error("Error %s occurred while committing offsets for" - " group %s", error, self.group_id) + log.error("OffsetCommit failed for group %s due to group" + " error (%s), will rejoin", self.group_id, error) self._subscription.mark_for_reassignment() # Errors.CommitFailedError("Commit cannot be completed due to group rebalance")) future.failure(error) return else: - error = error_type() - log.error("Unexpected error committing partition %s at" - " offset %s: %s", tp, offset, error) - future.failure(error) + log.error("OffsetCommit failed for group % on partition %s" + " with offset %s: %s", tp, offset, + error_type.__name__) + future.failure(error_type()) return if unauthorized_topics: - log.error("Unauthorized to commit to topics %s", unauthorized_topics) + log.error("OffsetCommit failed for unauthorized topics %s", + unauthorized_topics) future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) else: future.success(True) |