diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-04 13:28:48 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 19:27:55 -0700 |
commit | 6faa7c0e697b3096391453e50149c0dac59b05e0 (patch) | |
tree | f1d33d26dae9d055821a05835ccd912f55c5c1b0 | |
parent | 68f5506c20d936257f3c11aa12cb692c8c732ed0 (diff) | |
download | kafka-python-6faa7c0e697b3096391453e50149c0dac59b05e0.tar.gz |
PR 331 fixup: fix _handle_error closure
-rw-r--r-- | kafka/producer/base.py | 33 |
1 files changed, 17 insertions, 16 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 2542df5..ef81a69 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -89,17 +89,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, continue reqs_to_retry, error_cls = [], None - do_backoff, do_refresh = False, False - - def _handle_error(error_cls, reqs, all_retries): - if ((error_cls == RequestTimedOutError and - retry_options.retry_on_timeouts) or - error_cls in RETRY_ERROR_TYPES): - all_retries += reqs - if error_cls in RETRY_BACKOFF_ERROR_TYPES: - do_backoff = True - if error_cls in RETRY_REFRESH_ERROR_TYPES: - do_refresh = True + retry_state = { + 'do_backoff': False, + 'do_refresh': False + } + + def _handle_error(error_cls, request): + if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)): + reqs_to_retry.append(request) + if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES): + retry_state['do_backoff'] |= True + if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): + retry_state['do_refresh'] |= True try: reply = client.send_produce_request(reqs.keys(), @@ -108,26 +109,26 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, fail_on_error=False) for i, response in enumerate(reply): if isinstance(response, FailedPayloadsError): - _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry) + _handle_error(FailedPayloadsError, response.payload) elif isinstance(response, ProduceResponse) and response.error: error_cls = kafka_errors.get(response.error, UnknownError) - _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry) + _handle_error(error_cls, reqs.keys()[i]) except Exception as ex: error_cls = kafka_errors.get(type(ex), UnknownError) - _handle_error(error_cls, reqs.keys(), reqs_to_retry) + _handle_error(error_cls, reqs.keys()) if not reqs_to_retry: reqs = {} continue # doing backoff before next retry - if do_backoff and retry_options.backoff_ms: + if retry_state['do_backoff'] and retry_options.backoff_ms: log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms) time.sleep(float(retry_options.backoff_ms) / 1000) # refresh topic metadata before next retry - if do_refresh: + if retry_state['do_refresh']: client.load_metadata_for_topics() reqs = dict((key, count + 1) for (key, count) in reqs.items() |