diff options
author | Paul Cavallaro <paulcavallaro@users.noreply.github.com> | 2016-05-10 11:34:06 -0400 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-10 08:34:06 -0700 |
commit | 2c9930dea4a4537cf237ac7cc9db1f3970419b59 (patch) | |
tree | 74956b9a2433cc214d87bcd593c3eaead593eb8b /kafka/protocol/legacy.py | |
parent | 0684302d8fa1271ad0e913972b382b00ddeab717 (diff) | |
download | kafka-python-2c9930dea4a4537cf237ac7cc9db1f3970419b59.tar.gz |
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages* [Legacy Protocol] Update legacy protocol to handle compressed messages
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r-- | kafka/protocol/legacy.py | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 08d2d01..cd100d6 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -204,13 +204,23 @@ class KafkaProtocol(object): return [ kafka.structs.FetchResponsePayload( topic, partition, error, highwater_offset, [ - kafka.structs.OffsetAndMessage(offset, message) - for offset, _, message in messages]) + offset_and_msg + for offset_and_msg in cls.decode_message_set(messages)]) for topic, partitions in response.topics for partition, error, highwater_offset, messages in partitions ] @classmethod + def decode_message_set(cls, messages): + for offset, _, message in messages: + if isinstance(message, kafka.protocol.message.Message) and message.is_compressed(): + inner_messages = message.decompress() + for (inner_offset, _msg_size, inner_msg) in inner_messages: + yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg) + else: + yield kafka.structs.OffsetAndMessage(offset, message) + + @classmethod def encode_offset_request(cls, payloads=()): return kafka.protocol.offset.OffsetRequest[0]( replica_id=-1, |