diff options
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r-- | kafka/protocol/legacy.py | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 08d2d01..cd100d6 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -204,13 +204,23 @@ class KafkaProtocol(object): return [ kafka.structs.FetchResponsePayload( topic, partition, error, highwater_offset, [ - kafka.structs.OffsetAndMessage(offset, message) - for offset, _, message in messages]) + offset_and_msg + for offset_and_msg in cls.decode_message_set(messages)]) for topic, partitions in response.topics for partition, error, highwater_offset, messages in partitions ] @classmethod + def decode_message_set(cls, messages): + for offset, _, message in messages: + if isinstance(message, kafka.protocol.message.Message) and message.is_compressed(): + inner_messages = message.decompress() + for (inner_offset, _msg_size, inner_msg) in inner_messages: + yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg) + else: + yield kafka.structs.OffsetAndMessage(offset, message) + + @classmethod def encode_offset_request(cls, payloads=()): return kafka.protocol.offset.OffsetRequest[0]( replica_id=-1, |