summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--kafka/partitioner/base.py13
-rw-r--r--kafka/partitioner/hashed.py10
-rw-r--r--kafka/partitioner/roundrobin.py78
-rw-r--r--test/test_partitioner.py33
5 files changed, 112 insertions, 24 deletions
diff --git a/.gitignore b/.gitignore
index 7d9069c..edb75c5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,5 @@ servers/*/resources/ssl*
docs/_build
.cache*
.idea/
+integration-test/
+tests-env/ \ No newline at end of file
diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py
index 00f7be3..0e36253 100644
--- a/kafka/partitioner/base.py
+++ b/kafka/partitioner/base.py
@@ -5,22 +5,23 @@ class Partitioner(object):
"""
Base class for a partitioner
"""
- def __init__(self, partitions):
+ def __init__(self, partitions=None):
"""
Initialize the partitioner
Arguments:
- partitions: A list of available partitions (during startup)
+ partitions: A list of available partitions (during startup) OPTIONAL.
"""
self.partitions = partitions
- def partition(self, key, partitions=None):
+ def __call__(self, key, all_partitions=None, available_partitions=None):
"""
- Takes a string key and num_partitions as argument and returns
+ Takes a string key, all_partitions and available_partitions as arguments and returns
a partition to be used for the message
Arguments:
- key: the key to use for partitioning
- partitions: (optional) a list of partitions.
+ key: the key to use for partitioning.
+ all_partitions: a list of the topic's partitions.
+ available_partitions: a list of the broker's currently available partitions (optional).
"""
raise NotImplementedError('partition function has to be implemented')
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index 988319b..b6b8f7f 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -11,6 +11,11 @@ class Murmur2Partitioner(Partitioner):
the hash of the key. Attempts to apply the same hashing
function as mainline java client.
"""
+ def __call__(self, key, partitions=None, available=None):
+ if available:
+ return self.partition(key, available)
+ return self.partition(key, partitions)
+
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions
@@ -21,12 +26,15 @@ class Murmur2Partitioner(Partitioner):
return partitions[idx]
-class LegacyPartitioner(Partitioner):
+class LegacyPartitioner(object):
"""DEPRECATED -- See Issue 374
Implements a partitioner which selects the target partition based on
the hash of the key
"""
+ def __init__(self, partitions):
+ self.partitions = partitions
+
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions
diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py
index d244353..9ac2ed0 100644
--- a/kafka/partitioner/roundrobin.py
+++ b/kafka/partitioner/roundrobin.py
@@ -1,26 +1,70 @@
from __future__ import absolute_import
-from itertools import cycle
-
from .base import Partitioner
class RoundRobinPartitioner(Partitioner):
- """
- Implements a round robin partitioner which sends data to partitions
- in a round robin fashion
- """
- def __init__(self, partitions):
- super(RoundRobinPartitioner, self).__init__(partitions)
- self.iterpart = cycle(partitions)
-
- def _set_partitions(self, partitions):
+ def __init__(self, partitions=None):
+ self.partitions_iterable = CachedPartitionCycler(partitions)
+ if partitions:
+ self._set_partitions(partitions)
+ else:
+ self.partitions = None
+
+ def __call__(self, key, all_partitions=None, available_partitions=None):
+ if available_partitions:
+ cur_partitions = available_partitions
+ else:
+ cur_partitions = all_partitions
+ if not self.partitions:
+ self._set_partitions(cur_partitions)
+ elif cur_partitions != self.partitions_iterable.partitions and cur_partitions is not None:
+ self._set_partitions(cur_partitions)
+ return next(self.partitions_iterable)
+
+ def _set_partitions(self, available_partitions):
+ self.partitions = available_partitions
+ self.partitions_iterable.set_partitions(available_partitions)
+
+ def partition(self, key, all_partitions=None, available_partitions=None):
+ return self.__call__(key, all_partitions, available_partitions)
+
+
+class CachedPartitionCycler(object):
+ def __init__(self, partitions=None):
self.partitions = partitions
- self.iterpart = cycle(partitions)
+ if partitions:
+ assert type(partitions) is list
+ self.cur_pos = None
- def partition(self, key, partitions=None):
- # Refresh the partition list if necessary
- if partitions and self.partitions != partitions:
- self._set_partitions(partitions)
+ def __next__(self):
+ return self.next()
+
+ @staticmethod
+ def _index_available(cur_pos, partitions):
+ return cur_pos < len(partitions)
+
+ def set_partitions(self, partitions):
+ if self.cur_pos:
+ if not self._index_available(self.cur_pos, partitions):
+ self.cur_pos = 0
+ self.partitions = partitions
+ return None
+
+ self.partitions = partitions
+ next_item = self.partitions[self.cur_pos]
+ if next_item in partitions:
+ self.cur_pos = partitions.index(next_item)
+ else:
+ self.cur_pos = 0
+ return None
+ self.partitions = partitions
- return next(self.iterpart)
+ def next(self):
+ assert self.partitions is not None
+ if self.cur_pos is None or not self._index_available(self.cur_pos, self.partitions):
+ self.cur_pos = 1
+ return self.partitions[0]
+ cur_item = self.partitions[self.cur_pos]
+ self.cur_pos += 1
+ return cur_item
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 52b6b81..e0398c6 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -3,6 +3,7 @@ import six
from kafka.partitioner import Murmur2Partitioner
from kafka.partitioner.default import DefaultPartitioner
+from kafka.partitioner import RoundRobinPartitioner
def test_default_partitioner():
@@ -22,6 +23,38 @@ def test_default_partitioner():
assert partitioner(None, all_partitions, []) in all_partitions
+def test_roundrobin_partitioner():
+ partitioner = RoundRobinPartitioner()
+ all_partitions = list(range(100))
+ available = all_partitions
+ # partitioner should cycle between partitions
+ i = 0
+ max_partition = all_partitions[len(all_partitions) - 1]
+ while i <= max_partition:
+ assert i == partitioner(None, all_partitions, available)
+ i += 1
+
+ i = 0
+ while i <= int(max_partition / 2):
+ assert i == partitioner(None, all_partitions, available)
+ i += 1
+
+ # test dynamic partition re-assignment
+ available = available[:-25]
+
+ while i <= max(available):
+ assert i == partitioner(None, all_partitions, available)
+ i += 1
+
+ all_partitions = list(range(200))
+ available = all_partitions
+
+ max_partition = all_partitions[len(all_partitions) - 1]
+ while i <= max_partition:
+ assert i == partitioner(None, all_partitions, available)
+ i += 1
+
+
def test_hash_bytes():
p = Murmur2Partitioner(range(1000))
assert p.partition(bytearray(b'test')) == p.partition(b'test')