summaryrefslogtreecommitdiff
path: root/kafka/producer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--kafka/producer/base.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e0c086b..498539d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -144,10 +144,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
retry_state['do_refresh'] |= True
- reply = client.send_produce_request(request_tries.keys(),
+ requests = list(request_tries.keys())
+ reply = client.send_produce_request(requests,
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
+
for i, response in enumerate(reply):
error_cls = None
if isinstance(response, FailedPayloadsError):
@@ -156,7 +158,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
- orig_req = request_tries.keys()[i]
+ orig_req = requests[i]
if error_cls:
_handle_error(error_cls, orig_req)