diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-05 12:42:07 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 19:27:55 -0700 |
commit | d96a9b732ececb2f319e9e37ad4e040b366ce80b (patch) | |
tree | df9c46c1fe8158e0f053c9368691f5931e8e90b9 | |
parent | 48e278941206815d680b4d4c81d7f1fd2637255c (diff) | |
download | kafka-python-d96a9b732ececb2f319e9e37ad4e040b366ce80b.tar.gz |
PR 331 fixup: Dont need try/except when calling send_produce_requests with fail_on_error=False
-rw-r--r-- | kafka/producer/base.py | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index c1bc0c5..0bb0c81 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -102,21 +102,22 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): retry_state['do_refresh'] |= True - try: - reply = client.send_produce_request(reqs.keys(), - acks=req_acks, - timeout=ack_timeout, - fail_on_error=False) - for i, response in enumerate(reply): - if isinstance(response, FailedPayloadsError): - _handle_error(FailedPayloadsError, response.payload) - elif isinstance(response, ProduceResponse) and response.error: - error_cls = kafka_errors.get(response.error, UnknownError) - _handle_error(error_cls, reqs.keys()[i]) - - except Exception as ex: - error_cls = kafka_errors.get(type(ex), UnknownError) - _handle_error(error_cls, reqs.keys()) + reply = client.send_produce_request(request_tries.keys(), + acks=req_acks, + timeout=ack_timeout, + fail_on_error=False) + for i, response in enumerate(reply): + error_cls = None + if isinstance(response, FailedPayloadsError): + error_cls = response.__class__ + orig_req = response.payload + + elif isinstance(response, ProduceResponse) and response.error: + error_cls = kafka_errors.get(response.error, UnknownError) + orig_req = request_tries.keys()[i] + + if error_cls: + _handle_error(error_cls, orig_req) if not reqs_to_retry: reqs = {} |