summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorChris Chamberlin <chris.chamberlin@climate.com>2015-07-28 22:38:28 -0700
committerChris Chamberlin <chris.chamberlin@climate.com>2015-07-28 22:38:28 -0700
commitbb6283df6f9331595020d1d8c28818eeb1131957 (patch)
tree6d0cc4d5163449bc87468f5e01450c6835fffab3 /kafka
parentadbd4ac052e4a5b40cfc2a3589b7adbcb656afe5 (diff)
downloadkafka-python-bb6283df6f9331595020d1d8c28818eeb1131957.tar.gz
Fix translation of Java murmur2 code, fix byte encoding for Python 3.
Avoid further type changes when Murmur2Partitioner is passed a byte array. Change leftover-bytes logic to match the Java switch statement, and add tests to verify that partition selection matches Java implementation.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/partitioner/hashed.py22
1 files changed, 12 insertions, 10 deletions
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index 6393ce2..d5d6d27 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -1,3 +1,5 @@
+import six
+
from .base import Partitioner
@@ -43,14 +45,16 @@ def murmur2(key):
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args:
- key: if not a bytearray, converted via bytearray(str(key))
+ key: if not a bytes type, encoded using default encoding
Returns: MurmurHash2 of key bytearray
"""
- # Convert key to a bytearray
- if not isinstance(key, bytearray):
- data = bytearray(str(key))
+ # Convert key to bytes or bytearray
+ if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
+ data = key
+ else:
+ data = bytearray(str(key).encode())
length = len(data)
seed = 0x9747b28c
@@ -61,7 +65,7 @@ def murmur2(key):
# Initialize the hash to a random value
h = seed ^ length
- length4 = length / 4
+ length4 = length // 4
for i in range(length4):
i4 = i * 4
@@ -84,15 +88,13 @@ def murmur2(key):
# Handle the last few bytes of the input array
extra_bytes = length % 4
- if extra_bytes == 3:
+ if extra_bytes >= 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff
-
- if extra_bytes == 2:
+ if extra_bytes >= 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h &= 0xffffffff
-
- if extra_bytes == 1:
+ if extra_bytes >= 1:
h ^= (data[length & ~3] & 0xff)
h &= 0xffffffff
h *= m