summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-14 18:24:00 -0700
committerDana Powers <dana.powers@gmail.com>2016-03-14 18:24:00 -0700
commit00599b6306f3064268cda818c0096562f0f2af6a (patch)
treeb15d307fc33d24e1a647c733cef6dedfd01d5532
parent3d80a798555bb2173e604b685ac57dcf4f51f4d4 (diff)
parent1b1bd55d03ee56da4b5663aa0dfbfbd8bbc1b73c (diff)
downloadkafka-python-00599b6306f3064268cda818c0096562f0f2af6a.tar.gz
Merge pull request #598 from zackdever/producer-optimization
Producer optimization
-rw-r--r--kafka/producer/kafka.py5
-rw-r--r--kafka/producer/sender.py8
2 files changed, 8 insertions, 5 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0286f8b..2a16fd8 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -443,12 +443,15 @@ class KafkaProducer(object):
self._sender.add_topic(topic)
begin = time.time()
elapsed = 0.0
- metadata_event = threading.Event()
+ metadata_event = None
while True:
partitions = self._metadata.partitions_for_topic(topic)
if partitions is not None:
return partitions
+ if not metadata_event:
+ metadata_event = threading.Event()
+
log.debug("Requesting metadata update for topic %s", topic)
metadata_event.clear()
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 0e6d6cd..9a86a16 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -4,7 +4,6 @@ import collections
import copy
import logging
import threading
-import time
import six
@@ -45,7 +44,7 @@ class Sender(threading.Thread):
self._metadata = client.cluster
self._running = True
self._force_close = False
- self._topics_to_add = []
+ self._topics_to_add = set()
def run(self):
"""The main run loop for the sender thread."""
@@ -158,8 +157,9 @@ class Sender(threading.Thread):
self.initiate_close()
def add_topic(self, topic):
- self._topics_to_add.append(topic)
- self.wakeup()
+ if topic not in self._topics_to_add:
+ self._topics_to_add.add(topic)
+ self.wakeup()
def _failed_produce(self, batches, node_id, error):
log.debug("Error sending produce request to node %d: %s", node_id, error) # trace