summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/fetcher.py30
1 files changed, 15 insertions, 15 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 375090a..71d2ed2 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -351,22 +351,22 @@ class Fetcher(six.Iterator):
return dict(drained)
def _unpack_message_set(self, tp, messages):
- for offset, size, msg in messages:
- if self.config['check_crcs'] and not msg.validate_crc():
- raise Errors.InvalidMessageError(msg)
- elif msg.is_compressed():
- for record in self._unpack_message_set(tp, msg.decompress()):
- yield record
- else:
- try:
+ try:
+ for offset, size, msg in messages:
+ if self.config['check_crcs'] and not msg.validate_crc():
+ raise Errors.InvalidMessageError(msg)
+ elif msg.is_compressed():
+ for record in self._unpack_message_set(tp, msg.decompress()):
+ yield record
+ else:
key, value = self._deserialize(msg)
- # If the deserializer raises StopIteration, it is erroneously
- # caught by the generator. We want all exceptions to be raised
- # back to the user. See Issue 545
- except StopIteration as e:
- log.exception('Deserializer raised StopIteration: %s', e)
- raise Exception('Deserializer raised StopIteration')
- yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ # If unpacking raises StopIteration, it is erroneously
+ # caught by the generator. We want all exceptions to be raised
+ # back to the user. See Issue 545
+ except StopIteration as e:
+ log.exception('StopIteration raised unpacking messageset: %s', e)
+ raise Exception('StopIteration raised unpacking messageset')
def _message_generator(self):
"""Iterate over fetched_records"""