diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-26 12:40:44 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:47 +0300 |
commit | 4b8288a578c0cee696ef9d0523f9cec32e8b1f05 (patch) | |
tree | a1ddd88dbee4f826e8f1448d56ac9b1918c0d8dd | |
parent | 81d868869fa2f7ab980df5477d82654dc2598356 (diff) | |
download | kafka-python-4b8288a578c0cee696ef9d0523f9cec32e8b1f05.tar.gz |
Producer _send_upstream fixes, added tests for retries
-rw-r--r-- | kafka/producer/base.py | 26 | ||||
-rw-r--r-- | test/test_producer.py | 137 |
2 files changed, 157 insertions, 6 deletions
```diff
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 34b1d04..505e31b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -44,6 +44,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
 
     while not stop_event.is_set():
         timeout = batch_time
+
+        # it's a simplification: we're comparing message sets and
+        # messages: each set can contain [1..batch_size] messages
         count = batch_size - len(reqs)
         send_at = time.time() + timeout
         msgset = defaultdict(list)
@@ -74,6 +77,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
                                  messages)
             reqs.append(req)
 
+        if not reqs:
+            continue
+
+        reqs_to_retry = []
         try:
             client.send_produce_request(reqs,
                                         acks=req_acks,
@@ -81,15 +88,22 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         except FailedPayloadsError as ex:
             log.exception("Failed payloads count %s" % len(ex.message))
             if retries_limit is None:
-                reqs = ex.message
-                continue
-            for req in ex.message:
-                if retries_limit and req.retries < retries_limit:
-                    reqs.append(req._replace(retries=req.retries+1))
+                # retry all failed messages until success
+                reqs_to_retry = ex.message
+            elif not retries_limit < 0:
+                #
+                for req in ex.message:
+                    if retries_limit and req.retries < retries_limit:
+                        updated_req = req._replace(retries=req.retries+1)
+                        reqs_to_retry.append(updated_req)
         except Exception as ex:
             log.exception("Unable to send message: %s" % type(ex))
+        finally:
+            reqs = []
 
-        if reqs and retry_backoff:
+        if reqs_to_retry and retry_backoff:
+            reqs = reqs_to_retry
+            log.warning("%s requests will be retried next call." % len(reqs))
             time.sleep(float(retry_backoff) / 1000)
 
 
diff --git a/test/test_producer.py b/test/test_producer.py
index f6b3d6a..eecc7a7 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,11 +1,22 @@
 # -*- coding: utf-8 -*-
 
+import time
 import logging
 
 from mock import MagicMock
 from . import unittest
 
+from kafka.common import TopicAndPartition, FailedPayloadsError
 from kafka.producer.base import Producer
+from kafka.producer.base import _send_upstream
+from kafka.protocol import CODEC_NONE
+
+import threading
+import multiprocessing as mp
+try:
+    from queue import Empty
+except ImportError:
+    from Queue import Empty
 
 
 class TestKafkaProducer(unittest.TestCase):
@@ -40,3 +51,129 @@ class TestKafkaProducer(unittest.TestCase):
         topic = b"test-topic"
         producer.send_messages(topic, b'hi')
         assert client.send_produce_request.called
+
+
+class TestKafkaProducerSendUpstream(unittest.TestCase):
+
+    def setUp(self):
+
+        # create a multiprocessing Value to store call counter
+        # (magicmock counters don't work with other processes)
+        self.send_calls_count = mp.Value('i', 0)
+
+        def send_side_effect(*args, **kwargs):
+            self.send_calls_count.value += 1
+
+        self.client = MagicMock()
+        self.client.send_produce_request.side_effect = send_side_effect
+        self.queue = mp.Queue()
+
+    def _run_process(self, retries_limit=3, sleep_timeout=1):
+        # run _send_upstream process with the queue
+        self.process = mp.Process(
+            target=_send_upstream,
+            args=(self.queue, self.client, CODEC_NONE,
+                  0.3, # batch time (seconds)
+                  3, # batch length
+                  Producer.ACK_AFTER_LOCAL_WRITE,
+                  Producer.DEFAULT_ACK_TIMEOUT,
+                  50, # retry backoff (ms)
+                  retries_limit))
+        self.process.daemon = True
+        self.process.start()
+        time.sleep(sleep_timeout)
+        self.process.terminate()
+
+    def test_wo_retries(self):
+
+        # lets create a queue and add 10 messages for 1 partition
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i"))
+
+        self._run_process()
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 4 non-void cals:
+        # 3 batches of 3 msgs each + 1 batch of 1 message
+        self.assertEqual(self.send_calls_count.value, 4)
+
+    def test_first_send_failed(self):
+
+        # lets create a queue and add 10 messages for 10 different partitions
+        # to show how retries should work ideally
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+        flag = mp.Value('c', 'f')
+        def send_side_effect(reqs, *args, **kwargs):
+            self.send_calls_count.value += 1
+            if flag.value == 'f':
+                flag.value = 't'
+                raise FailedPayloadsError(reqs)
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(2)
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 5 non-void cals: 1st failed batch of 3 msgs
+        # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+        self.assertEqual(self.send_calls_count.value, 5)
+
+    def test_with_limited_retries(self):
+
+        # lets create a queue and add 10 messages for 10 different partitions
+        # to show how retries should work ideally
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+        def send_side_effect(reqs, *args, **kwargs):
+            self.send_calls_count.value += 1
+            raise FailedPayloadsError(reqs)
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(3, 2)
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 16 non-void cals:
+        # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
+        # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+        self.assertEqual(self.send_calls_count.value, 16)
+
+
+    def test_with_unlimited_retries(self):
+
+        # lets create a queue and add 10 messages for 10 different partitions
+        # to show how retries should work ideally
+        for i in range(10):
+            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+        def send_side_effect(reqs, *args, **kwargs):
+            self.send_calls_count.value += 1
+            raise FailedPayloadsError(reqs)
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(None)
+
+        # the queue should have 7 elements
+        # 3 batches of 1 msg each were retried all this time
+        self.assertEqual(self.queue.empty(), False)
+        left = 0
+        for i in range(10):
+            try:
+                self.queue.get(timeout=0.01)
+                left += 1
+            except Empty:
+                break
+        self.assertEqual(left, 7)
+
+        # 1s / 50ms of backoff = 20 times
+        self.assertEqual(self.send_calls_count.value, 20)
```