diff options
Diffstat (limited to 'kafka/codec.py')
-rw-r--r-- | kafka/codec.py | 25 |
1 files changed, 25 insertions, 0 deletions
diff --git a/kafka/codec.py b/kafka/codec.py index aa9fc82..917400e 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -10,6 +10,7 @@ from kafka.vendor.six.moves import range _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) _XERIAL_V1_FORMAT = 'bccccccBii' +ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024 try: import snappy @@ -17,6 +18,11 @@ except ImportError: snappy = None try: + import zstandard as zstd +except ImportError: + zstd = None + +try: import lz4.frame as lz4 def _lz4_compress(payload, **kwargs): @@ -58,6 +64,10 @@ def has_snappy(): return snappy is not None +def has_zstd(): + return zstd is not None + + def has_lz4(): if lz4 is not None: return True @@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload): payload[header_size:] ]) return lz4_decode(munged_payload) + + +def zstd_encode(payload): + if not zstd: + raise NotImplementedError("Zstd codec is not available") + return zstd.ZstdCompressor().compress(payload) + + +def zstd_decode(payload): + if not zstd: + raise NotImplementedError("Zstd codec is not available") + try: + return zstd.ZstdDecompressor().decompress(payload) + except zstd.ZstdError: + return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE) |