diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-08 18:07:57 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-09 17:23:23 -0700 |
commit | 99bcf078c23fbc1e17add1620b34f3177861846b (patch) | |
tree | 3bf9271e807d01f0d831f8dbeec18dfe9c336d02 /test/test_producer.py | |
parent | f6be28372cffc6ddc675905f67bf3aa1f1716bf9 (diff) | |
download | kafka-python-99bcf078c23fbc1e17add1620b34f3177861846b.tar.gz |
Support sync_fail_on_error kwarg in Producer
Diffstat (limited to 'test/test_producer.py')
-rw-r--r-- | test/test_producer.py | 20 |
1 files changed, 18 insertions, 2 deletions
diff --git a/test/test_producer.py b/test/test_producer.py index c7bdfdb..27272f6 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,6 +7,7 @@ import time from mock import MagicMock, patch from . import unittest +from kafka import KafkaClient, SimpleProducer from kafka.common import ( AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, ProduceResponse, RetryOptions, TopicAndPartition @@ -44,8 +45,6 @@ class TestKafkaProducer(unittest.TestCase): producer.send_messages(topic, partition, m) def test_topic_message_types(self): - from kafka.producer.simple import SimpleProducer - client = MagicMock() def partitions(topic): @@ -75,6 +74,23 @@ class TestKafkaProducer(unittest.TestCase): for _ in xrange(producer.queue.qsize()): producer.queue.get() + def test_producer_sync_fail_on_error(self): + error = FailedPayloadsError('failure') + with patch.object(KafkaClient, 'load_metadata_for_topics'): + with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]): + with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]): + + client = KafkaClient(MagicMock()) + producer = SimpleProducer(client, async=False, sync_fail_on_error=False) + + # This should not raise + (response,) = producer.send_messages('foobar', b'test message') + self.assertEqual(response, error) + + producer = SimpleProducer(client, async=False, sync_fail_on_error=True) + with self.assertRaises(FailedPayloadsError): + producer.send_messages('foobar', b'test message') + class TestKafkaProducerSendUpstream(unittest.TestCase): |