summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer/base.py14
1 files changed, 12 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 498539d..824ef5d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -41,6 +41,8 @@ ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
ASYNC_STOP_TIMEOUT_SECS = 30
+SYNC_FAIL_ON_ERROR_DEFAULT = True
+
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
@@ -216,6 +218,9 @@ class Producer(object):
defaults to 1 (local ack).
ack_timeout (int, optional): millisecond timeout to wait for the
configured req_acks, defaults to 1000.
+ sync_fail_on_error (bool, optional): whether sync producer should
+ raise exceptions (True), or just return errors (False),
+ defaults to True.
async (bool, optional): send message using a background thread,
defaults to False.
batch_send_every_n (int, optional): If async is True, messages are
@@ -258,6 +263,7 @@ class Producer(object):
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
+ sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
@@ -316,6 +322,8 @@ class Producer(object):
obj.stop()
self._cleanup_func = cleanup
atexit.register(cleanup, self)
+ else:
+ self.sync_fail_on_error = sync_fail_on_error
def send_messages(self, topic, partition, *msg):
"""
@@ -373,8 +381,10 @@ class Producer(object):
messages = create_message_set([(m, key) for m in msg], self.codec, key)
req = ProduceRequest(topic, partition, messages)
try:
- resp = self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
+ resp = self.client.send_produce_request(
+ [req], acks=self.req_acks, timeout=self.ack_timeout,
+ fail_on_error=self.sync_fail_on_error
+ )
except Exception:
log.exception("Unable to send messages")
raise