summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-10-22 09:48:08 -0700
committerGitHub <noreply@github.com>2016-10-22 09:48:08 -0700
commit8de40a20d909c90745b39df09b3aa9d2cc194b68 (patch)
tree85914fcaaccc700b3ddce7eb1f9ff143dc40285a
parent9450a6bfff8517371162a968f4345ffc09380bb8 (diff)
downloadkafka-python-8de40a20d909c90745b39df09b3aa9d2cc194b68.tar.gz
Fix murmur2 bug handling python2 bytes that do not ascii encode (#815)
* Add test for murmur2 py2 bytes bug * Fix murmur2 handling of python2 bytes * Drop bytearray / str / unicode MurmurPartitioner tests -- no longer supported * Make DefaultPartitioner importable from kafka.partitioner
-rw-r--r--kafka/partitioner/__init__.py7
-rw-r--r--kafka/partitioner/hashed.py16
-rw-r--r--test/test_partitioner.py24
3 files changed, 20 insertions, 27 deletions
diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py
index 9ce6ade..299b485 100644
--- a/kafka/partitioner/__init__.py
+++ b/kafka/partitioner/__init__.py
@@ -1,9 +1,10 @@
from __future__ import absolute_import
-from .roundrobin import RoundRobinPartitioner
+from .default import DefaultPartitioner
from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
+from .roundrobin import RoundRobinPartitioner
__all__ = [
- 'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner',
- 'LegacyPartitioner'
+ 'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner',
+ 'Murmur2Partitioner', 'LegacyPartitioner'
]
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index b6b8f7f..06307f0 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -49,22 +49,20 @@ HashedPartitioner = LegacyPartitioner
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
-def murmur2(key):
+def murmur2(data):
"""Pure-python Murmur2 implementation.
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args:
- key: if not a bytes type, encoded using default encoding
+ data (bytes): opaque bytes
- Returns: MurmurHash2 of key bytearray
+ Returns: MurmurHash2 of data
"""
-
- # Convert key to bytes or bytearray
- if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
- data = key
- else:
- data = bytearray(str(key).encode())
+ # Python2 bytes is really a str, causing the bitwise operations below to fail
+ # so convert to bytearray.
+ if six.PY2:
+ data = bytearray(bytes(data))
length = len(data)
seed = 0x9747b28c
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index e0398c6..2b5fe62 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -1,9 +1,7 @@
-import pytest
-import six
+from __future__ import absolute_import
-from kafka.partitioner import Murmur2Partitioner
-from kafka.partitioner.default import DefaultPartitioner
-from kafka.partitioner import RoundRobinPartitioner
+from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner
+from kafka.partitioner.hashed import murmur2
def test_default_partitioner():
@@ -55,16 +53,6 @@ def test_roundrobin_partitioner():
i += 1
-def test_hash_bytes():
- p = Murmur2Partitioner(range(1000))
- assert p.partition(bytearray(b'test')) == p.partition(b'test')
-
-
-def test_hash_encoding():
- p = Murmur2Partitioner(range(1000))
- assert p.partition('test') == p.partition(u'test')
-
-
def test_murmur2_java_compatibility():
p = Murmur2Partitioner(range(1000))
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
@@ -74,3 +62,9 @@ def test_murmur2_java_compatibility():
assert p.partition(b'abc') == 107
assert p.partition(b'123456789') == 566
assert p.partition(b'\x00 ') == 742
+
+
+def test_murmur2_not_ascii():
+ # Verify no regression of murmur2() bug encoding py2 bytes that dont ascii encode
+ murmur2(b'\xa4')
+ murmur2(b'\x81' * 1000)