diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-10-22 09:48:08 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-10-22 09:48:08 -0700 |
commit | 8de40a20d909c90745b39df09b3aa9d2cc194b68 (patch) | |
tree | 85914fcaaccc700b3ddce7eb1f9ff143dc40285a | |
parent | 9450a6bfff8517371162a968f4345ffc09380bb8 (diff) | |
download | kafka-python-8de40a20d909c90745b39df09b3aa9d2cc194b68.tar.gz |
Fix murmur2 bug handling python2 bytes that do not ascii encode (#815)
* Add test for murmur2 py2 bytes bug
* Fix murmur2 handling of python2 bytes
* Drop bytearray / str / unicode MurmurPartitioner tests -- no longer supported
* Make DefaultPartitioner importable from kafka.partitioner
-rw-r--r-- | kafka/partitioner/__init__.py | 7 | ||||
-rw-r--r-- | kafka/partitioner/hashed.py | 16 | ||||
-rw-r--r-- | test/test_partitioner.py | 24 |
3 files changed, 20 insertions, 27 deletions
diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py index 9ce6ade..299b485 100644 --- a/kafka/partitioner/__init__.py +++ b/kafka/partitioner/__init__.py @@ -1,9 +1,10 @@ from __future__ import absolute_import -from .roundrobin import RoundRobinPartitioner +from .default import DefaultPartitioner from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner +from .roundrobin import RoundRobinPartitioner __all__ = [ - 'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner', - 'LegacyPartitioner' + 'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner', + 'Murmur2Partitioner', 'LegacyPartitioner' ] diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py index b6b8f7f..06307f0 100644 --- a/kafka/partitioner/hashed.py +++ b/kafka/partitioner/hashed.py @@ -49,22 +49,20 @@ HashedPartitioner = LegacyPartitioner # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 -def murmur2(key): +def murmur2(data): """Pure-python Murmur2 implementation. Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 Args: - key: if not a bytes type, encoded using default encoding + data (bytes): opaque bytes - Returns: MurmurHash2 of key bytearray + Returns: MurmurHash2 of data """ - - # Convert key to bytes or bytearray - if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)): - data = key - else: - data = bytearray(str(key).encode()) + # Python2 bytes is really a str, causing the bitwise operations below to fail + # so convert to bytearray. + if six.PY2: + data = bytearray(bytes(data)) length = len(data) seed = 0x9747b28c diff --git a/test/test_partitioner.py b/test/test_partitioner.py index e0398c6..2b5fe62 100644 --- a/test/test_partitioner.py +++ b/test/test_partitioner.py @@ -1,9 +1,7 @@ -import pytest -import six +from __future__ import absolute_import -from kafka.partitioner import Murmur2Partitioner -from kafka.partitioner.default import DefaultPartitioner -from kafka.partitioner import RoundRobinPartitioner +from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner +from kafka.partitioner.hashed import murmur2 def test_default_partitioner(): @@ -55,16 +53,6 @@ def test_roundrobin_partitioner(): i += 1 -def test_hash_bytes(): - p = Murmur2Partitioner(range(1000)) - assert p.partition(bytearray(b'test')) == p.partition(b'test') - - -def test_hash_encoding(): - p = Murmur2Partitioner(range(1000)) - assert p.partition('test') == p.partition(u'test') - - def test_murmur2_java_compatibility(): p = Murmur2Partitioner(range(1000)) # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner @@ -74,3 +62,9 @@ def test_murmur2_java_compatibility(): assert p.partition(b'abc') == 107 assert p.partition(b'123456789') == 566 assert p.partition(b'\x00 ') == 742 + + +def test_murmur2_not_ascii(): + # Verify no regression of murmur2() bug encoding py2 bytes that dont ascii encode + murmur2(b'\xa4') + murmur2(b'\x81' * 1000) |