summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorИскандаров Эдуард <e.iskandarov@corp.mail.ru>2015-01-23 12:56:42 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:48 +0300
commitcf363089617de2d0b18cb83eba1e61adbc5d0144 (patch)
tree493504fb0489646247bb96cacc38ebd411f19d05 /test
parentc165f17338c0a9260a91b816f73e5ce4ff7ed359 (diff)
downloadkafka-python-cf363089617de2d0b18cb83eba1e61adbc5d0144.tar.gz
add producer send batch queue overfilled test
Diffstat (limited to 'test')
-rw-r--r--test/test_producer.py18
1 files changed, 17 insertions, 1 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index c0dc873..b57dfd8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -3,10 +3,11 @@
import time
import logging
-from mock import MagicMock
+from mock import MagicMock, patch
from . import unittest
from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
+from kafka.common import BatchQueueOverfilledError
from kafka.producer.base import Producer
from kafka.producer.base import _send_upstream
from kafka.protocol import CODEC_NONE
@@ -51,6 +52,21 @@ class TestKafkaProducer(unittest.TestCase):
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called
+ @patch('kafka.producer.base.Process')
+ def test_producer_batch_send_queue_overfilled(self, process_mock):
+ queue_size = 2
+ producer = Producer(MagicMock(), batch_send=True,
+ batch_send_queue_maxsize=queue_size)
+
+ topic = b'test-topic'
+ partition = 0
+
+ message = b'test-message'
+ with self.assertRaises(BatchQueueOverfilledError):
+ message_list = [message] * (queue_size + 1)
+ producer.send_messages(topic, partition, *message_list)
+
+
class TestKafkaProducerSendUpstream(unittest.TestCase):