diff options
 kafka/producer/base.py |  6 ++++--
 test/test_producer.py  | 66 ++++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 60 insertions(+), 12 deletions(-)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index e0c086b..498539d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -144,10 +144,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): retry_state['do_refresh'] |= True - reply = client.send_produce_request(request_tries.keys(), + requests = list(request_tries.keys()) + reply = client.send_produce_request(requests, acks=req_acks, timeout=ack_timeout, fail_on_error=False) + for i, response in enumerate(reply): error_cls = None if isinstance(response, FailedPayloadsError): @@ -156,7 +158,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, elif isinstance(response, ProduceResponse) and response.error: error_cls = kafka_errors.get(response.error, UnknownError) - orig_req = request_tries.keys()[i] + orig_req = requests[i] if error_cls: _handle_error(error_cls, orig_req) diff --git a/test/test_producer.py b/test/test_producer.py index c12af02..c7bdfdb 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,15 +1,17 @@ # -*- coding: utf-8 -*- -import time +import collections import logging +import time from mock import MagicMock, patch from . 
import unittest -from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions -from kafka.common import AsyncProducerQueueFull -from kafka.producer.base import Producer -from kafka.producer.base import _send_upstream +from kafka.common import ( + AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, + ProduceResponse, RetryOptions, TopicAndPartition +) +from kafka.producer.base import Producer, _send_upstream from kafka.protocol import CODEC_NONE import threading @@ -122,12 +124,21 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): for i in range(10): self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + # Mock offsets counter for closure + offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) self.client.is_first_time = True def send_side_effect(reqs, *args, **kwargs): if self.client.is_first_time: self.client.is_first_time = False return [FailedPayloadsError(req) for req in reqs] - return [] + responses = [] + for req in reqs: + offset = offsets[req.topic][req.partition] + offsets[req.topic][req.partition] += len(req.messages) + responses.append( + ProduceResponse(req.topic, req.partition, 0, offset) + ) + return responses self.client.send_produce_request.side_effect = send_side_effect @@ -136,8 +147,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # the queue should be void at the end of the test self.assertEqual(self.queue.empty(), True) - # there should be 5 non-void cals: 1st failed batch of 3 msgs - # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + # there should be 5 non-void calls: 1st failed batch of 3 msgs + # plus 3 batches of 3 msgs each + 1 batch of 1 message self.assertEqual(self.client.send_produce_request.call_count, 5) def test_with_limited_retries(self): @@ -157,11 +168,46 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # the queue should be void at the end of the test self.assertEqual(self.queue.empty(), True) - # there 
should be 16 non-void cals: + # there should be 16 non-void calls: # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + - # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed + # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16 self.assertEqual(self.client.send_produce_request.call_count, 16) + def test_async_producer_not_leader(self): + + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + # Mock offsets counter for closure + offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) + self.client.is_first_time = True + def send_side_effect(reqs, *args, **kwargs): + if self.client.is_first_time: + self.client.is_first_time = False + return [ProduceResponse(req.topic, req.partition, + NotLeaderForPartitionError.errno, -1) + for req in reqs] + + responses = [] + for req in reqs: + offset = offsets[req.topic][req.partition] + offsets[req.topic][req.partition] += len(req.messages) + responses.append( + ProduceResponse(req.topic, req.partition, 0, offset) + ) + return responses + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 5 non-void calls: 1st failed batch of 3 msgs + # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + self.assertEqual(self.client.send_produce_request.call_count, 5) + def tearDown(self): for _ in xrange(self.queue.qsize()): self.queue.get() |