summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-04 13:28:48 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 19:27:55 -0700
commit6faa7c0e697b3096391453e50149c0dac59b05e0 (patch)
treef1d33d26dae9d055821a05835ccd912f55c5c1b0
parent68f5506c20d936257f3c11aa12cb692c8c732ed0 (diff)
downloadkafka-python-6faa7c0e697b3096391453e50149c0dac59b05e0.tar.gz
PR 331 fixup: fix _handle_error closure
-rw-r--r--kafka/producer/base.py33
1 files changed, 17 insertions, 16 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2542df5..ef81a69 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -89,17 +89,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
continue
reqs_to_retry, error_cls = [], None
- do_backoff, do_refresh = False, False
-
- def _handle_error(error_cls, reqs, all_retries):
- if ((error_cls == RequestTimedOutError and
- retry_options.retry_on_timeouts) or
- error_cls in RETRY_ERROR_TYPES):
- all_retries += reqs
- if error_cls in RETRY_BACKOFF_ERROR_TYPES:
- do_backoff = True
- if error_cls in RETRY_REFRESH_ERROR_TYPES:
- do_refresh = True
+ retry_state = {
+ 'do_backoff': False,
+ 'do_refresh': False
+ }
+
+ def _handle_error(error_cls, request):
+ if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
+ reqs_to_retry.append(request)
+ if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
+ retry_state['do_backoff'] |= True
+ if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
+ retry_state['do_refresh'] |= True
try:
reply = client.send_produce_request(reqs.keys(),
@@ -108,26 +109,26 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
fail_on_error=False)
for i, response in enumerate(reply):
if isinstance(response, FailedPayloadsError):
- _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
+ _handle_error(FailedPayloadsError, response.payload)
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
- _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
+ _handle_error(error_cls, reqs.keys()[i])
except Exception as ex:
error_cls = kafka_errors.get(type(ex), UnknownError)
- _handle_error(error_cls, reqs.keys(), reqs_to_retry)
+ _handle_error(error_cls, reqs.keys())
if not reqs_to_retry:
reqs = {}
continue
# doing backoff before next retry
- if do_backoff and retry_options.backoff_ms:
+ if retry_state['do_backoff'] and retry_options.backoff_ms:
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)
# refresh topic metadata before next retry
- if do_refresh:
+ if retry_state['do_refresh']:
client.load_metadata_for_topics()
reqs = dict((key, count + 1) for (key, count) in reqs.items()