summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-05 12:44:39 -0700
committerDana Powers <dana.powers@rd.io>2015-06-06 19:27:55 -0700
commit794ab5bba4807888839c2030d9b97422bddc3cc9 (patch)
tree93fc76c8b0c9dddf14a052329fe96dbb5fb831e0
parentd96a9b732ececb2f319e9e37ad4e040b366ce80b (diff)
downloadkafka-python-794ab5bba4807888839c2030d9b97422bddc3cc9.tar.gz
PR 331 fixup: Rename reqs dict to request_tries
-rw-r--r--kafka/producer/base.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 0bb0c81..15768be 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -47,7 +47,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
a specified timeout and send them upstream to the brokers in one
request
"""
- reqs = {}
+ request_tries = {}
client.reinit()
while not stop_event.is_set():
@@ -55,7 +55,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# it's a simplification: we're comparing message sets and
# messages: each set can contain [1..batch_size] messages
- count = batch_size - len(reqs)
+ count = batch_size - len(request_tries)
send_at = time.time() + timeout
msgset = defaultdict(list)
@@ -83,9 +83,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
tuple(messages))
- reqs[req] = 0
+ request_tries[req] = 0
- if not reqs:
+ if not request_tries:
continue
reqs_to_retry, error_cls = [], None
@@ -120,7 +120,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
_handle_error(error_cls, orig_req)
if not reqs_to_retry:
- reqs = {}
+ request_tries = {}
continue
# doing backoff before next retry
@@ -133,10 +133,14 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
log.warn('Async producer forcing metadata refresh metadata before retrying')
client.load_metadata_for_topics()
- reqs = dict((key, count + 1)
- for (key, count) in reqs.items()
- if key in reqs_to_retry
- and (retry_options.limit is None or (count < retry_options.limit)))
+ # Apply retry limit, dropping messages that are over
+ request_tries = dict(
+ (key, count + 1)
+ for (key, count) in request_tries.items()
+ if key in reqs_to_retry
+ and (retry_options.limit is None
+ or (count < retry_options.limit))
+ )
class Producer(object):