summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py14
-rw-r--r--test/test_producer.py20
2 files changed, 30 insertions, 4 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 498539d..824ef5d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -41,6 +41,8 @@ ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
ASYNC_STOP_TIMEOUT_SECS = 30
+SYNC_FAIL_ON_ERROR_DEFAULT = True
+
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
@@ -216,6 +218,9 @@ class Producer(object):
defaults to 1 (local ack).
ack_timeout (int, optional): millisecond timeout to wait for the
configured req_acks, defaults to 1000.
+ sync_fail_on_error (bool, optional): whether sync producer should
+ raise exceptions (True), or just return errors (False),
+ defaults to True.
async (bool, optional): send message using a background thread,
defaults to False.
batch_send_every_n (int, optional): If async is True, messages are
@@ -258,6 +263,7 @@ class Producer(object):
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
+ sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
@@ -316,6 +322,8 @@ class Producer(object):
obj.stop()
self._cleanup_func = cleanup
atexit.register(cleanup, self)
+ else:
+ self.sync_fail_on_error = sync_fail_on_error
def send_messages(self, topic, partition, *msg):
"""
@@ -373,8 +381,10 @@ class Producer(object):
messages = create_message_set([(m, key) for m in msg], self.codec, key)
req = ProduceRequest(topic, partition, messages)
try:
- resp = self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
+ resp = self.client.send_produce_request(
+ [req], acks=self.req_acks, timeout=self.ack_timeout,
+ fail_on_error=self.sync_fail_on_error
+ )
except Exception:
log.exception("Unable to send messages")
raise
diff --git a/test/test_producer.py b/test/test_producer.py
index c7bdfdb..27272f6 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,6 +7,7 @@ import time
from mock import MagicMock, patch
from . import unittest
+from kafka import KafkaClient, SimpleProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
ProduceResponse, RetryOptions, TopicAndPartition
@@ -44,8 +45,6 @@ class TestKafkaProducer(unittest.TestCase):
producer.send_messages(topic, partition, m)
def test_topic_message_types(self):
- from kafka.producer.simple import SimpleProducer
-
client = MagicMock()
def partitions(topic):
@@ -75,6 +74,23 @@ class TestKafkaProducer(unittest.TestCase):
for _ in xrange(producer.queue.qsize()):
producer.queue.get()
+ def test_producer_sync_fail_on_error(self):
+ error = FailedPayloadsError('failure')
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+ with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
+
+ client = KafkaClient(MagicMock())
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+ # This should not raise
+ (response,) = producer.send_messages('foobar', b'test message')
+ self.assertEqual(response, error)
+
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages('foobar', b'test message')
+
class TestKafkaProducerSendUpstream(unittest.TestCase):