summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-09 11:29:14 -0700
committerDana Powers <dana.powers@rd.io>2015-06-09 11:29:14 -0700
commit63ecdbd32edc5e0cc989722d9cdd02d384beffb0 (patch)
tree96e706b8a832134584ff861ca50d84d37e14c39e
parentc75b84eb7a7e81947e4d785dc871fee05e350476 (diff)
downloadkafka-python-63ecdbd32edc5e0cc989722d9cdd02d384beffb0.tar.gz
Use a list, not request_tries.keys(), to track requests / responses in async producer
-rw-r--r--kafka/producer/base.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e0c086b..498539d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -144,10 +144,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
retry_state['do_refresh'] |= True
- reply = client.send_produce_request(request_tries.keys(),
+ requests = list(request_tries.keys())
+ reply = client.send_produce_request(requests,
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
+
for i, response in enumerate(reply):
error_cls = None
if isinstance(response, FailedPayloadsError):
@@ -156,7 +158,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
- orig_req = request_tries.keys()[i]
+ orig_req = requests[i]
if error_cls:
_handle_error(error_cls, orig_req)