diff options
Diffstat (limited to 'kafka/partitioner.py')
-rw-r--r-- | kafka/partitioner.py | 58 |
1 files changed, 0 insertions, 58 deletions
diff --git a/kafka/partitioner.py b/kafka/partitioner.py deleted file mode 100644 index 695dd6f..0000000 --- a/kafka/partitioner.py +++ /dev/null @@ -1,58 +0,0 @@ -from itertools import cycle - - -class Partitioner(object): - """ - Base class for a partitioner - """ - def __init__(self, partitions): - """ - Initialize the partitioner - - partitions - A list of available partitions (during startup) - """ - self.partitions = partitions - - def partition(self, key, partitions): - """ - Takes a string key and num_partitions as argument and returns - a partition to be used for the message - - partitions - The list of partitions is passed in every call. This - may look like an overhead, but it will be useful - (in future) when we handle cases like rebalancing - """ - raise NotImplementedError('partition function has to be implemented') - - -class RoundRobinPartitioner(Partitioner): - """ - Implements a round robin partitioner which sends data to partitions - in a round robin fashion - """ - def __init__(self, partitions): - super(RoundRobinPartitioner, self).__init__(partitions) - self.iterpart = cycle(partitions) - - def _set_partitions(self, partitions): - self.partitions = partitions - self.iterpart = cycle(partitions) - - def partition(self, key, partitions): - # Refresh the partition list if necessary - if self.partitions != partitions: - self._set_partitions(partitions) - - return next(self.iterpart) - - -class HashedPartitioner(Partitioner): - """ - Implements a partitioner which selects the target partition based on - the hash of the key - """ - def partition(self, key, partitions): - size = len(partitions) - idx = hash(key) % size - - return partitions[idx] |