 kafka/common.py          |  3
 kafka/producer/base.py   | 86
 kafka/producer/keyed.py  |  6
 kafka/producer/simple.py |  9
 test/test_producer.py    |  6
 5 files changed, 76 insertions(+), 34 deletions(-)
diff --git a/kafka/common.py b/kafka/common.py
index 5c2b788..cbb4013 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -78,6 +78,9 @@ TopicAndPartition = namedtuple("TopicAndPartition",
 KafkaMessage = namedtuple("KafkaMessage",
     ["topic", "partition", "offset", "key", "value"])
 
+RetryOptions = namedtuple("RetryOptions",
+    ["limit", "backoff_ms", "retry_on_timeouts"])
+
 
 #################
 #   Exceptions  #
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 9bfe98b..ebeb82d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -16,7 +16,10 @@ import six
 
 from kafka.common import (
     ProduceRequest, TopicAndPartition,
-    UnsupportedCodecError, FailedPayloadsError
+    UnsupportedCodecError, FailedPayloadsError, RetryOptions,
+    RequestTimedOutError, KafkaUnavailableError, LeaderNotAvailableError,
+    UnknownTopicOrPartitionError, NotLeaderForPartitionError, ConnectionError,
+    InvalidMessageError, MessageSizeTooLargeError
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 from kafka.util import kafka_bytestring
@@ -25,20 +28,19 @@ log = logging.getLogger("kafka")
 
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
-BATCH_RETRY_BACKOFF_MS = 300
-BATCH_RETRIES_LIMIT = 0
+BATCH_RETRY_OPTIONS = RetryOptions(
+    limit=0, backoff_ms=300, retry_on_timeouts=True)
 
 STOP_ASYNC_PRODUCER = -1
 
 
 def _send_upstream(queue, client, codec, batch_time, batch_size,
-                   req_acks, ack_timeout, retry_backoff, retries_limit, stop_event):
+                   req_acks, ack_timeout, retry_options, stop_event):
     """
     Listen on the queue for a specified number of messages or till
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    stop = False
     reqs = []
     client.reinit()
 
@@ -85,28 +87,71 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             client.send_produce_request(reqs,
                                         acks=req_acks,
                                         timeout=ack_timeout)
+
+        except RequestTimedOutError as ex:
+            # should retry only if user is fine with duplicates
+            if retry_options.retry_on_timeouts:
+                reqs_to_retry = reqs
+
+        except KafkaUnavailableError as ex:
+            # backoff + retry
+            do_backoff(retry_options)
+            reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
+        except (NotLeaderForPartitionError, UnknownTopicOrPartitionError) as ex:
+            # refresh + retry
+            client.load_metadata_for_topics()
+            reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
+        except (LeaderNotAvailableError, ConnectionError) as ex:
+            # backoff + refresh + retry
+            do_backoff(retry_options)
+            client.load_metadata_for_topics()
+            reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
         except FailedPayloadsError as ex:
+            # retry only failed messages with backoff
             failed_reqs = ex.failed_payloads
-            log.exception("Failed payloads count %s" % len(failed_reqs))
-
-            # if no limit, retry all failed messages until success
-            if retries_limit is None:
-                reqs_to_retry = failed_reqs
-            # makes sense to check failed reqs only if we have a limit > 0
-            elif retries_limit > 0:
-                for req in failed_reqs:
-                    if retries_limit and req.retries < retries_limit:
-                        updated_req = req._replace(retries=req.retries+1)
-                        reqs_to_retry.append(updated_req)
+            do_backoff(retry_options)
+            reqs_to_retry = get_requests_for_retry(failed_reqs, retry_options)
+
+        except (InvalidMessageError, MessageSizeTooLargeError) as ex:
+            # "bad" messages, doesn't make sense to retry
+            log.exception("Message error when sending: %s" % type(ex))
+
         except Exception as ex:
             log.exception("Unable to send message: %s" % type(ex))
+
         finally:
             reqs = []
 
-        if reqs_to_retry and retry_backoff:
+        if reqs_to_retry:
             reqs = reqs_to_retry
-            log.warning("%s requests will be retried next call." % len(reqs))
-            time.sleep(float(retry_backoff) / 1000)
+
+
+def get_requests_for_retry(requests, retry_options):
+    log.exception("Failed payloads count %s" % len(requests))
+
+    # if no limit, retry all failed messages until success
+    if retry_options.limit is None:
+        return requests
+
+    # makes sense to check failed reqs only if we have a limit > 0
+    reqs_to_retry = []
+    if retry_options.limit > 0:
+        for req in requests:
+            if req.retries < retry_options.limit:
+                updated_req = req._replace(retries=req.retries+1)
+                reqs_to_retry.append(updated_req)
+
+    return reqs_to_retry
+
+
+def do_backoff(retry_options):
+    if retry_options.backoff_ms:
+        log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+        time.sleep(float(retry_options.backoff_ms) / 1000)
+
 
 class Producer(object):
@@ -142,8 +187,7 @@ class Producer(object):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
-                 batch_retries_limit=BATCH_RETRIES_LIMIT):
+                 batch_retry_options=BATCH_RETRY_OPTIONS):
 
         if batch_send:
             async = True
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index aa569b3..d11db52 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,8 +7,7 @@ from kafka.util import kafka_bytestring
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT,
-    BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
+    BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
 )
 
 log = logging.getLogger("kafka")
@@ -39,8 +38,7 @@ class KeyedProducer(Producer):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
-                 batch_retries_limit=BATCH_RETRIES_LIMIT):
+                 batch_retry_options=BATCH_RETRY_OPTIONS):
         if not partitioner:
             partitioner = HashedPartitioner
         self.partitioner_class = partitioner
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 7391be0..b869683 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,8 +10,7 @@ from six.moves import xrange
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT,
-    BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
+    BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
 )
 
 log = logging.getLogger("kafka")
@@ -47,16 +46,14 @@ class SimpleProducer(Producer):
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  random_start=True,
-                 batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
-                 batch_retries_limit=BATCH_RETRIES_LIMIT):
+                 batch_retry_options=BATCH_RETRY_OPTIONS):
         self.partition_cycles = {}
         self.random_start = random_start
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, codec, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t,
-                                             batch_retry_backoff_ms,
-                                             batch_retries_limit)
+                                             batch_retry_options)
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
diff --git a/test/test_producer.py b/test/test_producer.py
index cc58fe4..c9bdc47 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -6,7 +6,7 @@ import logging
 from mock import MagicMock
 from . import unittest
 
-from kafka.common import TopicAndPartition, FailedPayloadsError
+from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
 from kafka.producer.base import Producer
 from kafka.producer.base import _send_upstream
 from kafka.protocol import CODEC_NONE
@@ -68,8 +68,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
             3, # batch length
             Producer.ACK_AFTER_LOCAL_WRITE,
             Producer.DEFAULT_ACK_TIMEOUT,
-            50, # retry backoff (ms)
-            retries_limit,
+            RetryOptions(limit=retries_limit, backoff_ms=50,
+                retry_on_timeouts=True),
             stop_event))
         self.thread.daemon = True
         self.thread.start()
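
Not part of the commit itself — a minimal usage sketch of the new retry API. The package-root imports (KafkaClient, SimpleProducer), broker address, and topic name are illustrative assumptions for kafka-python of this era; only RetryOptions and batch_retry_options come from the diff above.

    # Usage sketch (not from the commit): configure async batch retries.
    from kafka import KafkaClient, SimpleProducer
    from kafka.common import RetryOptions

    client = KafkaClient("localhost:9092")  # example broker address

    # Retry a failed batch at most 3 times, sleeping 300ms before each retry.
    # retry_on_timeouts=False: on RequestTimedOutError the broker may already
    # have written the batch, so retrying could introduce duplicates.
    retry_options = RetryOptions(limit=3, backoff_ms=300,
                                 retry_on_timeouts=False)

    producer = SimpleProducer(client, async=True,
                              batch_retry_options=retry_options)
    producer.send_messages(b"test-topic", b"msg one", b"msg two")

Per get_requests_for_retry above, limit=None retries failed requests until they succeed, while the default BATCH_RETRY_OPTIONS (limit=0) disables retries entirely.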