diff options
-rw-r--r-- | kafka/coordinator/consumer.py | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 474c0e0..67b4b6d 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -321,9 +321,18 @@ class ConsumerCoordinator(AbstractCoordinator): try: self.commit_offsets_sync(self._subscription.all_consumed_offsets()) + + # The three main group membership errors are known and should not + # require a stacktrace -- just a warning + except (Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError): + log.warning("Offset commit failed: group membership out of date" + " This is likely to cause duplicate message" + " delivery.") except Exception: - # consistent with async auto-commit failures, we do not propagate the exception - log.exception("Auto offset commit failed") + log.exception("Offset commit failed: This is likely to cause" + " duplicate message delivery") def _send_offset_commit_request(self, offsets): """Commit offsets for the specified list of topics and partitions. @@ -388,7 +397,8 @@ class ConsumerCoordinator(AbstractCoordinator): if self._subscription.is_assigned(tp): self._subscription.assignment[tp].committed = offset.offset elif error_type is Errors.GroupAuthorizationFailedError: - log.error("Unauthorized to commit for group %s", self.group_id) + log.error("OffsetCommit failed for group %s - %s", + self.group_id, error_type.__name__) future.failure(error_type(self.group_id)) return elif error_type is Errors.TopicAuthorizationFailedError: @@ -396,48 +406,48 @@ class ConsumerCoordinator(AbstractCoordinator): elif error_type in (Errors.OffsetMetadataTooLargeError, Errors.InvalidCommitOffsetSizeError): # raise the error to the user - error = error_type() - log.info("Offset commit for group %s failed on partition" - " %s due to %s will retry", self.group_id, tp, error) - future.failure(error) + log.info("OffsetCommit failed for group %s on partition %s" + " due to %s, will retry", self.group_id, tp, + error_type.__name__) + future.failure(error_type()) return elif error_type is Errors.GroupLoadInProgressError: # just retry - error = error_type(self.group_id) - log.info("Offset commit for group %s failed due to %s," - " will retry", self.group_id, error) - future.failure(error) + log.info("OffsetCommit failed for group %s because group is" + " initializing (%s), will retry", self.group_id, + error_type.__name__) + future.failure(error_type(self.group_id)) return elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError, Errors.RequestTimedOutError): - error = error_type(self.group_id) - log.info("Offset commit for group %s failed due to %s," - " will find new coordinator and retry", - self.group_id, error) + log.info("OffsetCommit failed for group %s due to a" + " coordinator error (%s), will find new coordinator" + " and retry", self.group_id, error_type.__name__) self.coordinator_dead() - future.failure(error) + future.failure(error_type(self.group_id)) return elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError, Errors.RebalanceInProgressError): # need to re-join group error = error_type(self.group_id) - log.error("Error %s occurred while committing offsets for" - " group %s", error, self.group_id) + log.error("OffsetCommit failed for group %s due to group" + " error (%s), will rejoin", self.group_id, error) self._subscription.mark_for_reassignment() # Errors.CommitFailedError("Commit cannot be completed due to group rebalance")) future.failure(error) return else: - error = error_type() - log.error("Unexpected error committing partition %s at" - " offset %s: %s", tp, offset, error) - future.failure(error) + log.error("OffsetCommit failed for group % on partition %s" + " with offset %s: %s", tp, offset, + error_type.__name__) + future.failure(error_type()) return if unauthorized_topics: - log.error("Unauthorized to commit to topics %s", unauthorized_topics) + log.error("OffsetCommit failed for unauthorized topics %s", + unauthorized_topics) future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) else: future.success(True) |