diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 9 |
1 files changed, 8 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e136ea2..f406a30 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -356,7 +356,14 @@ class Fetcher(six.Iterator): for record in self._unpack_message_set(tp, msg.decompress()): yield record else: - key, value = self._deserialize(msg) + try: + key, value = self._deserialize(msg) + # If the deserializer raises StopIteration, it is erroneously + # caught by the generator. We want all exceptions to be raised + # back to the user. See Issue 545 + except StopIteration as e: + log.exception('Deserializer raised StopIteration: %s', e) + raise Exception('Deserializer raised StopIteration') yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) def _message_generator(self): |