summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py5
-rw-r--r--kafka/partitioner.py10
-rw-r--r--kafka/producer.py4
3 files changed, 14 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 5595d49..1146798 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -73,7 +73,12 @@ class KafkaClient(object):
self.brokers.update(brokers)
self.topics_to_brokers = {}
+
for topic, partitions in topics.items():
+ # Clear the list once before we add it. This removes stale entries
+ # and avoids duplicates
+ self.topic_partitions.pop(topic, None)
+
if not partitions:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
index 0f49b07..84db4d5 100644
--- a/kafka/partitioner.py
+++ b/kafka/partitioner.py
@@ -31,14 +31,18 @@ class RoundRobinPartitioner(Partitioner):
in a round robin fashion
"""
def __init__(self, partitions):
- self.partitions = cycle(partitions)
+ self._set_partitions(partitions)
+
+ def _set_partitions(self, partitions):
+ self.partitions = partitions
+ self.iterpart = cycle(partitions)
def partition(self, key, partitions):
# Refresh the partition list if necessary
if self.partitions != partitions:
- self.partitions = cycle(partitions)
+ self._set_partitions(partitions)
- return self.partitions.next()
+ return self.iterpart.next()
class HashedPartitioner(Partitioner):
diff --git a/kafka/producer.py b/kafka/producer.py
index 75f90c6..69c3830 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -46,8 +46,8 @@ class KeyedProducer(object):
self.partitioner = partitioner(self.client.topic_partitions[topic])
- def send(self, client, key, msg):
- partitions = self.client.topic_partitions[topic]
+ def send(self, key, msg):
+ partitions = self.client.topic_partitions[self.topic]
partition = self.partitioner.partition(key, partitions)
req = ProduceRequest(self.topic, partition,