diff options
author | barrotsteindev <barrotstein@gmail.com> | 2016-09-28 20:30:32 +0300 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-09-28 10:30:32 -0700 |
commit | b8717b4b79462e83344f49bbd42312cf521d84aa (patch) | |
tree | c20e9a2f2e33e744702d277cb84e7a08c85d2218 | |
parent | 5c784890b6f323ea37c6171a59184e9304cbcb5c (diff) | |
download | kafka-python-b8717b4b79462e83344f49bbd42312cf521d84aa.tar.gz |
Update Partitioners for use with KafkaProducer (#827)
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | kafka/partitioner/base.py | 13 | ||||
-rw-r--r-- | kafka/partitioner/hashed.py | 10 | ||||
-rw-r--r-- | kafka/partitioner/roundrobin.py | 78 | ||||
-rw-r--r-- | test/test_partitioner.py | 33 |
5 files changed, 112 insertions, 24 deletions
@@ -12,3 +12,5 @@ servers/*/resources/ssl* docs/_build .cache* .idea/ +integration-test/ +tests-env/
\ No newline at end of file diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py index 00f7be3..0e36253 100644 --- a/kafka/partitioner/base.py +++ b/kafka/partitioner/base.py @@ -5,22 +5,23 @@ class Partitioner(object): """ Base class for a partitioner """ - def __init__(self, partitions): + def __init__(self, partitions=None): """ Initialize the partitioner Arguments: - partitions: A list of available partitions (during startup) + partitions: A list of available partitions (during startup) OPTIONAL. """ self.partitions = partitions - def partition(self, key, partitions=None): + def __call__(self, key, all_partitions=None, available_partitions=None): """ - Takes a string key and num_partitions as argument and returns + Takes a string key, num_partitions and available_partitions as argument and returns a partition to be used for the message Arguments: - key: the key to use for partitioning - partitions: (optional) a list of partitions. + key: the key to use for partitioning. + all_partitions: a list of the topic's partitions. + available_partitions: a list of the broker's currently avaliable partitions(optional). """ raise NotImplementedError('partition function has to be implemented') diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py index 988319b..b6b8f7f 100644 --- a/kafka/partitioner/hashed.py +++ b/kafka/partitioner/hashed.py @@ -11,6 +11,11 @@ class Murmur2Partitioner(Partitioner): the hash of the key. Attempts to apply the same hashing function as mainline java client. """ + def __call__(self, key, partitions=None, available=None): + if available: + return self.partition(key, available) + return self.partition(key, partitions) + def partition(self, key, partitions=None): if not partitions: partitions = self.partitions @@ -21,12 +26,15 @@ class Murmur2Partitioner(Partitioner): return partitions[idx] -class LegacyPartitioner(Partitioner): +class LegacyPartitioner(object): """DEPRECATED -- See Issue 374 Implements a partitioner which selects the target partition based on the hash of the key """ + def __init__(self, partitions): + self.partitions = partitions + def partition(self, key, partitions=None): if not partitions: partitions = self.partitions diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py index d244353..9ac2ed0 100644 --- a/kafka/partitioner/roundrobin.py +++ b/kafka/partitioner/roundrobin.py @@ -1,26 +1,70 @@ from __future__ import absolute_import -from itertools import cycle - from .base import Partitioner class RoundRobinPartitioner(Partitioner): - """ - Implements a round robin partitioner which sends data to partitions - in a round robin fashion - """ - def __init__(self, partitions): - super(RoundRobinPartitioner, self).__init__(partitions) - self.iterpart = cycle(partitions) - - def _set_partitions(self, partitions): + def __init__(self, partitions=None): + self.partitions_iterable = CachedPartitionCycler(partitions) + if partitions: + self._set_partitions(partitions) + else: + self.partitions = None + + def __call__(self, key, all_partitions=None, available_partitions=None): + if available_partitions: + cur_partitions = available_partitions + else: + cur_partitions = all_partitions + if not self.partitions: + self._set_partitions(cur_partitions) + elif cur_partitions != self.partitions_iterable.partitions and cur_partitions is not None: + self._set_partitions(cur_partitions) + return next(self.partitions_iterable) + + def _set_partitions(self, available_partitions): + self.partitions = available_partitions + self.partitions_iterable.set_partitions(available_partitions) + + def partition(self, key, all_partitions=None, available_partitions=None): + return self.__call__(key, all_partitions, available_partitions) + + +class CachedPartitionCycler(object): + def __init__(self, partitions=None): self.partitions = partitions - self.iterpart = cycle(partitions) + if partitions: + assert type(partitions) is list + self.cur_pos = None - def partition(self, key, partitions=None): - # Refresh the partition list if necessary - if partitions and self.partitions != partitions: - self._set_partitions(partitions) + def __next__(self): + return self.next() + + @staticmethod + def _index_available(cur_pos, partitions): + return cur_pos < len(partitions) + + def set_partitions(self, partitions): + if self.cur_pos: + if not self._index_available(self.cur_pos, partitions): + self.cur_pos = 0 + self.partitions = partitions + return None + + self.partitions = partitions + next_item = self.partitions[self.cur_pos] + if next_item in partitions: + self.cur_pos = partitions.index(next_item) + else: + self.cur_pos = 0 + return None + self.partitions = partitions - return next(self.iterpart) + def next(self): + assert self.partitions is not None + if self.cur_pos is None or not self._index_available(self.cur_pos, self.partitions): + self.cur_pos = 1 + return self.partitions[0] + cur_item = self.partitions[self.cur_pos] + self.cur_pos += 1 + return cur_item diff --git a/test/test_partitioner.py b/test/test_partitioner.py index 52b6b81..e0398c6 100644 --- a/test/test_partitioner.py +++ b/test/test_partitioner.py @@ -3,6 +3,7 @@ import six from kafka.partitioner import Murmur2Partitioner from kafka.partitioner.default import DefaultPartitioner +from kafka.partitioner import RoundRobinPartitioner def test_default_partitioner(): @@ -22,6 +23,38 @@ def test_default_partitioner(): assert partitioner(None, all_partitions, []) in all_partitions +def test_roundrobin_partitioner(): + partitioner = RoundRobinPartitioner() + all_partitions = list(range(100)) + available = all_partitions + # partitioner should cycle between partitions + i = 0 + max_partition = all_partitions[len(all_partitions) - 1] + while i <= max_partition: + assert i == partitioner(None, all_partitions, available) + i += 1 + + i = 0 + while i <= int(max_partition / 2): + assert i == partitioner(None, all_partitions, available) + i += 1 + + # test dynamic partition re-assignment + available = available[:-25] + + while i <= max(available): + assert i == partitioner(None, all_partitions, available) + i += 1 + + all_partitions = list(range(200)) + available = all_partitions + + max_partition = all_partitions[len(all_partitions) - 1] + while i <= max_partition: + assert i == partitioner(None, all_partitions, available) + i += 1 + + def test_hash_bytes(): p = Murmur2Partitioner(range(1000)) assert p.partition(bytearray(b'test')) == p.partition(b'test') |