summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py8
-rw-r--r--test/test_producer.py18
2 files changed, 24 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 331c71c..a0d9ac1 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -249,7 +249,13 @@ class Producer(object):
if self.async:
for m in msg:
- self.queue.put((TopicAndPartition(topic, partition), m, key))
+ try:
+ item = (TopicAndPartition(topic, partition), m, key)
+ self.queue.put_nowait(item)
+ except Full:
+ raise BatchQueueOverfilledError(
+ 'Producer batch send queue overfilled. '
+ 'Current queue size %d.' % self.queue.qsize())
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key)
diff --git a/test/test_producer.py b/test/test_producer.py
index c0dc873..b57dfd8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -3,10 +3,11 @@
import time
import logging
-from mock import MagicMock
+from mock import MagicMock, patch
from . import unittest
from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
+from kafka.common import BatchQueueOverfilledError
from kafka.producer.base import Producer
from kafka.producer.base import _send_upstream
from kafka.protocol import CODEC_NONE
@@ -51,6 +52,21 @@ class TestKafkaProducer(unittest.TestCase):
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called
+ @patch('kafka.producer.base.Process')
+ def test_producer_batch_send_queue_overfilled(self, process_mock):
+ queue_size = 2
+ producer = Producer(MagicMock(), batch_send=True,
+ batch_send_queue_maxsize=queue_size)
+
+ topic = b'test-topic'
+ partition = 0
+
+ message = b'test-message'
+ with self.assertRaises(BatchQueueOverfilledError):
+ message_list = [message] * (queue_size + 1)
+ producer.send_messages(topic, partition, *message_list)
+
+
class TestKafkaProducerSendUpstream(unittest.TestCase):