summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/consumer.py56
1 files changed, 33 insertions, 23 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 474c0e0..67b4b6d 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -321,9 +321,18 @@ class ConsumerCoordinator(AbstractCoordinator):
try:
self.commit_offsets_sync(self._subscription.all_consumed_offsets())
+
+ # The three main group membership errors are known and should not
+ # require a stacktrace -- just a warning
+ except (Errors.UnknownMemberIdError,
+ Errors.IllegalGenerationError,
+ Errors.RebalanceInProgressError):
+ log.warning("Offset commit failed: group membership out of date"
+ " This is likely to cause duplicate message"
+ " delivery.")
except Exception:
- # consistent with async auto-commit failures, we do not propagate the exception
- log.exception("Auto offset commit failed")
+ log.exception("Offset commit failed: This is likely to cause"
+ " duplicate message delivery")
def _send_offset_commit_request(self, offsets):
"""Commit offsets for the specified list of topics and partitions.
@@ -388,7 +397,8 @@ class ConsumerCoordinator(AbstractCoordinator):
if self._subscription.is_assigned(tp):
self._subscription.assignment[tp].committed = offset.offset
elif error_type is Errors.GroupAuthorizationFailedError:
- log.error("Unauthorized to commit for group %s", self.group_id)
+ log.error("OffsetCommit failed for group %s - %s",
+ self.group_id, error_type.__name__)
future.failure(error_type(self.group_id))
return
elif error_type is Errors.TopicAuthorizationFailedError:
@@ -396,48 +406,48 @@ class ConsumerCoordinator(AbstractCoordinator):
elif error_type in (Errors.OffsetMetadataTooLargeError,
Errors.InvalidCommitOffsetSizeError):
# raise the error to the user
- error = error_type()
- log.info("Offset commit for group %s failed on partition"
- " %s due to %s will retry", self.group_id, tp, error)
- future.failure(error)
+ log.info("OffsetCommit failed for group %s on partition %s"
+ " due to %s, will retry", self.group_id, tp,
+ error_type.__name__)
+ future.failure(error_type())
return
elif error_type is Errors.GroupLoadInProgressError:
# just retry
- error = error_type(self.group_id)
- log.info("Offset commit for group %s failed due to %s,"
- " will retry", self.group_id, error)
- future.failure(error)
+ log.info("OffsetCommit failed for group %s because group is"
+ " initializing (%s), will retry", self.group_id,
+ error_type.__name__)
+ future.failure(error_type(self.group_id))
return
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError,
Errors.RequestTimedOutError):
- error = error_type(self.group_id)
- log.info("Offset commit for group %s failed due to %s,"
- " will find new coordinator and retry",
- self.group_id, error)
+ log.info("OffsetCommit failed for group %s due to a"
+ " coordinator error (%s), will find new coordinator"
+ " and retry", self.group_id, error_type.__name__)
self.coordinator_dead()
- future.failure(error)
+ future.failure(error_type(self.group_id))
return
elif error_type in (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError,
Errors.RebalanceInProgressError):
# need to re-join group
error = error_type(self.group_id)
- log.error("Error %s occurred while committing offsets for"
- " group %s", error, self.group_id)
+ log.error("OffsetCommit failed for group %s due to group"
+ " error (%s), will rejoin", self.group_id, error)
self._subscription.mark_for_reassignment()
# Errors.CommitFailedError("Commit cannot be completed due to group rebalance"))
future.failure(error)
return
else:
- error = error_type()
- log.error("Unexpected error committing partition %s at"
- " offset %s: %s", tp, offset, error)
- future.failure(error)
+ log.error("OffsetCommit failed for group % on partition %s"
+ " with offset %s: %s", tp, offset,
+ error_type.__name__)
+ future.failure(error_type())
return
if unauthorized_topics:
- log.error("Unauthorized to commit to topics %s", unauthorized_topics)
+ log.error("OffsetCommit failed for unauthorized topics %s",
+ unauthorized_topics)
future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
else:
future.success(True)