summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/common.py2
-rw-r--r--kafka/consumer/simple.py9
-rw-r--r--test/test_failover_integration.py8
3 files changed, 18 insertions, 1 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 2fdf7d2..66987ff 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -226,6 +226,8 @@ kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
def check_error(response):
+ if isinstance(response, Exception):
+ raise response
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 2c2f820..88eeada 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -19,7 +19,7 @@ from kafka.common import (
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
- OffsetOutOfRangeError, check_error
+ OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from .base import (
Consumer,
@@ -355,6 +355,13 @@ class SimpleConsumer(Consumer):
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
+ except FailedPayloadsError as e:
+ log.warning("Failed payloads of %s"
+ "Resetting partition offset...",
+ e.payload)
+ # Retry this partition
+ retry_partitions[e.payload.partition] = partitions[e.payload.partition]
+ continue
partition = resp.partition
buffer_size = partitions[partition]
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index f260093..1d835e2 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -169,6 +169,14 @@ class TestFailover(KafkaIntegrationTestCase):
msg = random_string(10).encode('utf-8')
producer.send_messages(topic, key, msg)
+ @kafka_versions("all")
+ def test_switch_leader_simple_consumer(self):
+ producer = Producer(self.client, async=False)
+ consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
+ self._send_random_messages(producer, self.topic, 0, 2)
+ consumer.get_messages()
+ self._kill_leader(self.topic, 0)
+ consumer.get_messages()
def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):