diff options
Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 8a6bff0..b60f13d 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -241,16 +241,14 @@ class SimpleProducer(Producer): def _next_partition(self, topic): if topic not in self.partition_cycles: - if topic not in self.client.topic_partitions: + if not self.client.has_metadata_for_topic(topic): self.client.load_metadata_for_topics(topic) - try: - self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) - except KeyError: - raise UnknownTopicOrPartitionError(topic) + + self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic)) # Randomize the initial partition that is returned if self.random_start: - num_partitions = len(self.client.topic_partitions[topic]) + num_partitions = len(self.client.get_partition_ids_for_topic(topic)) for _ in xrange(random.randint(0, num_partitions-1)): self.partition_cycles[topic].next() @@ -299,12 +297,13 @@ class KeyedProducer(Producer): def _next_partition(self, topic, key): if topic not in self.partitioners: - if topic not in self.client.topic_partitions: + if not self.client.has_metadata_for_topic(topic): self.client.load_metadata_for_topics(topic) - self.partitioners[topic] = \ - self.partitioner_class(self.client.topic_partitions[topic]) + + self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic)) + partitioner = self.partitioners[topic] - return partitioner.partition(key, self.client.topic_partitions[topic]) + return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic)) def send(self, topic, key, msg): partition = self._next_partition(topic, key) |