summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-26 20:35:22 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-26 21:51:29 -0800
commitf08ec792ee93fd059e81ee1e30f5651c15f69e85 (patch)
tree6698c6b28681d35edbfe5ca9ff385c8664f33ee1
parent66284e57accec5977d606fc91a0b28177b352eb4 (diff)
downloadkafka-python-lz4_fixup.tar.gz
Handle broken LZ4 framing; switch to lz4tools + xxhashlz4_fixup
-rw-r--r--README.rst5
-rw-r--r--docs/index.rst5
-rw-r--r--docs/install.rst10
-rw-r--r--kafka/codec.py58
-rw-r--r--test/test_producer.py12
-rw-r--r--tox.ini4
6 files changed, 72 insertions, 22 deletions
diff --git a/README.rst b/README.rst
index 61b737f..782aba0 100644
--- a/README.rst
+++ b/README.rst
@@ -102,8 +102,9 @@ Compression
***********
kafka-python supports gzip compression/decompression natively. To produce or
-consume snappy and lz4 compressed messages, you must install `lz4` (`lz4-cffi`
-if using pypy) and/or `python-snappy` (also requires snappy library).
+consume lz4 compressed messages, you must install lz4tools and xxhash (modules
+may not work on python2.6). To enable snappy compression/decompression install
+python-snappy (also requires snappy library).
See `Installation <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install>`_
for more information.
diff --git a/docs/index.rst b/docs/index.rst
index 2f54b09..fd13a46 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -101,8 +101,9 @@ Compression
***********
kafka-python supports gzip compression/decompression natively. To produce or
-consume snappy and lz4 compressed messages, you must install lz4 (lz4-cffi
-if using pypy) and/or python-snappy (also requires snappy library).
+consume lz4 compressed messages, you must install lz4tools and xxhash (modules
+may not work on python2.6). To enable snappy, install python-snappy (also
+requires snappy library).
See `Installation <install.html#optional-snappy-install>`_ for more information.
diff --git a/docs/install.rst b/docs/install.rst
index aba5019..4dca5d0 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -40,14 +40,12 @@ Using `setup.py` directly:
Optional LZ4 install
********************
-To enable LZ4 compression/decompression, install `lz4`:
+To enable LZ4 compression/decompression, install lz4tools and xxhash:
->>> pip install lz4
-
-Or `lz4-cffi` if using pypy:
-
->>> pip install lz4-cffi
+>>> pip install lz4tools
+>>> pip install xxhash
+*Note*: these modules do not support python2.6
Optional Snappy install
***********************
diff --git a/kafka/codec.py b/kafka/codec.py
index 11d5a99..e94bc4c 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -15,13 +15,10 @@ except ImportError:
snappy = None
try:
- import lz4
- from lz4 import compress as lz4_encode
- from lz4 import decompress as lz4_decode
+ import lz4f
+ import xxhash
except ImportError:
- lz4 = None
- lz4_encode = None
- lz4_decode = None
+ lz4f = None
PYPY = bool(platform.python_implementation() == 'PyPy')
@@ -34,7 +31,7 @@ def has_snappy():
def has_lz4():
- return lz4 is not None
+ return lz4f is not None
def gzip_encode(payload, compresslevel=None):
@@ -180,3 +177,50 @@ def snappy_decode(payload):
return out.read()
else:
return snappy.decompress(payload)
+
+
+def lz4_encode(payload):
+ data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member
+ # Kafka's LZ4 code has a bug in its header checksum implementation
+ header_size = 7
+ if isinstance(data[4], int):
+ flg = data[4]
+ else:
+ flg = ord(data[4])
+ content_size_bit = ((flg >> 3) & 1)
+ if content_size_bit:
+ header_size += 8
+
+ # This is the incorrect hc
+ hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
+
+ return b''.join([
+ data[0:header_size-1],
+ hc,
+ data[header_size:]
+ ])
+
+
+def lz4_decode(payload):
+ # Kafka's LZ4 code has a bug in its header checksum implementation
+ header_size = 7
+ if isinstance(payload[4], int):
+ flg = payload[4]
+ else:
+ flg = ord(payload[4])
+ content_size_bit = ((flg >> 3) & 1)
+ if content_size_bit:
+ header_size += 8
+
+ # This should be the correct hc
+ hc = xxhash.xxh32(payload[4:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
+
+ munged_payload = b''.join([
+ payload[0:header_size-1],
+ hc,
+ payload[header_size:]
+ ])
+
+ cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member
+ data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member
+ return data['decomp']
diff --git a/test/test_producer.py b/test/test_producer.py
index 7a2db68..36da68d 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,3 +1,5 @@
+import sys
+
import pytest
from kafka import KafkaConsumer, KafkaProducer
@@ -9,9 +11,13 @@ from test.testutil import random_string
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_end_to_end(kafka_broker, compression):
- # LZ4 requires 0.8.2
- if compression == 'lz4' and version() < (0, 8, 2):
- return
+ if compression == 'lz4':
+ # LZ4 requires 0.8.2
+ if version() < (0, 8, 2):
+ return
+ # LZ4 python libs dont work on python2.6
+ elif sys.version_info < (2, 7):
+ return
connect_str = 'localhost:' + str(kafka_broker.port)
producer = KafkaProducer(bootstrap_servers=connect_str,
diff --git a/tox.ini b/tox.ini
index 4ead9e3..ce7feee 100644
--- a/tox.ini
+++ b/tox.ini
@@ -16,8 +16,8 @@ deps =
pytest-mock
mock
python-snappy
- py{26,27,33,34,35}: lz4
- pypy: lz4-cffi
+ lz4tools
+ xxhash
py{26,27}: six
py26: unittest2
commands =