| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
 | from __future__ import absolute_import
from kafka.vendor import six
from kafka.partitioner.base import Partitioner
class Murmur2Partitioner(Partitioner):
    """
    Partitioner that picks the target partition from the murmur2 hash of
    the message key. Intended to hash keys the same way the mainline java
    client does.
    """

    def __call__(self, key, partitions=None, available=None):
        # A non-empty `available` list takes precedence over `partitions`.
        candidates = available if available else partitions
        return self.partition(key, candidates)

    def partition(self, key, partitions=None):
        """Return the entry of `partitions` selected by the hash of `key`.

        When `partitions` is empty or None, self.partitions is used instead.
        """
        pool = partitions or self.partitions
        # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
        # Clearing the top bit keeps the hash non-negative before the modulo.
        positive_hash = murmur2(key) & 0x7fffffff
        return pool[positive_hash % len(pool)]
class LegacyPartitioner(object):
    """DEPRECATED -- See Issue 374

    Partitioner that picks the target partition from the builtin hash()
    of the key.
    """

    def __init__(self, partitions):
        # Default partition list, used when partition() is given none.
        self.partitions = partitions

    def partition(self, key, partitions=None):
        """Return the entry of `partitions` selected by hash(key).

        When `partitions` is empty or None, self.partitions is used instead.
        """
        pool = partitions or self.partitions
        count = len(pool)
        return pool[hash(key) % count]
# HashedPartitioner is an alias for the default key-hash partitioner; it
# currently points at LegacyPartitioner.
# Default will change to Murmur2 in 0.10 release
HashedPartitioner = LegacyPartitioner
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
def murmur2(data):
    """Pure-python MurmurHash2 (32-bit) of a byte string.

    Follows the java client, see org.apache.kafka.common.utils.Utils.murmur2

    Args:
        data (bytes): opaque bytes

    Returns: MurmurHash2 of data, masked to 32 bits
    """
    # Indexing a Python2 str gives 1-char strings, which break the bitwise
    # arithmetic below; a bytearray gives ints instead.
    if six.PY2:
        data = bytearray(bytes(data))

    mask32 = 0xffffffff
    seed = 0x9747b28c
    # Mixing constants of the algorithm; generated offline, nothing 'magic'
    # about them beyond working well.
    mult = 0x5bd1e995
    shift = 24

    size = len(data)
    # Offset of the first byte that is not part of a complete 4-byte word.
    tail = size & ~3

    # Start the hash from the seed combined with the input length.
    acc = seed ^ size

    for pos in range(0, tail, 4):
        # Assemble the next word from four bytes, least significant first.
        word = ((data[pos] & 0xff) |
                ((data[pos + 1] & 0xff) << 8) |
                ((data[pos + 2] & 0xff) << 16) |
                ((data[pos + 3] & 0xff) << 24))
        # Every multiplication is truncated to 32 bits to emulate java ints.
        word = (word * mult) & mask32
        word ^= word >> shift  # word ^= word >>> shift
        word = (word * mult) & mask32
        acc = ((acc * mult) & mask32) ^ word

    # Fold in the 1-3 bytes left over after the last complete word; this
    # mirrors the fall-through switch of the java implementation.
    leftover = size - tail
    if leftover:
        if leftover == 3:
            acc ^= (data[tail + 2] & 0xff) << 16
        if leftover >= 2:
            acc ^= (data[tail + 1] & 0xff) << 8
        acc ^= data[tail] & 0xff
        acc = (acc * mult) & mask32

    # Final mixing rounds.
    acc ^= acc >> 13  # acc >>> 13
    acc = (acc * mult) & mask32
    acc ^= acc >> 15  # acc >>> 15
    return acc
 |