diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-02-26 12:40:44 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:47 +0300 |
commit | 4b8288a578c0cee696ef9d0523f9cec32e8b1f05 (patch) | |
tree | a1ddd88dbee4f826e8f1448d56ac9b1918c0d8dd /test | |
parent | 81d868869fa2f7ab980df5477d82654dc2598356 (diff) | |
download | kafka-python-4b8288a578c0cee696ef9d0523f9cec32e8b1f05.tar.gz |
Producer _send_upstream fixes, added tests for retries
Diffstat (limited to 'test')
-rw-r--r-- | test/test_producer.py | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/test/test_producer.py b/test/test_producer.py index f6b3d6a..eecc7a7 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,11 +1,22 @@ # -*- coding: utf-8 -*- +import time import logging from mock import MagicMock from . import unittest +from kafka.common import TopicAndPartition, FailedPayloadsError from kafka.producer.base import Producer +from kafka.producer.base import _send_upstream +from kafka.protocol import CODEC_NONE + +import threading +import multiprocessing as mp +try: + from queue import Empty +except ImportError: + from Queue import Empty class TestKafkaProducer(unittest.TestCase): @@ -40,3 +51,129 @@ class TestKafkaProducer(unittest.TestCase): topic = b"test-topic" producer.send_messages(topic, b'hi') assert client.send_produce_request.called + + +class TestKafkaProducerSendUpstream(unittest.TestCase): + + def setUp(self): + + # create a multiprocessing Value to store call counter + # (magicmock counters don't work with other processes) + self.send_calls_count = mp.Value('i', 0) + + def send_side_effect(*args, **kwargs): + self.send_calls_count.value += 1 + + self.client = MagicMock() + self.client.send_produce_request.side_effect = send_side_effect + self.queue = mp.Queue() + + def _run_process(self, retries_limit=3, sleep_timeout=1): + # run _send_upstream process with the queue + self.process = mp.Process( + target=_send_upstream, + args=(self.queue, self.client, CODEC_NONE, + 0.3, # batch time (seconds) + 3, # batch length + Producer.ACK_AFTER_LOCAL_WRITE, + Producer.DEFAULT_ACK_TIMEOUT, + 50, # retry backoff (ms) + retries_limit)) + self.process.daemon = True + self.process.start() + time.sleep(sleep_timeout) + self.process.terminate() + + def test_wo_retries(self): + + # lets create a queue and add 10 messages for 1 partition + for i in range(10): + self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i")) + + self._run_process() + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 4 non-void cals: + # 3 batches of 3 msgs each + 1 batch of 1 message + self.assertEqual(self.send_calls_count.value, 4) + + def test_first_send_failed(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + flag = mp.Value('c', 'f') + def send_side_effect(reqs, *args, **kwargs): + self.send_calls_count.value += 1 + if flag.value == 'f': + flag.value = 't' + raise FailedPayloadsError(reqs) + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 5 non-void cals: 1st failed batch of 3 msgs + # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + self.assertEqual(self.send_calls_count.value, 5) + + def test_with_limited_retries(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + def send_side_effect(reqs, *args, **kwargs): + self.send_calls_count.value += 1 + raise FailedPayloadsError(reqs) + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(3, 2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 16 non-void cals: + # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + + # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed + self.assertEqual(self.send_calls_count.value, 16) + + + def test_with_unlimited_retries(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + def send_side_effect(reqs, *args, **kwargs): + self.send_calls_count.value += 1 + raise FailedPayloadsError(reqs) + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(None) + + # the queue should have 7 elements + # 3 batches of 1 msg each were retried all this time + self.assertEqual(self.queue.empty(), False) + left = 0 + for i in range(10): + try: + self.queue.get(timeout=0.01) + left += 1 + except Empty: + break + self.assertEqual(left, 7) + + # 1s / 50ms of backoff = 20 times + self.assertEqual(self.send_calls_count.value, 20) |