-rw-r--r-- | kafka/common.py          |  26
-rw-r--r-- | kafka/producer/base.py   | 120
-rw-r--r-- | kafka/producer/keyed.py  |  17
-rw-r--r-- | kafka/producer/simple.py |  17
-rw-r--r-- | test/test_producer.py    | 144
-rw-r--r-- | tox.ini                  |   1
6 files changed, 296 insertions, 29 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 8207bec..8c13798 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -71,6 +71,11 @@ TopicAndPartition = namedtuple("TopicAndPartition",
 KafkaMessage = namedtuple("KafkaMessage",
     ["topic", "partition", "offset", "key", "value"])
 
+# Define retry policy for async producer
+# Limit value: int >= 0, 0 means no retries
+RetryOptions = namedtuple("RetryOptions",
+    ["limit", "backoff_ms", "retry_on_timeouts"])
+
 
 #################
 #   Exceptions  #
@@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError):
     pass
 
 
+class AsyncProducerQueueFull(KafkaError):
+    def __init__(self, failed_msgs, *args):
+        super(AsyncProducerQueueFull, self).__init__(*args)
+        self.failed_msgs = failed_msgs
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
@@ -218,3 +229,18 @@ def check_error(response):
     if response.error:
         error_class = kafka_errors.get(response.error, UnknownError)
         raise error_class(response)
+
+
+RETRY_BACKOFF_ERROR_TYPES = (
+    KafkaUnavailableError, LeaderNotAvailableError,
+    ConnectionError, FailedPayloadsError
+)
+
+
+RETRY_REFRESH_ERROR_TYPES = (
+    NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+    LeaderNotAvailableError, ConnectionError
+)
+
+
+RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
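Taken together, the kafka/common.py additions define the retry policy surface: RetryOptions carries the tuning knobs, and the three RETRY_*_ERROR_TYPES tuples group broker errors by remedy (plain retry, backoff first, or metadata refresh first). A minimal sketch of how these pieces are meant to be read together; this is a reader's illustration, not part of the patch:

    from kafka.common import (
        RetryOptions, FailedPayloadsError, RETRY_ERROR_TYPES,
        RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)

    # limit=0 keeps the old fire-and-forget behaviour; limit > 0 lets each
    # failed request be re-queued up to `limit` times
    options = RetryOptions(limit=3, backoff_ms=100, retry_on_timeouts=False)

    error_cls = FailedPayloadsError
    retry = error_cls in RETRY_ERROR_TYPES            # re-enqueue the request
    backoff = error_cls in RETRY_BACKOFF_ERROR_TYPES  # sleep backoff_ms first
    refresh = error_cls in RETRY_REFRESH_ERROR_TYPES  # reload topic metadata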
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4bd3de4..2edeace 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -5,9 +5,9 @@ import logging
 import time
 
 try:
-    from queue import Empty, Queue
+    from queue import Empty, Full, Queue
 except ImportError:
-    from Queue import Empty, Queue
+    from Queue import Empty, Full, Queue
 
 from collections import defaultdict
 from threading import Thread, Event
@@ -15,8 +15,13 @@ from threading import Thread, Event
 import six
 
 from kafka.common import (
-    ProduceRequest, TopicAndPartition, UnsupportedCodecError
+    ProduceRequest, TopicAndPartition, RetryOptions,
+    kafka_errors, UnsupportedCodecError, FailedPayloadsError,
+    RequestTimedOutError, AsyncProducerQueueFull, UnknownError
 )
+from kafka.common import (
+    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
+
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 from kafka.util import kafka_bytestring
 
@@ -25,21 +30,33 @@ log = logging.getLogger("kafka")
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
 
+# unlimited
+ASYNC_QUEUE_MAXSIZE = 0
+ASYNC_QUEUE_PUT_TIMEOUT = 0
+# no retries by default
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
 STOP_ASYNC_PRODUCER = -1
 
 
 def _send_upstream(queue, client, codec, batch_time, batch_size,
-                   req_acks, ack_timeout, stop_event):
+                   req_acks, ack_timeout, retry_options, stop_event):
     """
     Listen on the queue for a specified number of messages or till
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    stop = False
+    reqs = {}
+    client.reinit()
 
     while not stop_event.is_set():
         timeout = batch_time
-        count = batch_size
+
+        # it's a simplification: we're comparing message sets and
+        # messages: each set can contain [1..batch_size] messages
+        count = batch_size - len(reqs)
+
         send_at = time.time() + timeout
         msgset = defaultdict(list)
 
@@ -48,7 +65,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         while count > 0 and timeout >= 0:
             try:
                 topic_partition, msg, key = queue.get(timeout=timeout)
-
             except Empty:
                 break
@@ -63,20 +79,60 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             msgset[topic_partition].append((msg, key))
 
         # Send collected requests upstream
-        reqs = []
         for topic_partition, msg in msgset.items():
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
-                                 messages)
-            reqs.append(req)
+                                 tuple(messages))
+            reqs[req] = 0
+
+        if not reqs:
+            continue
+
+        reqs_to_retry, error_cls = [], None
+        do_backoff, do_refresh = False, False
+
+        def _handle_error(error_cls, reqs, all_retries):
+            if ((error_cls == RequestTimedOutError and
+                    retry_options.retry_on_timeouts) or
+                    error_cls in RETRY_ERROR_TYPES):
+                all_retries += reqs
+            if error_cls in RETRY_BACKOFF_ERROR_TYPES:
+                do_backoff = True
+            if error_cls in RETRY_REFRESH_ERROR_TYPES:
+                do_refresh = True
 
         try:
-            client.send_produce_request(reqs,
-                                        acks=req_acks,
-                                        timeout=ack_timeout)
-        except Exception:
-            log.exception("Unable to send message")
+            reply = client.send_produce_request(reqs.keys(),
+                                                acks=req_acks,
+                                                timeout=ack_timeout,
+                                                fail_on_error=False)
+            for i, response in enumerate(reply):
+                if isinstance(response, FailedPayloadsError):
+                    _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
+                elif isinstance(response, ProduceResponse) and response.error:
+                    error_cls = kafka_errors.get(response.error, UnknownError)
+                    _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
+
+        except Exception as ex:
+            error_cls = kafka_errors.get(type(ex), UnknownError)
+            _handle_error(error_cls, reqs.keys(), reqs_to_retry)
+
+        if not reqs_to_retry:
+            reqs = {}
+            continue
+
+        # doing backoff before next retry
+        if do_backoff and retry_options.backoff_ms:
+            log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+            time.sleep(float(retry_options.backoff_ms) / 1000)
+
+        # refresh topic metadata before next retry
+        if do_refresh:
+            client.load_metadata_for_topics()
+
+        reqs = dict((key, count + 1) for (key, count) in reqs.items()
+                    if key in reqs_to_retry and count < retry_options.limit)
 
 
 class Producer(object):
@@ -111,12 +167,18 @@ class Producer(object):
                  codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
 
         if batch_send:
             async = True
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
+            assert async_queue_maxsize >= 0
         else:
             batch_send_every_n = 1
             batch_send_every_t = 3600
@@ -135,10 +197,13 @@ class Producer(object):
         self.codec = codec
 
         if self.async:
-            log.warning("async producer does not guarantee message delivery!")
-            log.warning("Current implementation does not retry Failed messages")
-            log.warning("Use at your own risk! (or help improve with a PR!)")
-            self.queue = Queue()  # Messages are sent through this queue
+            # Messages are sent through this queue
+            self.queue = Queue(async_queue_maxsize)
+            self.async_queue_put_timeout = async_queue_put_timeout
+            async_retry_options = RetryOptions(
+                limit=async_retry_limit,
+                backoff_ms=async_retry_backoff_ms,
+                retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -148,6 +213,7 @@ class Producer(object):
                                        batch_send_every_n,
                                        self.req_acks,
                                        self.ack_timeout,
+                                       async_retry_options,
                                        self.thread_stop_event))
 
             # Thread will die if main thread exits
@@ -199,8 +265,18 @@ class Producer(object):
             raise TypeError("the key must be type bytes")
 
         if self.async:
-            for m in msg:
-                self.queue.put((TopicAndPartition(topic, partition), m, key))
+            for idx, m in enumerate(msg):
+                try:
+                    item = (TopicAndPartition(topic, partition), m, key)
+                    if self.async_queue_put_timeout == 0:
+                        self.queue.put_nowait(item)
+                    else:
+                        self.queue.put(item, True, self.async_queue_put_timeout)
+                except Full:
+                    raise AsyncProducerQueueFull(
+                        msg[idx:],
+                        'Producer async queue overfilled. '
+                        'Current queue size %d.' % self.queue.qsize())
             resp = []
         else:
             messages = create_message_set([(m, key) for m in msg], self.codec, key)
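In the new _send_upstream loop, reqs maps each pending ProduceRequest to its attempt count; after a failed send, the closing dict expression keeps only the requests that were flagged for retry and are still under retry_options.limit, incrementing their counts. A toy walk-through of that expression, with strings standing in for requests; illustration only, not part of the patch:

    retry_limit = 2
    reqs = {'req-a': 0, 'req-b': 2}     # request -> attempts made so far
    reqs_to_retry = ['req-a', 'req-b']  # both failed with retriable errors

    reqs = dict((req, count + 1) for (req, count) in reqs.items()
                if req in reqs_to_retry and count < retry_limit)
    # result: {'req-a': 1} -- 'req-b' exhausted its retry budget and is dropped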
(or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue + # Messages are sent through this queue + self.queue = Queue(async_queue_maxsize) + self.async_queue_put_timeout = async_queue_put_timeout + async_retry_options = RetryOptions( + limit=async_retry_limit, + backoff_ms=async_retry_backoff_ms, + retry_on_timeouts=async_retry_on_timeouts) self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, @@ -148,6 +213,7 @@ class Producer(object): batch_send_every_n, self.req_acks, self.ack_timeout, + async_retry_options, self.thread_stop_event)) # Thread will die if main thread exits @@ -199,8 +265,18 @@ class Producer(object): raise TypeError("the key must be type bytes") if self.async: - for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + for idx, m in enumerate(msg): + try: + item = (TopicAndPartition(topic, partition), m, key) + if self.async_queue_put_timeout == 0: + self.queue.put_nowait(item) + else: + self.queue.put(item, True, self.async_queue_put_timeout) + except Full: + raise AsyncProducerQueueFull( + msg[idx:], + 'Producer async queue overfilled. ' + 'Current queue size %d.' % self.queue.qsize()) resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key) diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 333b6c0..5252976 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -7,7 +7,8 @@ from kafka.util import kafka_bytestring from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT, + ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS ) log = logging.getLogger("kafka") @@ -37,7 +38,12 @@ class KeyedProducer(Producer): codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + async_retry_limit=ASYNC_RETRY_LIMIT, + async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, + async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): if not partitioner: partitioner = HashedPartitioner self.partitioner_class = partitioner @@ -46,7 +52,12 @@ class KeyedProducer(Producer): super(KeyedProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + async_retry_limit, + async_retry_backoff_ms, + async_retry_on_timeouts, + async_queue_maxsize, + async_queue_put_timeout) def _next_partition(self, topic, key): if topic not in self.partitioners: diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 2699cf2..ded6eb6 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -10,7 +10,8 @@ from six.moves import xrange from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT, + ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS ) log = logging.getLogger("kafka") @@ -45,13 +46,23 @@ class SimpleProducer(Producer): batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=True): + random_start=True, + async_retry_limit=ASYNC_RETRY_LIMIT, + async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, + async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, + 
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 2699cf2..ded6eb6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )
 
 log = logging.getLogger("kafka")
@@ -45,13 +46,23 @@ class SimpleProducer(Producer):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 random_start=True):
+                 random_start=True,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
         self.random_start = random_start
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, codec, batch_send,
                                              batch_send_every_n,
-                                             batch_send_every_t)
+                                             batch_send_every_t,
+                                             async_retry_limit,
+                                             async_retry_backoff_ms,
+                                             async_retry_on_timeouts,
+                                             async_queue_maxsize,
+                                             async_queue_put_timeout)
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
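With keyed.py and simple.py forwarding the new keyword arguments, the retry and queue options become part of the public producer constructors. A minimal usage sketch, assuming a broker reachable at the hypothetical address localhost:9092 and the API as of this patch:

    from kafka import KafkaClient
    from kafka.common import AsyncProducerQueueFull
    from kafka.producer.simple import SimpleProducer

    client = KafkaClient('localhost:9092')  # hypothetical broker address
    producer = SimpleProducer(client, async=True,
                              async_retry_limit=3,         # retry a failed batch up to 3 times
                              async_retry_backoff_ms=100,  # back off 100ms between tries
                              async_retry_on_timeouts=False,
                              async_queue_maxsize=1000,    # bound the in-memory queue
                              async_queue_put_timeout=0)   # 0: raise immediately when full

    try:
        producer.send_messages(b'my-topic', b'msg 1', b'msg 2')
    except AsyncProducerQueueFull as e:
        unsent = e.failed_msgs  # messages that did not make it into the queue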
diff --git a/test/test_producer.py b/test/test_producer.py
index f6b3d6a..258b9c3 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,11 +1,26 @@
 # -*- coding: utf-8 -*-
 
+import time
 import logging
 
-from mock import MagicMock
+from mock import MagicMock, patch
 from . import unittest
 
+from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
+from kafka.common import AsyncProducerQueueFull
 from kafka.producer.base import Producer
+from kafka.producer.base import _send_upstream
+from kafka.protocol import CODEC_NONE
+
+import threading
+try:
+    from queue import Empty, Queue
+except ImportError:
+    from Queue import Empty, Queue
+try:
+    xrange
+except NameError:
+    xrange = range
 
 
 class TestKafkaProducer(unittest.TestCase):
@@ -40,3 +55,130 @@ class TestKafkaProducer(unittest.TestCase):
         topic = b"test-topic"
         producer.send_messages(topic, b'hi')
         assert client.send_produce_request.called
+
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled_batch_send(self, mock):
+        queue_size = 2
+        producer = Producer(MagicMock(), batch_send=True,
+                            async_queue_maxsize=queue_size)
+
+        topic = b'test-topic'
+        partition = 0
+        message = b'test-message'
+
+        with self.assertRaises(AsyncProducerQueueFull):
+            message_list = [message] * (queue_size + 1)
+            producer.send_messages(topic, partition, *message_list)
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()
+
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled(self, mock):
+        queue_size = 2
+        producer = Producer(MagicMock(), async=True,
+                            async_queue_maxsize=queue_size)
+
+        topic = b'test-topic'
+        partition = 0
+        message = b'test-message'
+
+        with self.assertRaises(AsyncProducerQueueFull):
+            message_list = [message] * (queue_size + 1)
+            producer.send_messages(topic, partition, *message_list)
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()
+
+
+class TestKafkaProducerSendUpstream(unittest.TestCase):
+
+    def setUp(self):
+        self.client = MagicMock()
+        self.queue = Queue()
+
+    def _run_process(self, retries_limit=3, sleep_timeout=1):
+        # run _send_upstream process with the queue
+        stop_event = threading.Event()
+        retry_options = RetryOptions(limit=retries_limit,
+                                     backoff_ms=50,
+                                     retry_on_timeouts=False)
+        self.thread = threading.Thread(
+            target=_send_upstream,
+            args=(self.queue, self.client, CODEC_NONE,
+                  0.3,  # batch time (seconds)
+                  3,    # batch length
+                  Producer.ACK_AFTER_LOCAL_WRITE,
+                  Producer.DEFAULT_ACK_TIMEOUT,
+                  retry_options,
+                  stop_event))
+        self.thread.daemon = True
+        self.thread.start()
+        time.sleep(sleep_timeout)
+        stop_event.set()
+
+    def test_wo_retries(self):
+
+        # let's create a queue and add 10 messages for 1 partition
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i"))
+
+        self._run_process()
+
+        # the queue should be empty at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 4 non-void calls:
+        # 3 batches of 3 msgs each + 1 batch of 1 message
+        self.assertEqual(self.client.send_produce_request.call_count, 4)
+
+    def test_first_send_failed(self):
+
+        # let's create a queue and add 10 messages for 10 different partitions
+        # to show how retries should work ideally
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+        self.client.is_first_time = True
+
+        def send_side_effect(reqs, *args, **kwargs):
+            if self.client.is_first_time:
+                self.client.is_first_time = False
+                return [FailedPayloadsError(reqs)]
+            return []
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(2)
+
+        # the queue should be empty at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 5 non-void calls: 1st failed batch of 3 msgs
+        # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+        self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+    def test_with_limited_retries(self):
+
+        # let's create a queue and add 10 messages for 10 different partitions
+        # to show how retries should work ideally
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
+
+        def send_side_effect(reqs, *args, **kwargs):
+            return [FailedPayloadsError(reqs)]
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(3, 3)
+
+        # the queue should be empty at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 16 non-void calls:
+        # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
+        # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+        self.assertEqual(self.client.send_produce_request.call_count, 16)
+
+    def tearDown(self):
+        for _ in xrange(self.queue.qsize()):
+            self.queue.get()
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -14,6 +14,7 @@ commands =
     nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
 setenv =
     PROJECT_ROOT = {toxinidir}
+passenv = KAFKA_VERSION
 
 [testenv:py33]
 deps =