summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py8
-rw-r--r--test/test_producer.py23
2 files changed, 16 insertions, 15 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 505e31b..87d923a 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -86,13 +86,15 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
acks=req_acks,
timeout=ack_timeout)
except FailedPayloadsError as ex:
- log.exception("Failed payloads count %s" % len(ex.message))
+ failed_reqs = ex.args[0]
+ log.exception("Failed payloads count %s" % len(failed_reqs))
+
if retries_limit is None:
# retry all failed messages until success
- reqs_to_retry = ex.message
+ reqs_to_retry = failed_reqs
elif not retries_limit < 0:
#
- for req in ex.message:
+ for req in failed_reqs:
if retries_limit and req.retries < retries_limit:
updated_req = req._replace(retries=req.retries+1)
reqs_to_retry.append(updated_req)
diff --git a/test/test_producer.py b/test/test_producer.py
index eecc7a7..51a74b5 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -106,11 +106,11 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
for i in range(10):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
- flag = mp.Value('c', 'f')
+ is_first_time = mp.Value('b', True)
def send_side_effect(reqs, *args, **kwargs):
self.send_calls_count.value += 1
- if flag.value == 'f':
- flag.value = 't'
+ if is_first_time.value:
+ is_first_time.value = False
raise FailedPayloadsError(reqs)
self.client.send_produce_request.side_effect = send_side_effect
@@ -166,14 +166,13 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# the queue should have 7 elements
# 3 batches of 1 msg each were retried all this time
self.assertEqual(self.queue.empty(), False)
- left = 0
- for i in range(10):
- try:
+ try:
+ for i in range(7):
self.queue.get(timeout=0.01)
- left += 1
- except Empty:
- break
- self.assertEqual(left, 7)
+ except Empty:
+ self.fail("Should be 7 elems in the queue")
+ self.assertEqual(self.queue.empty(), True)
- # 1s / 50ms of backoff = 20 times
- self.assertEqual(self.send_calls_count.value, 20)
+ # 1s / 50ms of backoff = 20 times max
+ self.assertTrue(self.send_calls_count.value > 10)
+ self.assertTrue(self.send_calls_count.value <= 20)