summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/common.py2
-rw-r--r--kafka/producer/base.py58
2 files changed, 29 insertions, 31 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 0e769e4..e327d02 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -248,4 +248,4 @@ RETRY_REFRESH_ERROR_TYPES = (
)
-RETRY_ERROR_TYPES = list(set(RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES))
+RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 3f0431c..fffea94 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -85,48 +85,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if not reqs:
continue
- reqs_to_retry = []
+ reqs_to_retry, error_type = [], None
try:
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
- except tuple(RETRY_ERROR_TYPES) as ex:
+ except FailedPayloadsError as ex:
+ error_type = FailedPayloadsError
+ reqs_to_retry = ex.failed_payloads
- # by default, retry all sent messages
- reqs_to_retry = reqs
-
- if type(ex) == FailedPayloadsError:
- reqs_to_retry = ex.failed_payloads
-
- elif (type(ex) == RequestTimedOutError and
- not retry_options.retry_on_timeouts):
- reqs_to_retry = []
-
- # filter reqs_to_retry if there's a retry limit
- if retry_options.limit and retry_options.limit > 0:
- reqs_to_retry = [req._replace(retries=req.retries+1)
- for req in reqs_to_retry
- if req.retries < retry_options.limit]
-
- # doing backoff before next retry
- if (reqs_to_retry and type(ex) in RETRY_BACKOFF_ERROR_TYPES
- and retry_options.backoff_ms):
- log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
- time.sleep(float(retry_options.backoff_ms) / 1000)
-
- # refresh topic metadata before next retry
- if reqs_to_retry and type(ex) in RETRY_REFRESH_ERROR_TYPES:
- client.load_metadata_for_topics()
+ except RequestTimedOutError:
+ error_type = RequestTimedOutError
+ if retry_options.retry_on_timeouts:
+ reqs_to_retry = reqs
except Exception as ex:
- log.exception("Unable to send message: %s" % type(ex))
+ error_type = type(ex)
+ if type(ex) in RETRY_ERROR_TYPES:
+ reqs_to_retry = reqs
finally:
reqs = []
- if reqs_to_retry:
- reqs = reqs_to_retry
+ if not reqs_to_retry:
+ continue
+
+ # doing backoff before next retry
+ if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms:
+ log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
+
+ # refresh topic metadata before next retry
+ if error_type in RETRY_REFRESH_ERROR_TYPES:
+ client.load_metadata_for_topics()
+
+ reqs = reqs_to_retry
+ # filter reqs_to_retry if there's a retry limit
+ if retry_options.limit and retry_options.limit > 0:
+ reqs = [req._replace(retries=req.retries+1)
+ for req in reqs if req.retries < retry_options.limit]
class Producer(object):