diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-25 12:27:24 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-25 12:27:24 -0800 |
commit | 0dcd5f10b983b85a17e38065d79fe8f632e70fad (patch) | |
tree | 21f18d999f88b58e64ebc2a0579d70cac0bfdb13 /kafka/protocol/message.py | |
parent | 2c7b7452a8ca761672e70ee56b3779e4a96c1997 (diff) | |
parent | c118991a1cfbbd88d999843c6d7bb4a48fce0820 (diff) | |
download | kafka-python-0dcd5f10b983b85a17e38065d79fe8f632e70fad.tar.gz |
Merge pull request #518 from dpkp/lz4
Add support for LZ4 compression / decompression
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index fb54049..ae261bf 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -1,6 +1,7 @@ import io -from ..codec import gzip_decode, snappy_decode +from ..codec import (has_gzip, has_snappy, has_lz4, + gzip_decode, snappy_decode, lz4_decode) from . import pickle from .struct import Struct from .types import ( @@ -20,6 +21,7 @@ class Message(Struct): CODEC_MASK = 0x03 CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 + CODEC_LZ4 = 0x03 HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2) def __init__(self, value, key=None, magic=0, attributes=0, crc=0): @@ -61,11 +63,18 @@ class Message(Struct): def decompress(self): codec = self.attributes & self.CODEC_MASK - assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY) + assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4) if codec == self.CODEC_GZIP: + assert has_gzip(), 'Gzip decompression unsupported' raw_bytes = gzip_decode(self.value) - else: + elif codec == self.CODEC_SNAPPY: + assert has_snappy(), 'Snappy decompression unsupported' raw_bytes = snappy_decode(self.value) + elif codec == self.CODEC_LZ4: + assert has_lz4(), 'LZ4 decompression unsupported' + raw_bytes = lz4_decode(self.value) + else: + raise Exception('This should be impossible') return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes)) |