blob: 8190c34f931bdb9254d91d7de3fc90f0044b11cc (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
from itertools import cycle
class Partitioner(object):
    """
    Abstract base for partitioning strategies.

    A concrete partitioner decides which partition a message goes to by
    overriding ``partition``; this class only stores the initial
    partition list and defines the method contract.
    """

    def __init__(self, partitions):
        """
        Record the partitions that are available at start-up.

        partitions - the partitions known when the partitioner is created;
                     kept as-is on ``self.partitions``
        """
        self.partitions = partitions

    def partition(self, key, partitions):
        """
        Pick the partition that should receive a message.

        key        - the message key (described by the original contract
                     as a string)
        partitions - the current list of partitions. It is handed in on
                     every call, not just at construction, so that an
                     implementation can follow changes to the partition
                     set (e.g. a future rebalancing feature).

        Implementations return one element of ``partitions``. The base
        class has no strategy and always raises NotImplementedError.
        """
        raise NotImplementedError('partition function has to be implemented')
class RoundRobinPartitioner(Partitioner):
    """
    Implements a round robin partitioner which sends data to partitions
    in a round robin fashion. The message key is ignored.
    """

    def __init__(self, partitions):
        super(RoundRobinPartitioner, self).__init__(partitions)
        self.iterpart = cycle(partitions)

    def _set_partitions(self, partitions):
        """Replace the known partitions and restart the rotation at the first one."""
        self.partitions = partitions
        self.iterpart = cycle(partitions)

    def partition(self, key, partitions):
        """
        Return the next partition in the rotation.

        key        - unused; round robin does not depend on the key
        partitions - the current partition list. If it compares unequal
                     (!=) to the list last seen, the rotation is rebuilt
                     from this list and restarts at its first element.

        With an empty ``partitions`` list the cycle yields nothing, so
        ``next()`` raises StopIteration.
        """
        # Refresh the partition list if necessary
        if self.partitions != partitions:
            self._set_partitions(partitions)
        # Use the next() builtin: the iterator .next() method only exists
        # on Python 2 (it is __next__ on Python 3), so the old call raised
        # AttributeError on Python 3.
        return next(self.iterpart)
class HashedPartitioner(Partitioner):
    """
    Implements a partitioner which selects the target partition based on
    the hash of the key.

    The key is hashed with ``hasher`` (built-in ``hash`` by default) and
    the result, taken modulo the number of partitions, indexes into the
    partition list.

    NOTE: on Python 3.3+, ``hash()`` of str/bytes is randomized per
    process unless PYTHONHASHSEED is fixed. The default mapping can
    therefore differ between producer processes. Pass a deterministic
    ``hasher`` if the same key must always land on the same partition
    across processes.
    """

    def __init__(self, partitions, hasher=None):
        """
        partitions - the partitions available at start-up
        hasher     - optional callable mapping a key to an int; defaults
                     to the built-in hash() so existing key -> partition
                     assignments are unchanged
        """
        super(HashedPartitioner, self).__init__(partitions)
        self.hasher = hash if hasher is None else hasher

    def partition(self, key, partitions):
        """
        Return the partition selected by hashing ``key``.

        key        - the message key; must be accepted by ``self.hasher``
        partitions - the current list of partitions

        An empty ``partitions`` list makes the modulo divide by zero and
        raises ZeroDivisionError.
        """
        size = len(partitions)
        # Python's % with a positive divisor is always non-negative, so
        # negative hash values still yield a valid index.
        idx = self.hasher(key) % size
        return partitions[idx]
|