-rw-r--r--  kafka/common.py            3
-rw-r--r--  kafka/producer/base.py    86
-rw-r--r--  kafka/producer/keyed.py    6
-rw-r--r--  kafka/producer/simple.py   9
-rw-r--r--  test/test_producer.py      6
5 files changed, 76 insertions, 34 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 5c2b788..cbb4013 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -78,6 +78,9 @@ TopicAndPartition = namedtuple("TopicAndPartition",
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
+RetryOptions = namedtuple("RetryOptions",
+ ["limit", "backoff_ms", "retry_on_timeouts"])
+
#################
# Exceptions #
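
For reference, RetryOptions is an ordinary namedtuple; a minimal construction sketch (the values below are illustrative, not the patch's defaults):

    from kafka.common import RetryOptions

    # limit: None = retry failed requests until they succeed,
    #        0 = never retry, N > 0 = at most N retries per request
    # backoff_ms: pause between retries, in milliseconds
    # retry_on_timeouts: whether to re-send after a RequestTimedOutError
    #                    (re-sending may produce duplicate messages)
    opts = RetryOptions(limit=5, backoff_ms=300, retry_on_timeouts=False)
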
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 9bfe98b..ebeb82d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -16,7 +16,10 @@ import six
from kafka.common import (
ProduceRequest, TopicAndPartition,
- UnsupportedCodecError, FailedPayloadsError
+ UnsupportedCodecError, FailedPayloadsError, RetryOptions,
+ RequestTimedOutError, KafkaUnavailableError, LeaderNotAvailableError,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError, ConnectionError,
+ InvalidMessageError, MessageSizeTooLargeError
)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring
@@ -25,20 +28,19 @@ log = logging.getLogger("kafka")
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20
-BATCH_RETRY_BACKOFF_MS = 300
-BATCH_RETRIES_LIMIT = 0
+BATCH_RETRY_OPTIONS = RetryOptions(
+ limit=0, backoff_ms=300, retry_on_timeouts=True)
STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout, retry_backoff, retries_limit, stop_event):
+ req_acks, ack_timeout, retry_options, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
- stop = False
reqs = []
client.reinit()
@@ -85,28 +87,71 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
+
+ except RequestTimedOutError as ex:
+ # retry only if the user is fine with possible duplicates
+ if retry_options.retry_on_timeouts:
+ reqs_to_retry = reqs
+
+ except KafkaUnavailableError as ex:
+ # backoff + retry
+ do_backoff(retry_options)
+ reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
+ except (NotLeaderForPartitionError, UnknownTopicOrPartitionError) as ex:
+ # refresh + retry
+ client.load_metadata_for_topics()
+ reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
+ except (LeaderNotAvailableError, ConnectionError) as ex:
+ # backoff + refresh + retry
+ do_backoff(retry_options)
+ client.load_metadata_for_topics()
+ reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
except FailedPayloadsError as ex:
+ # retry only failed messages with backoff
failed_reqs = ex.failed_payloads
- log.exception("Failed payloads count %s" % len(failed_reqs))
-
- # if no limit, retry all failed messages until success
- if retries_limit is None:
- reqs_to_retry = failed_reqs
- # makes sense to check failed reqs only if we have a limit > 0
- elif retries_limit > 0:
- for req in failed_reqs:
- if retries_limit and req.retries < retries_limit:
- updated_req = req._replace(retries=req.retries+1)
- reqs_to_retry.append(updated_req)
+ do_backoff(retry_options)
+ reqs_to_retry = get_requests_for_retry(failed_reqs, retry_options)
+
+ except (InvalidMessageError, MessageSizeTooLargeError) as ex:
+ # "bad" messages, doesn't make sense to retry
+ log.exception("Message error when sending: %s" % type(ex))
+
except Exception as ex:
log.exception("Unable to send message: %s" % type(ex))
+
finally:
reqs = []
- if reqs_to_retry and retry_backoff:
+ if reqs_to_retry:
reqs = reqs_to_retry
- log.warning("%s requests will be retried next call." % len(reqs))
- time.sleep(float(retry_backoff) / 1000)
+
+
+def get_requests_for_retry(requests, retry_options):
+ log.exception("Failed payloads count %s" % len(requests))
+
+ # if no limit, retry all failed messages until success
+ if retry_options.limit is None:
+ return requests
+
+ # makes sense to check failed reqs only if we have a limit > 0
+ reqs_to_retry = []
+ if retry_options.limit > 0:
+ for req in requests:
+ if req.retries < retry_options.limit:
+ updated_req = req._replace(retries=req.retries+1)
+ reqs_to_retry.append(updated_req)
+
+ return reqs_to_retry
+
+
+def do_backoff(retry_options):
+ if retry_options.backoff_ms:
+ log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
+
class Producer(object):
@@ -142,8 +187,7 @@ class Producer(object):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
- batch_retries_limit=BATCH_RETRIES_LIMIT):
+ batch_retry_options=BATCH_RETRY_OPTIONS):
if batch_send:
async = True
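
Net effect of the new handlers: timeouts are re-sent only when retry_on_timeouts allows it, leader/topic errors trigger a metadata refresh before the retry, broker-availability and connection errors back off first, and invalid or oversized messages are dropped rather than retried. The two helpers are plain module-level functions and can be exercised on their own; a minimal sketch, where ReqStub is a hypothetical stand-in for a produce request carrying the `retries` counter the code expects:

    from collections import namedtuple
    from kafka.producer.base import (
        BATCH_RETRY_OPTIONS, do_backoff, get_requests_for_retry)

    ReqStub = namedtuple("ReqStub", ["topic", "retries"])
    opts = BATCH_RETRY_OPTIONS._replace(limit=2)  # default limit is 0

    # requests below the limit come back with retries incremented;
    # requests already at the limit are silently dropped
    survivors = get_requests_for_retry([ReqStub("t", retries=0)], opts)
    # -> [ReqStub(topic='t', retries=1)]

    do_backoff(opts)  # sleeps backoff_ms (300 ms here) and logs a warning
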
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index aa569b3..d11db52 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,8 +7,7 @@ from kafka.util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT,
- BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
+ BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
)
log = logging.getLogger("kafka")
@@ -39,8 +38,7 @@ class KeyedProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
- batch_retries_limit=BATCH_RETRIES_LIMIT):
+ batch_retry_options=BATCH_RETRY_OPTIONS):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 7391be0..b869683 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,8 +10,7 @@ from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT,
- BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
+ BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
)
log = logging.getLogger("kafka")
@@ -47,16 +46,14 @@ class SimpleProducer(Producer):
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
random_start=True,
- batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
- batch_retries_limit=BATCH_RETRIES_LIMIT):
+ batch_retry_options=BATCH_RETRY_OPTIONS):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t,
- batch_retry_backoff_ms,
- batch_retries_limit)
+ batch_retry_options)
def _next_partition(self, topic):
if topic not in self.partition_cycles:
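
Call sites now pass one RetryOptions object in place of the two retired keyword arguments. A minimal usage sketch, assuming a broker reachable at localhost:9092 (the address, topic, and payload are placeholders):

    from kafka import KafkaClient, SimpleProducer
    from kafka.common import RetryOptions

    client = KafkaClient("localhost:9092")
    producer = SimpleProducer(
        client,
        batch_send=True,  # batch_send implies async, per Producer.__init__
        batch_retry_options=RetryOptions(
            limit=3, backoff_ms=100, retry_on_timeouts=True))
    producer.send_messages("my-topic", b"payload")
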
diff --git a/test/test_producer.py b/test/test_producer.py
index cc58fe4..c9bdc47 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -6,7 +6,7 @@ import logging
from mock import MagicMock
from . import unittest
-from kafka.common import TopicAndPartition, FailedPayloadsError
+from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
from kafka.producer.base import Producer
from kafka.producer.base import _send_upstream
from kafka.protocol import CODEC_NONE
@@ -68,8 +68,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
3, # batch length
Producer.ACK_AFTER_LOCAL_WRITE,
Producer.DEFAULT_ACK_TIMEOUT,
- 50, # retry backoff (ms)
- retries_limit,
+ RetryOptions(limit=retries_limit, backoff_ms=50,
+ retry_on_timeouts=True),
stop_event))
self.thread.daemon = True
self.thread.start()
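
The retries_limit value threaded through this test selects among the three modes of get_requests_for_retry; for illustration:

    RetryOptions(limit=None, backoff_ms=50, retry_on_timeouts=True)  # retry until success
    RetryOptions(limit=0, backoff_ms=50, retry_on_timeouts=True)     # drop failures immediately
    RetryOptions(limit=3, backoff_ms=50, retry_on_timeouts=True)     # at most 3 retries per request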