summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-02-26 12:40:44 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:47 +0300
commit4b8288a578c0cee696ef9d0523f9cec32e8b1f05 (patch)
treea1ddd88dbee4f826e8f1448d56ac9b1918c0d8dd
parent81d868869fa2f7ab980df5477d82654dc2598356 (diff)
downloadkafka-python-4b8288a578c0cee696ef9d0523f9cec32e8b1f05.tar.gz
Producer _send_upstream fixes, added tests for retries
-rw-r--r--kafka/producer/base.py26
-rw-r--r--test/test_producer.py137
2 files changed, 157 insertions, 6 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 34b1d04..505e31b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -44,6 +44,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while not stop_event.is_set():
timeout = batch_time
+
+ # it's a simplification: we're comparing message sets and
+ # messages: each set can contain [1..batch_size] messages
count = batch_size - len(reqs)
send_at = time.time() + timeout
msgset = defaultdict(list)
@@ -74,6 +77,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
messages)
reqs.append(req)
+ if not reqs:
+ continue
+
+ reqs_to_retry = []
try:
client.send_produce_request(reqs,
acks=req_acks,
@@ -81,15 +88,22 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
except FailedPayloadsError as ex:
log.exception("Failed payloads count %s" % len(ex.message))
if retries_limit is None:
- reqs = ex.message
- continue
- for req in ex.message:
- if retries_limit and req.retries < retries_limit:
- reqs.append(req._replace(retries=req.retries+1))
+ # retry all failed messages until success
+ reqs_to_retry = ex.message
+ elif not retries_limit < 0:
+ #
+ for req in ex.message:
+ if retries_limit and req.retries < retries_limit:
+ updated_req = req._replace(retries=req.retries+1)
+ reqs_to_retry.append(updated_req)
except Exception as ex:
log.exception("Unable to send message: %s" % type(ex))
+ finally:
+ reqs = []
- if reqs and retry_backoff:
+ if reqs_to_retry and retry_backoff:
+ reqs = reqs_to_retry
+ log.warning("%s requests will be retried next call." % len(reqs))
time.sleep(float(retry_backoff) / 1000)
diff --git a/test/test_producer.py b/test/test_producer.py
index f6b3d6a..eecc7a7 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,11 +1,22 @@
# -*- coding: utf-8 -*-
+import time
import logging
from mock import MagicMock
from . import unittest
+from kafka.common import TopicAndPartition, FailedPayloadsError
from kafka.producer.base import Producer
+from kafka.producer.base import _send_upstream
+from kafka.protocol import CODEC_NONE
+
+import threading
+import multiprocessing as mp
+try:
+ from queue import Empty
+except ImportError:
+ from Queue import Empty
class TestKafkaProducer(unittest.TestCase):
@@ -40,3 +51,129 @@ class TestKafkaProducer(unittest.TestCase):
topic = b"test-topic"
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called
+
+
+class TestKafkaProducerSendUpstream(unittest.TestCase):
+
+ def setUp(self):
+
+ # create a multiprocessing Value to store call counter
+ # (magicmock counters don't work with other processes)
+ self.send_calls_count = mp.Value('i', 0)
+
+ def send_side_effect(*args, **kwargs):
+ self.send_calls_count.value += 1
+
+ self.client = MagicMock()
+ self.client.send_produce_request.side_effect = send_side_effect
+ self.queue = mp.Queue()
+
+ def _run_process(self, retries_limit=3, sleep_timeout=1):
+ # run _send_upstream process with the queue
+ self.process = mp.Process(
+ target=_send_upstream,
+ args=(self.queue, self.client, CODEC_NONE,
+ 0.3, # batch time (seconds)
+ 3, # batch length
+ Producer.ACK_AFTER_LOCAL_WRITE,
+ Producer.DEFAULT_ACK_TIMEOUT,
+ 50, # retry backoff (ms)
+ retries_limit))
+ self.process.daemon = True
+ self.process.start()
+ time.sleep(sleep_timeout)
+ self.process.terminate()
+
+ def test_wo_retries(self):
+
+ # lets create a queue and add 10 messages for 1 partition
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i"))
+
+ self._run_process()
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 4 non-void cals:
+ # 3 batches of 3 msgs each + 1 batch of 1 message
+ self.assertEqual(self.send_calls_count.value, 4)
+
+ def test_first_send_failed(self):
+
+ # lets create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+ flag = mp.Value('c', 'f')
+ def send_side_effect(reqs, *args, **kwargs):
+ self.send_calls_count.value += 1
+ if flag.value == 'f':
+ flag.value = 't'
+ raise FailedPayloadsError(reqs)
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void cals: 1st failed batch of 3 msgs
+ # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ self.assertEqual(self.send_calls_count.value, 5)
+
+ def test_with_limited_retries(self):
+
+ # lets create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+ def send_side_effect(reqs, *args, **kwargs):
+ self.send_calls_count.value += 1
+ raise FailedPayloadsError(reqs)
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(3, 2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 16 non-void cals:
+ # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
+ # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+ self.assertEqual(self.send_calls_count.value, 16)
+
+
+ def test_with_unlimited_retries(self):
+
+ # lets create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+ def send_side_effect(reqs, *args, **kwargs):
+ self.send_calls_count.value += 1
+ raise FailedPayloadsError(reqs)
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(None)
+
+ # the queue should have 7 elements
+ # 3 batches of 1 msg each were retried all this time
+ self.assertEqual(self.queue.empty(), False)
+ left = 0
+ for i in range(10):
+ try:
+ self.queue.get(timeout=0.01)
+ left += 1
+ except Empty:
+ break
+ self.assertEqual(left, 7)
+
+ # 1s / 50ms of backoff = 20 times
+ self.assertEqual(self.send_calls_count.value, 20)