summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-04-22 12:40:04 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:48 +0300
commit91af27c64488a0029e960615d0f10d62532d6616 (patch)
treec298d478df20e2c22e2aadbf58c79d49dcb3db45
parent5119bb605acc4b24e091778656b229a36f9cac11 (diff)
downloadkafka-python-91af27c64488a0029e960615d0f10d62532d6616.tar.gz
Fix async producer queue put arguments
-rw-r--r--kafka/producer/base.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 0b31d18..df391f7 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -252,11 +252,13 @@ class Producer(object):
raise TypeError("the key must be type bytes")
if self.async:
- put_timeout = self.async_queue_put_timeout
for m in msg:
try:
item = (TopicAndPartition(topic, partition), m, key)
- self.queue.put(item, bool(put_timeout), put_timeout)
+ if self.async_queue_put_timeout == 0:
+ self.queue.put_nowait(item)
+ else:
+ self.queue.put(item, True, self.async_queue_put_timeout)
except Full:
raise AsyncProducerQueueFull(
'Producer async queue overfilled. '