diff options
-rw-r--r-- | kafka/producer/base.py | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 2542df5..ef81a69 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -89,17 +89,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, continue reqs_to_retry, error_cls = [], None - do_backoff, do_refresh = False, False - - def _handle_error(error_cls, reqs, all_retries): - if ((error_cls == RequestTimedOutError and - retry_options.retry_on_timeouts) or - error_cls in RETRY_ERROR_TYPES): - all_retries += reqs - if error_cls in RETRY_BACKOFF_ERROR_TYPES: - do_backoff = True - if error_cls in RETRY_REFRESH_ERROR_TYPES: - do_refresh = True + retry_state = { + 'do_backoff': False, + 'do_refresh': False + } + + def _handle_error(error_cls, request): + if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)): + reqs_to_retry.append(request) + if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES): + retry_state['do_backoff'] |= True + if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): + retry_state['do_refresh'] |= True try: reply = client.send_produce_request(reqs.keys(), @@ -108,26 +109,26 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, fail_on_error=False) for i, response in enumerate(reply): if isinstance(response, FailedPayloadsError): - _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry) + _handle_error(FailedPayloadsError, response.payload) elif isinstance(response, ProduceResponse) and response.error: error_cls = kafka_errors.get(response.error, UnknownError) - _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry) + _handle_error(error_cls, reqs.keys()[i]) except Exception as ex: error_cls = kafka_errors.get(type(ex), UnknownError) - _handle_error(error_cls, reqs.keys(), reqs_to_retry) + _handle_error(error_cls, reqs.keys()) if not reqs_to_retry: reqs = {} continue # doing backoff before next retry - if do_backoff and retry_options.backoff_ms: + if retry_state['do_backoff'] and retry_options.backoff_ms: log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms) time.sleep(float(retry_options.backoff_ms) / 1000) # refresh topic metadata before next retry - if do_refresh: + if retry_state['do_refresh']: client.load_metadata_for_topics() reqs = dict((key, count + 1) for (key, count) in reqs.items() |