diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-24 22:01:09 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-25 10:09:15 -0800 |
commit | 0d5899020a75e22fec14d3e3d7aec8f043d60a31 (patch) | |
tree | e227d12865560026a3170196b331d4d843a9e7fd /kafka/protocol/message.py | |
parent | 2c7b7452a8ca761672e70ee56b3779e4a96c1997 (diff) | |
download | kafka-python-0d5899020a75e22fec14d3e3d7aec8f043d60a31.tar.gz |
Add support for LZ4 compressed messages using python-lz4 module
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index fb54049..ae261bf 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -1,6 +1,7 @@ import io -from ..codec import gzip_decode, snappy_decode +from ..codec import (has_gzip, has_snappy, has_lz4, + gzip_decode, snappy_decode, lz4_decode) from . import pickle from .struct import Struct from .types import ( @@ -20,6 +21,7 @@ class Message(Struct): CODEC_MASK = 0x03 CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 + CODEC_LZ4 = 0x03 HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2) def __init__(self, value, key=None, magic=0, attributes=0, crc=0): @@ -61,11 +63,18 @@ class Message(Struct): def decompress(self): codec = self.attributes & self.CODEC_MASK - assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY) + assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4) if codec == self.CODEC_GZIP: + assert has_gzip(), 'Gzip decompression unsupported' raw_bytes = gzip_decode(self.value) - else: + elif codec == self.CODEC_SNAPPY: + assert has_snappy(), 'Snappy decompression unsupported' raw_bytes = snappy_decode(self.value) + elif codec == self.CODEC_LZ4: + assert has_lz4(), 'LZ4 decompression unsupported' + raw_bytes = lz4_decode(self.value) + else: + raise Exception('This should be impossible') return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes)) |