summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-24 22:01:09 -0800
committerDana Powers <dana.powers@rd.io>2016-01-25 10:09:15 -0800
commit0d5899020a75e22fec14d3e3d7aec8f043d60a31 (patch)
treee227d12865560026a3170196b331d4d843a9e7fd /kafka/protocol/message.py
parent2c7b7452a8ca761672e70ee56b3779e4a96c1997 (diff)
downloadkafka-python-0d5899020a75e22fec14d3e3d7aec8f043d60a31.tar.gz
Add support for LZ4 compressed messages using python-lz4 module
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py15
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index fb54049..ae261bf 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,6 +1,7 @@
import io
-from ..codec import gzip_decode, snappy_decode
+from ..codec import (has_gzip, has_snappy, has_lz4,
+ gzip_decode, snappy_decode, lz4_decode)
from . import pickle
from .struct import Struct
from .types import (
@@ -20,6 +21,7 @@ class Message(Struct):
CODEC_MASK = 0x03
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
+ CODEC_LZ4 = 0x03
HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
@@ -61,11 +63,18 @@ class Message(Struct):
def decompress(self):
codec = self.attributes & self.CODEC_MASK
- assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY)
+ assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
if codec == self.CODEC_GZIP:
+ assert has_gzip(), 'Gzip decompression unsupported'
raw_bytes = gzip_decode(self.value)
- else:
+ elif codec == self.CODEC_SNAPPY:
+ assert has_snappy(), 'Snappy decompression unsupported'
raw_bytes = snappy_decode(self.value)
+ elif codec == self.CODEC_LZ4:
+ assert has_lz4(), 'LZ4 decompression unsupported'
+ raw_bytes = lz4_decode(self.value)
+ else:
+ raise Exception('This should be impossible')
return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))