diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-26 20:35:22 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-26 21:51:29 -0800 |
commit | f08ec792ee93fd059e81ee1e30f5651c15f69e85 (patch) | |
tree | 6698c6b28681d35edbfe5ca9ff385c8664f33ee1 | |
parent | 66284e57accec5977d606fc91a0b28177b352eb4 (diff) | |
download | kafka-python-lz4_fixup.tar.gz |
Handle broken LZ4 framing; switch to lz4tools + xxhashlz4_fixup
-rw-r--r-- | README.rst | 5 | ||||
-rw-r--r-- | docs/index.rst | 5 | ||||
-rw-r--r-- | docs/install.rst | 10 | ||||
-rw-r--r-- | kafka/codec.py | 58 | ||||
-rw-r--r-- | test/test_producer.py | 12 | ||||
-rw-r--r-- | tox.ini | 4 |
6 files changed, 72 insertions, 22 deletions
@@ -102,8 +102,9 @@ Compression *********** kafka-python supports gzip compression/decompression natively. To produce or -consume snappy and lz4 compressed messages, you must install `lz4` (`lz4-cffi` -if using pypy) and/or `python-snappy` (also requires snappy library). +consume lz4 compressed messages, you must install lz4tools and xxhash (modules +may not work on python2.6). To enable snappy compression/decompression install +python-snappy (also requires snappy library). See `Installation <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install>`_ for more information. diff --git a/docs/index.rst b/docs/index.rst index 2f54b09..fd13a46 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -101,8 +101,9 @@ Compression *********** kafka-python supports gzip compression/decompression natively. To produce or -consume snappy and lz4 compressed messages, you must install lz4 (lz4-cffi -if using pypy) and/or python-snappy (also requires snappy library). +consume lz4 compressed messages, you must install lz4tools and xxhash (modules +may not work on python2.6). To enable snappy, install python-snappy (also +requires snappy library). See `Installation <install.html#optional-snappy-install>`_ for more information. diff --git a/docs/install.rst b/docs/install.rst index aba5019..4dca5d0 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -40,14 +40,12 @@ Using `setup.py` directly: Optional LZ4 install ******************** -To enable LZ4 compression/decompression, install `lz4`: +To enable LZ4 compression/decompression, install lz4tools and xxhash: ->>> pip install lz4 - -Or `lz4-cffi` if using pypy: - ->>> pip install lz4-cffi +>>> pip install lz4tools +>>> pip install xxhash +*Note*: these modules do not support python2.6 Optional Snappy install *********************** diff --git a/kafka/codec.py b/kafka/codec.py index 11d5a99..e94bc4c 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -15,13 +15,10 @@ except ImportError: snappy = None try: - import lz4 - from lz4 import compress as lz4_encode - from lz4 import decompress as lz4_decode + import lz4f + import xxhash except ImportError: - lz4 = None - lz4_encode = None - lz4_decode = None + lz4f = None PYPY = bool(platform.python_implementation() == 'PyPy') @@ -34,7 +31,7 @@ def has_snappy(): def has_lz4(): - return lz4 is not None + return lz4f is not None def gzip_encode(payload, compresslevel=None): @@ -180,3 +177,50 @@ def snappy_decode(payload): return out.read() else: return snappy.decompress(payload) + + +def lz4_encode(payload): + data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member + # Kafka's LZ4 code has a bug in its header checksum implementation + header_size = 7 + if isinstance(data[4], int): + flg = data[4] + else: + flg = ord(data[4]) + content_size_bit = ((flg >> 3) & 1) + if content_size_bit: + header_size += 8 + + # This is the incorrect hc + hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member + + return b''.join([ + data[0:header_size-1], + hc, + data[header_size:] + ]) + + +def lz4_decode(payload): + # Kafka's LZ4 code has a bug in its header checksum implementation + header_size = 7 + if isinstance(payload[4], int): + flg = payload[4] + else: + flg = ord(payload[4]) + content_size_bit = ((flg >> 3) & 1) + if content_size_bit: + header_size += 8 + + # This should be the correct hc + hc = xxhash.xxh32(payload[4:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member + + munged_payload = b''.join([ + payload[0:header_size-1], + hc, + payload[header_size:] + ]) + + cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member + data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member + return data['decomp'] diff --git a/test/test_producer.py b/test/test_producer.py index 7a2db68..36da68d 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,3 +1,5 @@ +import sys + import pytest from kafka import KafkaConsumer, KafkaProducer @@ -9,9 +11,13 @@ from test.testutil import random_string @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) def test_end_to_end(kafka_broker, compression): - # LZ4 requires 0.8.2 - if compression == 'lz4' and version() < (0, 8, 2): - return + if compression == 'lz4': + # LZ4 requires 0.8.2 + if version() < (0, 8, 2): + return + # LZ4 python libs dont work on python2.6 + elif sys.version_info < (2, 7): + return connect_str = 'localhost:' + str(kafka_broker.port) producer = KafkaProducer(bootstrap_servers=connect_str, @@ -16,8 +16,8 @@ deps = pytest-mock mock python-snappy - py{26,27,33,34,35}: lz4 - pypy: lz4-cffi + lz4tools + xxhash py{26,27}: six py26: unittest2 commands = |