summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
authorreAsOn2010 <the.reason.sake@gmail.com>2015-06-09 04:20:16 +0800
committerreAsOn2010 <the.reason.sake@gmail.com>2015-06-09 04:20:16 +0800
commit945fc048a8cc61e1a9390bd7a7fed371d2e23277 (patch)
tree4d5f584f1d33f89aaa119ad20418b5a91c5ecc55 /kafka/consumer/simple.py
parentb1aad92a2e7dfded5f57ebc497dccc5ad3c56781 (diff)
downloadkafka-python-945fc048a8cc61e1a9390bd7a7fed371d2e23277.tar.gz
try to fix uncaught FailedPayloadsError
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py9
1 files changed, 8 insertions, 1 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 2c2f820..88eeada 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -19,7 +19,7 @@ from kafka.common import (
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
- OffsetOutOfRangeError, check_error
+ OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from .base import (
Consumer,
@@ -355,6 +355,13 @@ class SimpleConsumer(Consumer):
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
+ except FailedPayloadsError as e:
+ log.warning("Failed payloads of %s"
+ "Resetting partition offset...",
+ e.payload)
+ # Retry this partition
+ retry_partitions[e.payload.partition] = partitions[e.payload.partition]
+ continue
partition = resp.partition
buffer_size = partitions[partition]