diff options
| -rw-r--r-- | kafka/producer/base.py | 6 | 
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index e0c086b..498539d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -144,10 +144,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,              if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):                  retry_state['do_refresh'] |= True -        reply = client.send_produce_request(request_tries.keys(), +        requests = list(request_tries.keys()) +        reply = client.send_produce_request(requests,                                              acks=req_acks,                                              timeout=ack_timeout,                                              fail_on_error=False) +          for i, response in enumerate(reply):              error_cls = None              if isinstance(response, FailedPayloadsError): @@ -156,7 +158,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,              elif isinstance(response, ProduceResponse) and response.error:                  error_cls = kafka_errors.get(response.error, UnknownError) -                orig_req = request_tries.keys()[i] +                orig_req = requests[i]              if error_cls:                  _handle_error(error_cls, orig_req)  | 
