diff options
-rw-r--r-- | kafka/producer/base.py | 8 | ||||
-rw-r--r-- | test/test_producer.py | 23 |
2 files changed, 16 insertions, 15 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 505e31b..87d923a 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -86,13 +86,15 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, acks=req_acks, timeout=ack_timeout) except FailedPayloadsError as ex: - log.exception("Failed payloads count %s" % len(ex.message)) + failed_reqs = ex.args[0] + log.exception("Failed payloads count %s" % len(failed_reqs)) + if retries_limit is None: # retry all failed messages until success - reqs_to_retry = ex.message + reqs_to_retry = failed_reqs elif not retries_limit < 0: # - for req in ex.message: + for req in failed_reqs: if retries_limit and req.retries < retries_limit: updated_req = req._replace(retries=req.retries+1) reqs_to_retry.append(updated_req) diff --git a/test/test_producer.py b/test/test_producer.py index eecc7a7..51a74b5 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -106,11 +106,11 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): for i in range(10): self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) - flag = mp.Value('c', 'f') + is_first_time = mp.Value('b', True) def send_side_effect(reqs, *args, **kwargs): self.send_calls_count.value += 1 - if flag.value == 'f': - flag.value = 't' + if is_first_time.value: + is_first_time.value = False raise FailedPayloadsError(reqs) self.client.send_produce_request.side_effect = send_side_effect @@ -166,14 +166,13 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # the queue should have 7 elements # 3 batches of 1 msg each were retried all this time self.assertEqual(self.queue.empty(), False) - left = 0 - for i in range(10): - try: + try: + for i in range(7): self.queue.get(timeout=0.01) - left += 1 - except Empty: - break - self.assertEqual(left, 7) + except Empty: + self.fail("Should be 7 elems in the queue") + self.assertEqual(self.queue.empty(), True) - # 1s / 50ms of backoff = 20 times - self.assertEqual(self.send_calls_count.value, 20) + # 1s / 50ms of backoff = 20 times max + self.assertTrue(self.send_calls_count.value > 10) + self.assertTrue(self.send_calls_count.value <= 20) |