diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-05 12:44:39 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 19:27:55 -0700 |
commit | 794ab5bba4807888839c2030d9b97422bddc3cc9 (patch) | |
tree | 93fc76c8b0c9dddf14a052329fe96dbb5fb831e0 /kafka/producer/base.py | |
parent | d96a9b732ececb2f319e9e37ad4e040b366ce80b (diff) | |
download | kafka-python-794ab5bba4807888839c2030d9b97422bddc3cc9.tar.gz |
PR 331 fixup: Rename reqs dict to request_tries
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r-- | kafka/producer/base.py | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0bb0c81..15768be 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -47,7 +47,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, a specified timeout and send them upstream to the brokers in one request """ - reqs = {} + request_tries = {} client.reinit() while not stop_event.is_set(): @@ -55,7 +55,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # it's a simplification: we're comparing message sets and # messages: each set can contain [1..batch_size] messages - count = batch_size - len(reqs) + count = batch_size - len(request_tries) send_at = time.time() + timeout msgset = defaultdict(list) @@ -83,9 +83,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, req = ProduceRequest(topic_partition.topic, topic_partition.partition, tuple(messages)) - reqs[req] = 0 + request_tries[req] = 0 - if not reqs: + if not request_tries: continue reqs_to_retry, error_cls = [], None @@ -120,7 +120,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, _handle_error(error_cls, orig_req) if not reqs_to_retry: - reqs = {} + request_tries = {} continue # doing backoff before next retry @@ -133,10 +133,14 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, log.warn('Async producer forcing metadata refresh metadata before retrying') client.load_metadata_for_topics() - reqs = dict((key, count + 1) - for (key, count) in reqs.items() - if key in reqs_to_retry - and (retry_options.limit is None or (count < retry_options.limit))) + # Apply retry limit, dropping messages that are over + request_tries = dict( + (key, count + 1) + for (key, count) in request_tries.items() + if key in reqs_to_retry + and (retry_options.limit is None + or (count < retry_options.limit)) + ) class Producer(object): |