summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-05 12:42:07 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 19:27:55 -0700
commitd96a9b732ececb2f319e9e37ad4e040b366ce80b (patch)
treedf9c46c1fe8158e0f053c9368691f5931e8e90b9
parent48e278941206815d680b4d4c81d7f1fd2637255c (diff)
downloadkafka-python-d96a9b732ececb2f319e9e37ad4e040b366ce80b.tar.gz
PR 331 fixup: Dont need try/except when calling send_produce_requests with fail_on_error=False
-rw-r--r--kafka/producer/base.py31
1 files changed, 16 insertions, 15 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c1bc0c5..0bb0c81 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -102,21 +102,22 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
retry_state['do_refresh'] |= True
- try:
- reply = client.send_produce_request(reqs.keys(),
- acks=req_acks,
- timeout=ack_timeout,
- fail_on_error=False)
- for i, response in enumerate(reply):
- if isinstance(response, FailedPayloadsError):
- _handle_error(FailedPayloadsError, response.payload)
- elif isinstance(response, ProduceResponse) and response.error:
- error_cls = kafka_errors.get(response.error, UnknownError)
- _handle_error(error_cls, reqs.keys()[i])
-
- except Exception as ex:
- error_cls = kafka_errors.get(type(ex), UnknownError)
- _handle_error(error_cls, reqs.keys())
+ reply = client.send_produce_request(request_tries.keys(),
+ acks=req_acks,
+ timeout=ack_timeout,
+ fail_on_error=False)
+ for i, response in enumerate(reply):
+ error_cls = None
+ if isinstance(response, FailedPayloadsError):
+ error_cls = response.__class__
+ orig_req = response.payload
+
+ elif isinstance(response, ProduceResponse) and response.error:
+ error_cls = kafka_errors.get(response.error, UnknownError)
+ orig_req = request_tries.keys()[i]
+
+ if error_cls:
+ _handle_error(error_cls, orig_req)
if not reqs_to_retry:
reqs = {}