summary refs log tree commit diff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/codec.py                      13
-rw-r--r--  kafka/producer/buffer.py             5
-rw-r--r--  kafka/producer/kafka.py              2
-rw-r--r--  kafka/producer/record_accumulator.py 2
-rw-r--r--  kafka/protocol/message.py           15
5 files changed, 30 insertions, 7 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index c27d89b..c8195ee 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -13,6 +13,15 @@ try:
except ImportError:
_HAS_SNAPPY = False
+try:
+ import lz4
+ from lz4 import compress as lz4_encode
+ from lz4 import decompress as lz4_decode
+except ImportError:
+ lz4 = None
+ lz4_encode = None
+ lz4_decode = None
+
def has_gzip():
return True
@@ -22,6 +31,10 @@ def has_snappy():
return _HAS_SNAPPY
+def has_lz4():
+ return lz4 is not None
+
+
def gzip_encode(payload, compresslevel=None):
if not compresslevel:
compresslevel = 9
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 4e05ec9..1a2dd71 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -5,8 +5,8 @@ import io
import threading
import time
-from ..codec import (has_gzip, has_snappy,
- gzip_encode, snappy_encode)
+from ..codec import (has_gzip, has_snappy, has_lz4,
+ gzip_encode, snappy_encode, lz4_encode)
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -27,6 +27,7 @@ class MessageSetBuffer(object):
_COMPRESSORS = {
'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
+ 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
}
def __init__(self, buf, batch_size, compression_type=None):
assert batch_size > 0, 'batch_size must be > 0'
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 220528f..2443265 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -111,7 +111,7 @@ class KafkaProducer(object):
remains alive. This is the strongest available guarantee.
If unset, defaults to acks=1.
compression_type (str): The compression type for all data generated by
- the producer. Valid values are 'gzip', 'snappy', or None.
+ the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 17cfa5e..6a762eb 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -114,7 +114,7 @@ class RecordAccumulator(object):
In the current implementation, this setting is an approximation.
Default: 33554432 (32MB)
compression_type (str): The compression type for all data generated by
- the producer. Valid values are 'gzip', 'snappy', or None.
+ the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index fb54049..ae261bf 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,6 +1,7 @@
import io
-from ..codec import gzip_decode, snappy_decode
+from ..codec import (has_gzip, has_snappy, has_lz4,
+ gzip_decode, snappy_decode, lz4_decode)
from . import pickle
from .struct import Struct
from .types import (
@@ -20,6 +21,7 @@ class Message(Struct):
CODEC_MASK = 0x03
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
+ CODEC_LZ4 = 0x03
HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
@@ -61,11 +63,18 @@ class Message(Struct):
def decompress(self):
codec = self.attributes & self.CODEC_MASK
- assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY)
+ assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
if codec == self.CODEC_GZIP:
+ assert has_gzip(), 'Gzip decompression unsupported'
raw_bytes = gzip_decode(self.value)
- else:
+ elif codec == self.CODEC_SNAPPY:
+ assert has_snappy(), 'Snappy decompression unsupported'
raw_bytes = snappy_decode(self.value)
+ elif codec == self.CODEC_LZ4:
+ assert has_lz4(), 'LZ4 decompression unsupported'
+ raw_bytes = lz4_decode(self.value)
+ else:
+ raise Exception('This should be impossible')
return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))