From 6b97eaefb23f637bc5095e49fd4ab9ee6755ce6e Mon Sep 17 00:00:00 2001 From: webber Date: Sun, 6 Aug 2017 00:40:54 +0800 Subject: Fixed Issue 1033.Raise AssertionError when decompression unsupported. (#1159) --- kafka/consumer/fetcher.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2782057..8db89a1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -122,6 +122,7 @@ class Fetcher(six.Iterator): if self._client.ready(node_id): log.debug("Sending FetchRequest to node %s", node_id) future = self._client.send(node_id, request) + future.error_on_callbacks=True future.add_callback(self._handle_fetch_response, request, time.time()) future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) futures.append(future) @@ -550,6 +551,12 @@ class Fetcher(six.Iterator): log.exception('StopIteration raised unpacking messageset: %s', e) raise Exception('StopIteration raised unpacking messageset') + # If unpacking raises AssertionError, it means decompression unsupported + # See Issue 1033 + except AssertionError as e: + log.exception('AssertionError raised unpacking messageset: %s', e) + raise + def __iter__(self): # pylint: disable=non-iterator-returned return self -- cgit v1.2.1