summaryrefslogtreecommitdiff
path: root/kafka/partitioner/hashed.py
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@rd.io> 2015-05-17 12:28:28 -0700
committer: Dana Powers <dana.powers@rd.io> 2015-06-10 13:39:56 -0700
commit: 4e339a746d84650f75fb401e3bd9cfe6e3a65e00 (patch)
tree: 25895be15fac221bf3c041af71a407ee8428cea9 /kafka/partitioner/hashed.py
parent: 8e3cd1ca2801ed13522d177305898c29a3ebfd9b (diff)
download: kafka-python-4e339a746d84650f75fb401e3bd9cfe6e3a65e00.tar.gz
Use MurmurHash2 for key partition hashing
Diffstat (limited to 'kafka/partitioner/hashed.py')
-rw-r--r-- kafka/partitioner/hashed.py 96
1 files changed, 95 insertions, 1 deletions
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index fb5e598..6393ce2 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -1,8 +1,26 @@
from .base import Partitioner
-class HashedPartitioner(Partitioner):
+
class Murmur2Partitioner(Partitioner):
    """Partitioner that picks the target partition from a hash of the key.

    Attempts to apply the same hashing function as the mainline java client,
    so the same key should map to the same partition from either client.
    """

    def partition(self, key, partitions=None):
        """Return the partition chosen for ``key``.

        Args:
            key: the message key; it is hashed with ``murmur2``.
            partitions: optional sequence of candidate partitions. When it is
                missing or empty, ``self.partitions`` is used instead.

        Returns: the element of the candidate sequence selected by the hash
        """
        candidates = partitions or self.partitions

        # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
        # Clear the top bit so the hash is non-negative before the modulo.
        non_negative_hash = murmur2(key) & 0x7fffffff

        return candidates[non_negative_hash % len(candidates)]
+
+
+class LegacyPartitioner(Partitioner):
+ """DEPRECATED -- See Issue 374
+
+ Implements a partitioner which selects the target partition based on
the hash of the key
"""
def partition(self, key, partitions=None):
@@ -12,3 +30,79 @@ class HashedPartitioner(Partitioner):
idx = hash(key) % size
return partitions[idx]
+
+
+# Alias that keeps the HashedPartitioner name bound to LegacyPartitioner; the default will change to Murmur2 in the 0.10 release
+HashedPartitioner = LegacyPartitioner
+
+
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
def murmur2(key):
    """Pure-python Murmur2 implementation.

    Based on java client, see org.apache.kafka.common.utils.Utils.murmur2

    Args:
        key: the value to hash. A bytearray is used as-is, bytes are copied
            into a bytearray, and any other object is converted with
            str() (UTF-8 encoded when str() yields text rather than bytes).

    Returns: MurmurHash2 of the key bytes, as an unsigned 32-bit integer
    """
    # Normalize the key to a bytearray so that indexing yields integers on
    # both Python 2 and Python 3.
    if isinstance(key, bytearray):
        data = key
    elif isinstance(key, bytes):
        data = bytearray(key)
    else:
        text = str(key)
        if not isinstance(text, bytes):
            # Python 3: str() returns text, which needs an explicit encoding.
            text = text.encode('utf-8')
        data = bytearray(text)

    mask = 0xffffffff  # emulate Java's 32-bit integer overflow
    length = len(data)
    seed = 0x9747b28c
    # 'm' and 'r' are mixing constants generated offline.
    # They're not really 'magic', they just happen to work well.
    m = 0x5bd1e995
    r = 24

    # Initialize the hash to a random value
    h = seed ^ length

    # Mix the input four bytes at a time. Floor division keeps the block
    # count an integer on Python 3 as well as Python 2.
    for i in range(length // 4):
        i4 = i * 4
        # Assemble a little-endian 32-bit word from the next four bytes.
        k = (data[i4] +
             (data[i4 + 1] << 8) +
             (data[i4 + 2] << 16) +
             (data[i4 + 3] << 24))
        k = (k * m) & mask
        k ^= k >> r  # k ^= k >>> r
        k = (k * m) & mask

        h = (h * m) & mask
        h ^= k

    # Handle the last few bytes of the input array. The Java reference uses
    # a switch with fall-through, so a 3-byte tail also mixes in bytes 2
    # and 1, and every non-empty tail ends with the multiply by m.
    tail = length & ~3
    extra_bytes = length % 4
    if extra_bytes >= 3:
        h ^= data[tail + 2] << 16
    if extra_bytes >= 2:
        h ^= data[tail + 1] << 8
    if extra_bytes >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche of the remaining bits.
    h ^= h >> 13  # h >>> 13;
    h = (h * m) & mask
    h ^= h >> 15  # h >>> 15;

    return h