summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 0b31d18..df391f7 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -252,11 +252,13 @@ class Producer(object):
raise TypeError("the key must be type bytes")
if self.async:
- put_timeout = self.async_queue_put_timeout
for m in msg:
try:
item = (TopicAndPartition(topic, partition), m, key)
- self.queue.put(item, bool(put_timeout), put_timeout)
+ if self.async_queue_put_timeout == 0:
+ self.queue.put_nowait(item)
+ else:
+ self.queue.put(item, True, self.async_queue_put_timeout)
except Full:
raise AsyncProducerQueueFull(
'Producer async queue overfilled. '