summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py19
1 files changed, 9 insertions, 10 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 8a6bff0..b60f13d 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -241,16 +241,14 @@ class SimpleProducer(Producer):
def _next_partition(self, topic):
if topic not in self.partition_cycles:
- if topic not in self.client.topic_partitions:
+ if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
- try:
- self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
- except KeyError:
- raise UnknownTopicOrPartitionError(topic)
+
+ self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
# Randomize the initial partition that is returned
if self.random_start:
- num_partitions = len(self.client.topic_partitions[topic])
+ num_partitions = len(self.client.get_partition_ids_for_topic(topic))
for _ in xrange(random.randint(0, num_partitions-1)):
self.partition_cycles[topic].next()
@@ -299,12 +297,13 @@ class KeyedProducer(Producer):
def _next_partition(self, topic, key):
if topic not in self.partitioners:
- if topic not in self.client.topic_partitions:
+ if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
- self.partitioners[topic] = \
- self.partitioner_class(self.client.topic_partitions[topic])
+
+ self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
+
partitioner = self.partitioners[topic]
- return partitioner.partition(key, self.client.topic_partitions[topic])
+ return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
def send(self, topic, key, msg):
partition = self._next_partition(topic, key)