summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-08 18:07:57 -0700
committerDana Powers <dana.powers@rd.io>2015-06-09 17:23:23 -0700
commit99bcf078c23fbc1e17add1620b34f3177861846b (patch)
tree3bf9271e807d01f0d831f8dbeec18dfe9c336d02 /test/test_producer.py
parentf6be28372cffc6ddc675905f67bf3aa1f1716bf9 (diff)
downloadkafka-python-99bcf078c23fbc1e17add1620b34f3177861846b.tar.gz
Support sync_fail_on_error kwarg in Producer
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py20
1 files changed, 18 insertions, 2 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index c7bdfdb..27272f6 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,6 +7,7 @@ import time
from mock import MagicMock, patch
from . import unittest
+from kafka import KafkaClient, SimpleProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
ProduceResponse, RetryOptions, TopicAndPartition
@@ -44,8 +45,6 @@ class TestKafkaProducer(unittest.TestCase):
producer.send_messages(topic, partition, m)
def test_topic_message_types(self):
- from kafka.producer.simple import SimpleProducer
-
client = MagicMock()
def partitions(topic):
@@ -75,6 +74,23 @@ class TestKafkaProducer(unittest.TestCase):
for _ in xrange(producer.queue.qsize()):
producer.queue.get()
+ def test_producer_sync_fail_on_error(self):
+ error = FailedPayloadsError('failure')
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+ with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
+
+ client = KafkaClient(MagicMock())
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+ # This should not raise
+ (response,) = producer.send_messages('foobar', b'test message')
+ self.assertEqual(response, error)
+
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages('foobar', b'test message')
+
class TestKafkaProducerSendUpstream(unittest.TestCase):