diff options
-rw-r--r-- | kafka/producer/base.py | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index c1bc0c5..0bb0c81 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -102,21 +102,22 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): retry_state['do_refresh'] |= True - try: - reply = client.send_produce_request(reqs.keys(), - acks=req_acks, - timeout=ack_timeout, - fail_on_error=False) - for i, response in enumerate(reply): - if isinstance(response, FailedPayloadsError): - _handle_error(FailedPayloadsError, response.payload) - elif isinstance(response, ProduceResponse) and response.error: - error_cls = kafka_errors.get(response.error, UnknownError) - _handle_error(error_cls, reqs.keys()[i]) - - except Exception as ex: - error_cls = kafka_errors.get(type(ex), UnknownError) - _handle_error(error_cls, reqs.keys()) + reply = client.send_produce_request(request_tries.keys(), + acks=req_acks, + timeout=ack_timeout, + fail_on_error=False) + for i, response in enumerate(reply): + error_cls = None + if isinstance(response, FailedPayloadsError): + error_cls = response.__class__ + orig_req = response.payload + + elif isinstance(response, ProduceResponse) and response.error: + error_cls = kafka_errors.get(response.error, UnknownError) + orig_req = request_tries.keys()[i] + + if error_cls: + _handle_error(error_cls, orig_req) if not reqs_to_retry: reqs = {} |