diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-09 11:29:14 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-09 11:29:14 -0700 |
commit | 63ecdbd32edc5e0cc989722d9cdd02d384beffb0 (patch) | |
tree | 96e706b8a832134584ff861ca50d84d37e14c39e | |
parent | c75b84eb7a7e81947e4d785dc871fee05e350476 (diff) | |
download | kafka-python-63ecdbd32edc5e0cc989722d9cdd02d384beffb0.tar.gz |
Use a list, not request_tries.keys(), to track requests / responses in async producer
-rw-r--r-- | kafka/producer/base.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index e0c086b..498539d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -144,10 +144,12 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): retry_state['do_refresh'] |= True - reply = client.send_produce_request(request_tries.keys(), + requests = list(request_tries.keys()) + reply = client.send_produce_request(requests, acks=req_acks, timeout=ack_timeout, fail_on_error=False) + for i, response in enumerate(reply): error_cls = None if isinstance(response, FailedPayloadsError): @@ -156,7 +158,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, elif isinstance(response, ProduceResponse) and response.error: error_cls = kafka_errors.get(response.error, UnknownError) - orig_req = request_tries.keys()[i] + orig_req = requests[i] if error_cls: _handle_error(error_cls, orig_req) |