summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py20
1 files changed, 18 insertions, 2 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index c7bdfdb..27272f6 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,6 +7,7 @@ import time
from mock import MagicMock, patch
from . import unittest
+from kafka import KafkaClient, SimpleProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
ProduceResponse, RetryOptions, TopicAndPartition
@@ -44,8 +45,6 @@ class TestKafkaProducer(unittest.TestCase):
producer.send_messages(topic, partition, m)
def test_topic_message_types(self):
- from kafka.producer.simple import SimpleProducer
-
client = MagicMock()
def partitions(topic):
@@ -75,6 +74,23 @@ class TestKafkaProducer(unittest.TestCase):
for _ in xrange(producer.queue.qsize()):
producer.queue.get()
+ def test_producer_sync_fail_on_error(self):
+ error = FailedPayloadsError('failure')
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+ with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
+
+ client = KafkaClient(MagicMock())
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+ # This should not raise
+ (response,) = producer.send_messages('foobar', b'test message')
+ self.assertEqual(response, error)
+
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages('foobar', b'test message')
+
class TestKafkaProducerSendUpstream(unittest.TestCase):