diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-08 18:07:57 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-09 17:23:23 -0700 |
commit | 99bcf078c23fbc1e17add1620b34f3177861846b (patch) | |
tree | 3bf9271e807d01f0d831f8dbeec18dfe9c336d02 | |
parent | f6be28372cffc6ddc675905f67bf3aa1f1716bf9 (diff) | |
download | kafka-python-99bcf078c23fbc1e17add1620b34f3177861846b.tar.gz |
Support sync_fail_on_error kwarg in Producer
-rw-r--r-- | kafka/producer/base.py | 14 | ||||
-rw-r--r-- | test/test_producer.py | 20 |
2 files changed, 30 insertions, 4 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 498539d..824ef5d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -41,6 +41,8 @@ ASYNC_LOG_MESSAGES_ON_ERROR = True STOP_ASYNC_PRODUCER = -1 ASYNC_STOP_TIMEOUT_SECS = 30 +SYNC_FAIL_ON_ERROR_DEFAULT = True + def _send_upstream(queue, client, codec, batch_time, batch_size, req_acks, ack_timeout, retry_options, stop_event, @@ -216,6 +218,9 @@ class Producer(object): defaults to 1 (local ack). ack_timeout (int, optional): millisecond timeout to wait for the configured req_acks, defaults to 1000. + sync_fail_on_error (bool, optional): whether sync producer should + raise exceptions (True), or just return errors (False), + defaults to True. async (bool, optional): send message using a background thread, defaults to False. batch_send_every_n (int, optional): If async is True, messages are @@ -258,6 +263,7 @@ class Producer(object): req_acks=ACK_AFTER_LOCAL_WRITE, ack_timeout=DEFAULT_ACK_TIMEOUT, codec=None, + sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, async=False, batch_send=False, # deprecated, use async batch_send_every_n=BATCH_SEND_MSG_COUNT, @@ -316,6 +322,8 @@ class Producer(object): obj.stop() self._cleanup_func = cleanup atexit.register(cleanup, self) + else: + self.sync_fail_on_error = sync_fail_on_error def send_messages(self, topic, partition, *msg): """ @@ -373,8 +381,10 @@ class Producer(object): messages = create_message_set([(m, key) for m in msg], self.codec, key) req = ProduceRequest(topic, partition, messages) try: - resp = self.client.send_produce_request([req], acks=self.req_acks, - timeout=self.ack_timeout) + resp = self.client.send_produce_request( + [req], acks=self.req_acks, timeout=self.ack_timeout, + fail_on_error=self.sync_fail_on_error + ) except Exception: log.exception("Unable to send messages") raise diff --git a/test/test_producer.py b/test/test_producer.py index c7bdfdb..27272f6 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,6 +7,7 @@ import time from mock import MagicMock, patch from . import unittest +from kafka import KafkaClient, SimpleProducer from kafka.common import ( AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, ProduceResponse, RetryOptions, TopicAndPartition @@ -44,8 +45,6 @@ class TestKafkaProducer(unittest.TestCase): producer.send_messages(topic, partition, m) def test_topic_message_types(self): - from kafka.producer.simple import SimpleProducer - client = MagicMock() def partitions(topic): @@ -75,6 +74,23 @@ class TestKafkaProducer(unittest.TestCase): for _ in xrange(producer.queue.qsize()): producer.queue.get() + def test_producer_sync_fail_on_error(self): + error = FailedPayloadsError('failure') + with patch.object(KafkaClient, 'load_metadata_for_topics'): + with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]): + with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]): + + client = KafkaClient(MagicMock()) + producer = SimpleProducer(client, async=False, sync_fail_on_error=False) + + # This should not raise + (response,) = producer.send_messages('foobar', b'test message') + self.assertEqual(response, error) + + producer = SimpleProducer(client, async=False, sync_fail_on_error=True) + with self.assertRaises(FailedPayloadsError): + producer.send_messages('foobar', b'test message') + class TestKafkaProducerSendUpstream(unittest.TestCase): |