diff options
-rw-r--r-- | kafka/consumer/simple.py | 13 | ||||
-rw-r--r-- | kafka/producer/base.py | 5 |
2 files changed, 11 insertions, 7 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index c75e78b..82a1fe2 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -358,23 +358,26 @@ class SimpleConsumer(Consumer): try: check_error(resp) except UnknownTopicOrPartitionError: + log.error('UnknownTopicOrPartitionError for %s:%d', + resp.topic, resp.partition) self.client.reset_topic_metadata(resp.topic) raise except NotLeaderForPartitionError: + log.error('NotLeaderForPartitionError for %s:%d', + resp.topic, resp.partition) self.client.reset_topic_metadata(resp.topic) continue except OffsetOutOfRangeError: - log.warning("OffsetOutOfRangeError for %s - %d. " - "Resetting partition offset...", + log.warning('OffsetOutOfRangeError for %s:%d. ' + 'Resetting partition offset...', resp.topic, resp.partition) self.reset_partition_offset(resp.partition) # Retry this partition retry_partitions[resp.partition] = partitions[resp.partition] continue except FailedPayloadsError as e: - log.warning("Failed payloads of %s" - "Resetting partition offset...", - e.payload) + log.warning('FailedPayloadsError for %s:%d', + e.payload.topic, e.payload.partition) # Retry this partition retry_partitions[e.payload.partition] = partitions[e.payload.partition] continue diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 49090bd..3c826cd 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -166,8 +166,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if error_cls: _handle_error(error_cls, orig_req) - log.error('Error sending ProduceRequest (#%d of %d) to %s:%d ' - 'with msgs %s', i + 1, len(requests), + log.error('%s sending ProduceRequest (#%d of %d) ' + 'to %s:%d with msgs %s', + error_cls.__name__, (i + 1), len(requests), orig_req.topic, orig_req.partition, orig_req.messages if log_messages_on_error else hash(orig_req.messages)) |