summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
authorPaul Cavallaro <paulcavallaro@users.noreply.github.com>2016-05-10 11:34:06 -0400
committerDana Powers <dana.powers@gmail.com>2016-05-10 08:34:06 -0700
commit2c9930dea4a4537cf237ac7cc9db1f3970419b59 (patch)
tree74956b9a2433cc214d87bcd593c3eaead593eb8b /kafka/protocol/legacy.py
parent0684302d8fa1271ad0e913972b382b00ddeab717 (diff)
downloadkafka-python-2c9930dea4a4537cf237ac7cc9db1f3970419b59.tar.gz
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages* [Legacy Protocol] Update legacy protocol to handle compressed messages
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py14
1 files changed, 12 insertions, 2 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 08d2d01..cd100d6 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -204,13 +204,23 @@ class KafkaProtocol(object):
return [
kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
- kafka.structs.OffsetAndMessage(offset, message)
- for offset, _, message in messages])
+ offset_and_msg
+ for offset_and_msg in cls.decode_message_set(messages)])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
]
@classmethod
+ def decode_message_set(cls, messages):
+ for offset, _, message in messages:
+ if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
+ inner_messages = message.decompress()
+ for (inner_offset, _msg_size, inner_msg) in inner_messages:
+ yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg)
+ else:
+ yield kafka.structs.OffsetAndMessage(offset, message)
+
+ @classmethod
def encode_offset_request(cls, payloads=()):
return kafka.protocol.offset.OffsetRequest[0](
replica_id=-1,