diff options
-rw-r--r-- | kafka/consumer/fetcher.py | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 375090a..71d2ed2 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -351,22 +351,22 @@ class Fetcher(six.Iterator): return dict(drained) def _unpack_message_set(self, tp, messages): - for offset, size, msg in messages: - if self.config['check_crcs'] and not msg.validate_crc(): - raise Errors.InvalidMessageError(msg) - elif msg.is_compressed(): - for record in self._unpack_message_set(tp, msg.decompress()): - yield record - else: - try: + try: + for offset, size, msg in messages: + if self.config['check_crcs'] and not msg.validate_crc(): + raise Errors.InvalidMessageError(msg) + elif msg.is_compressed(): + for record in self._unpack_message_set(tp, msg.decompress()): + yield record + else: key, value = self._deserialize(msg) - # If the deserializer raises StopIteration, it is erroneously - # caught by the generator. We want all exceptions to be raised - # back to the user. See Issue 545 - except StopIteration as e: - log.exception('Deserializer raised StopIteration: %s', e) - raise Exception('Deserializer raised StopIteration') - yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) + yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) + # If unpacking raises StopIteration, it is erroneously + # caught by the generator. We want all exceptions to be raised + # back to the user. See Issue 545 + except StopIteration as e: + log.exception('StopIteration raised unpacking messageset: %s', e) + raise Exception('StopIteration raised unpacking messageset') def _message_generator(self): """Iterate over fetched_records""" |