diff options
-rw-r--r-- | kafka/producer/base.py | 43 | ||||
-rw-r--r-- | test/test_producer.py | 4 | ||||
-rw-r--r-- | tox.ini | 1 |
3 files changed, 27 insertions, 21 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 2e2f3c4..2edeace 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -16,8 +16,8 @@ import six from kafka.common import ( ProduceRequest, TopicAndPartition, RetryOptions, - UnsupportedCodecError, FailedPayloadsError, - RequestTimedOutError, AsyncProducerQueueFull + kafka_errors, UnsupportedCodecError, FailedPayloadsError, + RequestTimedOutError, AsyncProducerQueueFull, UnknownError ) from kafka.common import ( RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) @@ -89,41 +89,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if not reqs: continue - reqs_to_retry, error_type = [], None + reqs_to_retry, error_cls = [], None + do_backoff, do_refresh = False, False + + def _handle_error(error_cls, reqs, all_retries): + if ((error_cls == RequestTimedOutError and + retry_options.retry_on_timeouts) or + error_cls in RETRY_ERROR_TYPES): + all_retries += reqs + if error_cls in RETRY_BACKOFF_ERROR_TYPES: + do_backoff = True + if error_cls in RETRY_REFRESH_ERROR_TYPES: + do_refresh = True try: reply = client.send_produce_request(reqs.keys(), acks=req_acks, timeout=ack_timeout, fail_on_error=False) - reqs_to_retry = [req for broker_responses in reply - for response in broker_responses - for req in response.failed_payloads - if isinstance(response, FailedPayloadsError)] - if reqs_to_retry: - error_type = FailedPayloadsError - - except RequestTimedOutError: - error_type = RequestTimedOutError - if retry_options.retry_on_timeouts: - reqs_to_retry = reqs.keys() + for i, response in enumerate(reply): + if isinstance(response, FailedPayloadsError): + _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry) + elif isinstance(response, ProduceResponse) and response.error: + error_cls = kafka_errors.get(response.error, UnknownError) + _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry) except Exception as ex: - error_type = type(ex) - if type(ex) in RETRY_ERROR_TYPES: - reqs_to_retry = reqs.keys() + error_cls = kafka_errors.get(type(ex), UnknownError) + _handle_error(error_cls, reqs.keys(), reqs_to_retry) if not reqs_to_retry: reqs = {} continue # doing backoff before next retry - if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms: + if do_backoff and retry_options.backoff_ms: log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms) time.sleep(float(retry_options.backoff_ms) / 1000) # refresh topic metadata before next retry - if error_type in RETRY_REFRESH_ERROR_TYPES: + if do_refresh: client.load_metadata_for_topics() reqs = dict((key, count + 1) for (key, count) in reqs.items() diff --git a/test/test_producer.py b/test/test_producer.py index a2ba877..258b9c3 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -143,7 +143,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): def send_side_effect(reqs, *args, **kwargs): if self.client.is_first_time: self.client.is_first_time = False - return [[FailedPayloadsError(reqs)]] + return [FailedPayloadsError(reqs)] return [] self.client.send_produce_request.side_effect = send_side_effect @@ -165,7 +165,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i)) def send_side_effect(reqs, *args, **kwargs): - return [[FailedPayloadsError(reqs)]] + return [FailedPayloadsError(reqs)] self.client.send_produce_request.side_effect = send_side_effect @@ -14,6 +14,7 @@ commands = nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka} setenv = PROJECT_ROOT = {toxinidir} +passenv = KAFKA_VERSION [testenv:py33] deps = |