diff options
-rw-r--r-- | kafka/client.py | 5 | ||||
-rw-r--r-- | kafka/partitioner.py | 10 | ||||
-rw-r--r-- | kafka/producer.py | 4 |
3 files changed, 14 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py index 5595d49..1146798 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -73,7 +73,12 @@ class KafkaClient(object): self.brokers.update(brokers) self.topics_to_brokers = {} + for topic, partitions in topics.items(): + # Clear the list once before we add it. This removes stale entries + # and avoids duplicates + self.topic_partitions.pop(topic, None) + if not partitions: log.info("Partition is unassigned, delay for 1s and retry") time.sleep(1) diff --git a/kafka/partitioner.py b/kafka/partitioner.py index 0f49b07..84db4d5 100644 --- a/kafka/partitioner.py +++ b/kafka/partitioner.py @@ -31,14 +31,18 @@ class RoundRobinPartitioner(Partitioner): in a round robin fashion """ def __init__(self, partitions): - self.partitions = cycle(partitions) + self._set_partitions(partitions) + + def _set_partitions(self, partitions): + self.partitions = partitions + self.iterpart = cycle(partitions) def partition(self, key, partitions): # Refresh the partition list if necessary if self.partitions != partitions: - self.partitions = cycle(partitions) + self._set_partitions(partitions) - return self.partitions.next() + return self.iterpart.next() class HashedPartitioner(Partitioner): diff --git a/kafka/producer.py b/kafka/producer.py index 75f90c6..69c3830 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -46,8 +46,8 @@ class KeyedProducer(object): self.partitioner = partitioner(self.client.topic_partitions[topic]) - def send(self, client, key, msg): - partitions = self.client.topic_partitions[topic] + def send(self, key, msg): + partitions = self.client.topic_partitions[self.topic] partition = self.partitioner.partition(key, partitions) req = ProduceRequest(self.topic, partition, |