summaryrefslogtreecommitdiff
path: root/kafka/codec.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/codec.py')
-rw-r--r--kafka/codec.py25
1 files changed, 25 insertions, 0 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index aa9fc82..917400e 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -10,6 +10,7 @@ from kafka.vendor.six.moves import range
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
try:
import snappy
@@ -17,6 +18,11 @@ except ImportError:
snappy = None
try:
+ import zstandard as zstd
+except ImportError:
+ zstd = None
+
+try:
import lz4.frame as lz4
def _lz4_compress(payload, **kwargs):
@@ -58,6 +64,10 @@ def has_snappy():
return snappy is not None
+def has_zstd():
+ return zstd is not None
+
+
def has_lz4():
if lz4 is not None:
return True
@@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload):
payload[header_size:]
])
return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+ if not zstd:
+ raise NotImplementedError("Zstd codec is not available")
+ return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+ if not zstd:
+ raise NotImplementedError("Zstd codec is not available")
+ try:
+ return zstd.ZstdDecompressor().decompress(payload)
+ except zstd.ZstdError:
+ return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)