summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-08 22:20:16 -0700
committerDana Powers <dana.powers@rd.io>2015-06-08 22:20:16 -0700
commitb998fc7376272fc16ea4c3242d4f009f234ef85b (patch)
tree739112376d90b1b0623c00a5a558a9ad8669b81f /kafka/consumer/kafka.py
parentfe382a55b253e2c0c4f66052ced1714dbdab65ae (diff)
downloadkafka-python-b998fc7376272fc16ea4c3242d4f009f234ef85b.tar.gz
Update KafkaConsumer to handle request-specific FailedPayloadsErrors
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py24
1 files changed, 13 insertions, 11 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 7ba83cb..b141a98 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -323,19 +323,21 @@ class KafkaConsumer(object):
max_bytes)
for (topic, partition) in self._topics]
- # client.send_fetch_request will collect topic/partition requests by leader
- # and send each group as a single FetchRequest to the correct broker
- try:
- responses = self._client.send_fetch_request(fetches,
- max_wait_time=max_wait_time,
- min_bytes=min_bytes,
- fail_on_error=False)
- except FailedPayloadsError:
- logger.warning('FailedPayloadsError attempting to fetch data from kafka')
- self._refresh_metadata_on_error()
- return
+ # send_fetch_request will batch topic/partition requests by leader
+ responses = self._client.send_fetch_request(
+ fetches,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes,
+ fail_on_error=False
+ )
for resp in responses:
+
+ if isinstance(resp, FailedPayloadsError):
+ logger.warning('FailedPayloadsError attempting to fetch data')
+ self._refresh_metadata_on_error()
+ continue
+
topic = kafka_bytestring(resp.topic)
partition = resp.partition
try: