summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-13 14:40:09 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-13 14:40:09 +0530
commitd3cf18a9fb961f8c640bd2ff0384b9a66ae179bb (patch)
treeff11ce0a1bf8770ae60cc8fe1026d097c84e8d26
parentac76520bf880fb91e76d34a8e67cbe7e96017c7d (diff)
downloadkafka-python-d3cf18a9fb961f8c640bd2ff0384b9a66ae179bb.tar.gz
Fix bugs and testing
* Ensure that round-robin partitioner works fine * _load_metadata_for_topics() would cause duplicate and stale entries in self.topic_partitions. Fix this
-rw-r--r--kafka/client.py5
-rw-r--r--kafka/partitioner.py10
-rw-r--r--kafka/producer.py4
3 files changed, 14 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 5595d49..1146798 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -73,7 +73,12 @@ class KafkaClient(object):
self.brokers.update(brokers)
self.topics_to_brokers = {}
+
for topic, partitions in topics.items():
+ # Clear the list once before we add it. This removes stale entries
+ # and avoids duplicates
+ self.topic_partitions.pop(topic, None)
+
if not partitions:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
index 0f49b07..84db4d5 100644
--- a/kafka/partitioner.py
+++ b/kafka/partitioner.py
@@ -31,14 +31,18 @@ class RoundRobinPartitioner(Partitioner):
in a round robin fashion
"""
def __init__(self, partitions):
- self.partitions = cycle(partitions)
+ self._set_partitions(partitions)
+
+ def _set_partitions(self, partitions):
+ self.partitions = partitions
+ self.iterpart = cycle(partitions)
def partition(self, key, partitions):
# Refresh the partition list if necessary
if self.partitions != partitions:
- self.partitions = cycle(partitions)
+ self._set_partitions(partitions)
- return self.partitions.next()
+ return self.iterpart.next()
class HashedPartitioner(Partitioner):
diff --git a/kafka/producer.py b/kafka/producer.py
index 75f90c6..69c3830 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -46,8 +46,8 @@ class KeyedProducer(object):
self.partitioner = partitioner(self.client.topic_partitions[topic])
- def send(self, client, key, msg):
- partitions = self.client.topic_partitions[topic]
+ def send(self, key, msg):
+ partitions = self.client.topic_partitions[self.topic]
partition = self.partitioner.partition(key, partitions)
req = ProduceRequest(self.topic, partition,