summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-25 12:27:24 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-25 12:27:24 -0800
commit0dcd5f10b983b85a17e38065d79fe8f632e70fad (patch)
tree21f18d999f88b58e64ebc2a0579d70cac0bfdb13 /kafka/protocol/message.py
parent2c7b7452a8ca761672e70ee56b3779e4a96c1997 (diff)
parentc118991a1cfbbd88d999843c6d7bb4a48fce0820 (diff)
downloadkafka-python-0dcd5f10b983b85a17e38065d79fe8f632e70fad.tar.gz
Merge pull request #518 from dpkp/lz4
Add support for LZ4 compression / decompression
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py15
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index fb54049..ae261bf 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,6 +1,7 @@
import io
-from ..codec import gzip_decode, snappy_decode
+from ..codec import (has_gzip, has_snappy, has_lz4,
+ gzip_decode, snappy_decode, lz4_decode)
from . import pickle
from .struct import Struct
from .types import (
@@ -20,6 +21,7 @@ class Message(Struct):
CODEC_MASK = 0x03
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
+ CODEC_LZ4 = 0x03
HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
@@ -61,11 +63,18 @@ class Message(Struct):
def decompress(self):
codec = self.attributes & self.CODEC_MASK
- assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY)
+ assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
if codec == self.CODEC_GZIP:
+ assert has_gzip(), 'Gzip decompression unsupported'
raw_bytes = gzip_decode(self.value)
- else:
+ elif codec == self.CODEC_SNAPPY:
+ assert has_snappy(), 'Snappy decompression unsupported'
raw_bytes = snappy_decode(self.value)
+ elif codec == self.CODEC_LZ4:
+ assert has_lz4(), 'LZ4 decompression unsupported'
+ raw_bytes = lz4_decode(self.value)
+ else:
+ raise Exception('This should be impossible')
return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))