summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py43
-rw-r--r--test/test_producer.py4
-rw-r--r--tox.ini1
3 files changed, 27 insertions, 21 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2e2f3c4..2edeace 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -16,8 +16,8 @@ import six
from kafka.common import (
ProduceRequest, TopicAndPartition, RetryOptions,
- UnsupportedCodecError, FailedPayloadsError,
- RequestTimedOutError, AsyncProducerQueueFull
+ kafka_errors, UnsupportedCodecError, FailedPayloadsError,
+ RequestTimedOutError, AsyncProducerQueueFull, UnknownError
)
from kafka.common import (
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
@@ -89,41 +89,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if not reqs:
continue
- reqs_to_retry, error_type = [], None
+ reqs_to_retry, error_cls = [], None
+ do_backoff, do_refresh = False, False
+
+ def _handle_error(error_cls, reqs, all_retries):
+ if ((error_cls == RequestTimedOutError and
+ retry_options.retry_on_timeouts) or
+ error_cls in RETRY_ERROR_TYPES):
+ all_retries += reqs
+ if error_cls in RETRY_BACKOFF_ERROR_TYPES:
+ do_backoff = True
+ if error_cls in RETRY_REFRESH_ERROR_TYPES:
+ do_refresh = True
try:
reply = client.send_produce_request(reqs.keys(),
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
- reqs_to_retry = [req for broker_responses in reply
- for response in broker_responses
- for req in response.failed_payloads
- if isinstance(response, FailedPayloadsError)]
- if reqs_to_retry:
- error_type = FailedPayloadsError
-
- except RequestTimedOutError:
- error_type = RequestTimedOutError
- if retry_options.retry_on_timeouts:
- reqs_to_retry = reqs.keys()
+ for i, response in enumerate(reply):
+ if isinstance(response, FailedPayloadsError):
+ _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
+ elif isinstance(response, ProduceResponse) and response.error:
+ error_cls = kafka_errors.get(response.error, UnknownError)
+ _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
except Exception as ex:
- error_type = type(ex)
- if type(ex) in RETRY_ERROR_TYPES:
- reqs_to_retry = reqs.keys()
+ error_cls = kafka_errors.get(type(ex), UnknownError)
+ _handle_error(error_cls, reqs.keys(), reqs_to_retry)
if not reqs_to_retry:
reqs = {}
continue
# doing backoff before next retry
- if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms:
+ if do_backoff and retry_options.backoff_ms:
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)
# refresh topic metadata before next retry
- if error_type in RETRY_REFRESH_ERROR_TYPES:
+ if do_refresh:
client.load_metadata_for_topics()
reqs = dict((key, count + 1) for (key, count) in reqs.items()
diff --git a/test/test_producer.py b/test/test_producer.py
index a2ba877..258b9c3 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -143,7 +143,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
def send_side_effect(reqs, *args, **kwargs):
if self.client.is_first_time:
self.client.is_first_time = False
- return [[FailedPayloadsError(reqs)]]
+ return [FailedPayloadsError(reqs)]
return []
self.client.send_produce_request.side_effect = send_side_effect
@@ -165,7 +165,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
def send_side_effect(reqs, *args, **kwargs):
- return [[FailedPayloadsError(reqs)]]
+ return [FailedPayloadsError(reqs)]
self.client.send_produce_request.side_effect = send_side_effect
diff --git a/tox.ini b/tox.ini
index fba7d8e..e3e8568 100644
--- a/tox.ini
+++ b/tox.ini
@@ -14,6 +14,7 @@ commands =
nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
setenv =
PROJECT_ROOT = {toxinidir}
+passenv = KAFKA_VERSION
[testenv:py33]
deps =