diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-06 11:47:07 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-06 11:47:07 -0700 |
commit | 3ef15f9d60af01ce397737b4d356618385b8884f (patch) | |
tree | 9abd07603f15429eb423780b8a57dee390d3fd70 /kafka | |
parent | 358b4820744c42d47171f17a5b373a1c89f520bb (diff) | |
download | kafka-python-3ef15f9d60af01ce397737b4d356618385b8884f.tar.gz |
Increase coverage of StopIteration check in _unpack_message_set
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/fetcher.py | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 375090a..71d2ed2 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -351,22 +351,22 @@ class Fetcher(six.Iterator): return dict(drained) def _unpack_message_set(self, tp, messages): - for offset, size, msg in messages: - if self.config['check_crcs'] and not msg.validate_crc(): - raise Errors.InvalidMessageError(msg) - elif msg.is_compressed(): - for record in self._unpack_message_set(tp, msg.decompress()): - yield record - else: - try: + try: + for offset, size, msg in messages: + if self.config['check_crcs'] and not msg.validate_crc(): + raise Errors.InvalidMessageError(msg) + elif msg.is_compressed(): + for record in self._unpack_message_set(tp, msg.decompress()): + yield record + else: key, value = self._deserialize(msg) - # If the deserializer raises StopIteration, it is erroneously - # caught by the generator. We want all exceptions to be raised - # back to the user. See Issue 545 - except StopIteration as e: - log.exception('Deserializer raised StopIteration: %s', e) - raise Exception('Deserializer raised StopIteration') - yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) + yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) + # If unpacking raises StopIteration, it is erroneously + # caught by the generator. We want all exceptions to be raised + # back to the user. See Issue 545 + except StopIteration as e: + log.exception('StopIteration raised unpacking messageset: %s', e) + raise Exception('StopIteration raised unpacking messageset') def _message_generator(self): """Iterate over fetched_records""" |