diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-04-22 12:40:04 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:48 +0300 |
commit | 91af27c64488a0029e960615d0f10d62532d6616 (patch) | |
tree | c298d478df20e2c22e2aadbf58c79d49dcb3db45 | |
parent | 5119bb605acc4b24e091778656b229a36f9cac11 (diff) | |
download | kafka-python-91af27c64488a0029e960615d0f10d62532d6616.tar.gz |
Fix async producer queue put arguments
-rw-r--r-- | kafka/producer/base.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0b31d18..df391f7 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -252,11 +252,13 @@ class Producer(object): raise TypeError("the key must be type bytes") if self.async: - put_timeout = self.async_queue_put_timeout for m in msg: try: item = (TopicAndPartition(topic, partition), m, key) - self.queue.put(item, bool(put_timeout), put_timeout) + if self.async_queue_put_timeout == 0: + self.queue.put_nowait(item) + else: + self.queue.put(item, True, self.async_queue_put_timeout) except Full: raise AsyncProducerQueueFull( 'Producer async queue overfilled. ' |