diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer/base.py | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 2e2f3c4..2edeace 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -16,8 +16,8 @@ import six from kafka.common import ( ProduceRequest, TopicAndPartition, RetryOptions, - UnsupportedCodecError, FailedPayloadsError, - RequestTimedOutError, AsyncProducerQueueFull + kafka_errors, UnsupportedCodecError, FailedPayloadsError, + RequestTimedOutError, AsyncProducerQueueFull, UnknownError ) from kafka.common import ( RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) @@ -89,41 +89,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if not reqs: continue - reqs_to_retry, error_type = [], None + reqs_to_retry, error_cls = [], None + do_backoff, do_refresh = False, False + + def _handle_error(error_cls, reqs, all_retries): + if ((error_cls == RequestTimedOutError and + retry_options.retry_on_timeouts) or + error_cls in RETRY_ERROR_TYPES): + all_retries += reqs + if error_cls in RETRY_BACKOFF_ERROR_TYPES: + do_backoff = True + if error_cls in RETRY_REFRESH_ERROR_TYPES: + do_refresh = True try: reply = client.send_produce_request(reqs.keys(), acks=req_acks, timeout=ack_timeout, fail_on_error=False) - reqs_to_retry = [req for broker_responses in reply - for response in broker_responses - for req in response.failed_payloads - if isinstance(response, FailedPayloadsError)] - if reqs_to_retry: - error_type = FailedPayloadsError - - except RequestTimedOutError: - error_type = RequestTimedOutError - if retry_options.retry_on_timeouts: - reqs_to_retry = reqs.keys() + for i, response in enumerate(reply): + if isinstance(response, FailedPayloadsError): + _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry) + elif isinstance(response, ProduceResponse) and response.error: + error_cls = kafka_errors.get(response.error, UnknownError) + _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry) except Exception as ex: - error_type = type(ex) - if type(ex) in RETRY_ERROR_TYPES: - reqs_to_retry = reqs.keys() + error_cls = kafka_errors.get(type(ex), UnknownError) + _handle_error(error_cls, reqs.keys(), reqs_to_retry) if not reqs_to_retry: reqs = {} continue # doing backoff before next retry - if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms: + if do_backoff and retry_options.backoff_ms: log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms) time.sleep(float(retry_options.backoff_ms) / 1000) # refresh topic metadata before next retry - if error_type in RETRY_REFRESH_ERROR_TYPES: + if do_refresh: client.load_metadata_for_topics() reqs = dict((key, count + 1) for (key, count) in reqs.items() |