summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com> 2015-12-02 13:34:33 -0800
committer: Dana Powers <dana.powers@gmail.com> 2015-12-02 13:34:33 -0800
commit: 4fc15a5f27ff6f4c0c96ba39b55521891e96e5d5 (patch)
tree: 8efa8ab04427b0cd1dc1e7dc21c8b42a583d3cc1
parent: 149d43cf6cfe121347c15b39600299b29b583dc1 (diff)
parent: bb6283df6f9331595020d1d8c28818eeb1131957 (diff)
download: kafka-python-4fc15a5f27ff6f4c0c96ba39b55521891e96e5d5.tar.gz
Merge pull request #439 from chrischamberlin/fix-murmur
Fix translation of Java murmur2 code, fix byte encoding for Python 3.
-rw-r--r-- kafka/partitioner/hashed.py | 22
-rw-r--r-- test/test_partitioner.py | 23
2 files changed, 35 insertions, 10 deletions
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index 6393ce2..d5d6d27 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -1,3 +1,5 @@
+import six
+
from .base import Partitioner
@@ -43,14 +45,16 @@ def murmur2(key):
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args:
- key: if not a bytearray, converted via bytearray(str(key))
+ key: if not a bytes type, encoded using default encoding
Returns: MurmurHash2 of key bytearray
"""
- # Convert key to a bytearray
- if not isinstance(key, bytearray):
- data = bytearray(str(key))
+ # Convert key to bytes or bytearray
+ if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
+ data = key
+ else:
+ data = bytearray(str(key).encode())
length = len(data)
seed = 0x9747b28c
@@ -61,7 +65,7 @@ def murmur2(key):
# Initialize the hash to a random value
h = seed ^ length
- length4 = length / 4
+ length4 = length // 4
for i in range(length4):
i4 = i * 4
@@ -84,15 +88,13 @@ def murmur2(key):
# Handle the last few bytes of the input array
extra_bytes = length % 4
- if extra_bytes == 3:
+ if extra_bytes >= 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff
-
- if extra_bytes == 2:
+ if extra_bytes >= 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h &= 0xffffffff
-
- if extra_bytes == 1:
+ if extra_bytes >= 1:
h ^= (data[length & ~3] & 0xff)
h &= 0xffffffff
h *= m
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
new file mode 100644
index 0000000..67cd83b
--- /dev/null
+++ b/test/test_partitioner.py
@@ -0,0 +1,23 @@
+import six
+from . import unittest
+
+from kafka.partitioner import (Murmur2Partitioner)
+
+class TestMurmurPartitioner(unittest.TestCase):
+ def test_hash_bytes(self):
+ p = Murmur2Partitioner(range(1000))
+ self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
+
+ def test_hash_encoding(self):
+ p = Murmur2Partitioner(range(1000))
+ self.assertEqual(p.partition('test'), p.partition(u'test'))
+
+ def test_murmur2_java_compatibility(self):
+ p = Murmur2Partitioner(range(1000))
+ # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
+ self.assertEqual(681, p.partition(b''))
+ self.assertEqual(524, p.partition(b'a'))
+ self.assertEqual(434, p.partition(b'ab'))
+ self.assertEqual(107, p.partition(b'abc'))
+ self.assertEqual(566, p.partition(b'123456789'))
+ self.assertEqual(742, p.partition(b'\x00 '))