diff options
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r-- | kafka/producer/base.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index e0c086b..498539d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -144,10 +144,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): retry_state['do_refresh'] |= True - reply = client.send_produce_request(request_tries.keys(), + requests = list(request_tries.keys()) + reply = client.send_produce_request(requests, acks=req_acks, timeout=ack_timeout, fail_on_error=False) + for i, response in enumerate(reply): error_cls = None if isinstance(response, FailedPayloadsError): @@ -156,7 +158,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, elif isinstance(response, ProduceResponse) and response.error: error_cls = kafka_errors.get(response.error, UnknownError) - orig_req = request_tries.keys()[i] + orig_req = requests[i] if error_cls: _handle_error(error_cls, orig_req) |