summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py6
-rw-r--r--test/test_producer.py66
2 files changed, 60 insertions, 12 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e0c086b..498539d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -144,10 +144,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
retry_state['do_refresh'] |= True
- reply = client.send_produce_request(request_tries.keys(),
+ requests = list(request_tries.keys())
+ reply = client.send_produce_request(requests,
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
+
for i, response in enumerate(reply):
error_cls = None
if isinstance(response, FailedPayloadsError):
@@ -156,7 +158,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
- orig_req = request_tries.keys()[i]
+ orig_req = requests[i]
if error_cls:
_handle_error(error_cls, orig_req)
diff --git a/test/test_producer.py b/test/test_producer.py
index c12af02..c7bdfdb 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,15 +1,17 @@
# -*- coding: utf-8 -*-
-import time
+import collections
import logging
+import time
from mock import MagicMock, patch
from . import unittest
-from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
-from kafka.common import AsyncProducerQueueFull
-from kafka.producer.base import Producer
-from kafka.producer.base import _send_upstream
+from kafka.common import (
+ AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
+ ProduceResponse, RetryOptions, TopicAndPartition
+)
+from kafka.producer.base import Producer, _send_upstream
from kafka.protocol import CODEC_NONE
import threading
@@ -122,12 +124,21 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
for i in range(10):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
self.client.is_first_time = True
def send_side_effect(reqs, *args, **kwargs):
if self.client.is_first_time:
self.client.is_first_time = False
return [FailedPayloadsError(req) for req in reqs]
- return []
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponse(req.topic, req.partition, 0, offset)
+ )
+ return responses
self.client.send_produce_request.side_effect = send_side_effect
@@ -136,8 +147,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# the queue should be void at the end of the test
self.assertEqual(self.queue.empty(), True)
- # there should be 5 non-void cals: 1st failed batch of 3 msgs
- # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # plus 3 batches of 3 msgs each + 1 batch of 1 message
self.assertEqual(self.client.send_produce_request.call_count, 5)
def test_with_limited_retries(self):
@@ -157,11 +168,46 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# the queue should be void at the end of the test
self.assertEqual(self.queue.empty(), True)
- # there should be 16 non-void cals:
+ # there should be 16 non-void calls:
# 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
- # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+ # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
self.assertEqual(self.client.send_produce_request.call_count, 16)
+ def test_async_producer_not_leader(self):
+
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [ProduceResponse(req.topic, req.partition,
+ NotLeaderForPartitionError.errno, -1)
+ for req in reqs]
+
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponse(req.topic, req.partition, 0, offset)
+ )
+ return responses
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
def tearDown(self):
for _ in xrange(self.queue.qsize()):
self.queue.get()