diff options
-rw-r--r-- | kafka/producer/base.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0b31d18..df391f7 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -252,11 +252,13 @@ class Producer(object): raise TypeError("the key must be type bytes") if self.async: - put_timeout = self.async_queue_put_timeout for m in msg: try: item = (TopicAndPartition(topic, partition), m, key) - self.queue.put(item, bool(put_timeout), put_timeout) + if self.async_queue_put_timeout == 0: + self.queue.put_nowait(item) + else: + self.queue.put(item, True, self.async_queue_put_timeout) except Full: raise AsyncProducerQueueFull( 'Producer async queue overfilled. ' |