summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py14
1 files changed, 12 insertions, 2 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 08d2d01..cd100d6 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -204,13 +204,23 @@ class KafkaProtocol(object):
return [
kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
- kafka.structs.OffsetAndMessage(offset, message)
- for offset, _, message in messages])
+ offset_and_msg
+ for offset_and_msg in cls.decode_message_set(messages)])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
]
@classmethod
+ def decode_message_set(cls, messages):
+ for offset, _, message in messages:
+ if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
+ inner_messages = message.decompress()
+ for (inner_offset, _msg_size, inner_msg) in inner_messages:
+ yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg)
+ else:
+ yield kafka.structs.OffsetAndMessage(offset, message)
+
+ @classmethod
def encode_offset_request(cls, payloads=()):
return kafka.protocol.offset.OffsetRequest[0](
replica_id=-1,