diff options
-rw-r--r-- | kafka/partitioner/hashed.py | 22 | ||||
-rw-r--r-- | test/test_partitioner.py | 23 |
2 files changed, 35 insertions, 10 deletions
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py index 6393ce2..d5d6d27 100644 --- a/kafka/partitioner/hashed.py +++ b/kafka/partitioner/hashed.py @@ -1,3 +1,5 @@ +import six + from .base import Partitioner @@ -43,14 +45,16 @@ def murmur2(key): Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 Args: - key: if not a bytearray, converted via bytearray(str(key)) + key: if not a bytes type, encoded using default encoding Returns: MurmurHash2 of key bytearray """ - # Convert key to a bytearray - if not isinstance(key, bytearray): - data = bytearray(str(key)) + # Convert key to bytes or bytearray + if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)): + data = key + else: + data = bytearray(str(key).encode()) length = len(data) seed = 0x9747b28c @@ -61,7 +65,7 @@ def murmur2(key): # Initialize the hash to a random value h = seed ^ length - length4 = length / 4 + length4 = length // 4 for i in range(length4): i4 = i * 4 @@ -84,15 +88,13 @@ def murmur2(key): # Handle the last few bytes of the input array extra_bytes = length % 4 - if extra_bytes == 3: + if extra_bytes >= 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16 h &= 0xffffffff - - if extra_bytes == 2: + if extra_bytes >= 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8 h &= 0xffffffff - - if extra_bytes == 1: + if extra_bytes >= 1: h ^= (data[length & ~3] & 0xff) h &= 0xffffffff h *= m diff --git a/test/test_partitioner.py b/test/test_partitioner.py new file mode 100644 index 0000000..67cd83b --- /dev/null +++ b/test/test_partitioner.py @@ -0,0 +1,23 @@ +import six +from . import unittest + +from kafka.partitioner import (Murmur2Partitioner) + +class TestMurmurPartitioner(unittest.TestCase): + def test_hash_bytes(self): + p = Murmur2Partitioner(range(1000)) + self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test')) + + def test_hash_encoding(self): + p = Murmur2Partitioner(range(1000)) + self.assertEqual(p.partition('test'), p.partition(u'test')) + + def test_murmur2_java_compatibility(self): + p = Murmur2Partitioner(range(1000)) + # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner + self.assertEqual(681, p.partition(b'')) + self.assertEqual(524, p.partition(b'a')) + self.assertEqual(434, p.partition(b'ab')) + self.assertEqual(107, p.partition(b'abc')) + self.assertEqual(566, p.partition(b'123456789')) + self.assertEqual(742, p.partition(b'\x00 ')) |