summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py9
1 files changed, 8 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index e136ea2..f406a30 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -356,7 +356,14 @@ class Fetcher(six.Iterator):
for record in self._unpack_message_set(tp, msg.decompress()):
yield record
else:
- key, value = self._deserialize(msg)
+ try:
+ key, value = self._deserialize(msg)
+ # If the deserializer raises StopIteration, it is erroneously
+ # caught by the generator. We want all exceptions to be raised
+ # back to the user. See Issue 545
+ except StopIteration as e:
+ log.exception('Deserializer raised StopIteration: %s', e)
+ raise Exception('Deserializer raised StopIteration')
yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
def _message_generator(self):