author    Dana Powers <dana.powers@rd.io>    2015-06-06 16:09:30 -0700
committer Dana Powers <dana.powers@rd.io>    2015-06-06 19:27:56 -0700
commit    9712f613c9e7e4b0436f501b513249eab4edc4e9 (patch)
tree      89fb8c87d0797f825f48b17c54d4bffab3565711 /kafka
parent    794ab5bba4807888839c2030d9b97422bddc3cc9 (diff)
download  kafka-python-9712f613c9e7e4b0436f501b513249eab4edc4e9.tar.gz
PR 331 fixup: do not attempt to get new messages if there are pending retries
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/producer/base.py  13
1 file changed, 9 insertions, 4 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 15768be..2f47d87 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -52,13 +52,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
     while not stop_event.is_set():
         timeout = batch_time
-
-        # it's a simplification: we're comparing message sets and
-        # messages: each set can contain [1..batch_size] messages
-        count = batch_size - len(request_tries)
+        count = batch_size
         send_at = time.time() + timeout
         msgset = defaultdict(list)
+        # Merging messages will require a bit more work to manage correctly
+        # for now, dont look for new batches if we have old ones to retry
+        if request_tries:
+            count = 0
+            log.debug('Skipping new batch collection to handle retries')
+        else:
+            log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
+
         # Keep fetching till we gather enough messages or a
         # timeout is reached
         while count > 0 and timeout >= 0:
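
The sketch below illustrates the effect of this guard in isolation. It is a simplified standalone approximation, not the actual _send_upstream implementation: the collect_batch helper, its signature, and the queue-draining details are hypothetical. The idea it shows follows the diff above: when request_tries holds batches awaiting retry, count is forced to 0 so no new messages are pulled from the queue until the retries have been flushed.

    # Hypothetical, simplified sketch of the batch-collection step guarded by
    # pending retries; collect_batch is not part of kafka-python's API.
    import logging
    import time
    from collections import defaultdict
    from queue import Empty, Queue

    log = logging.getLogger(__name__)


    def collect_batch(queue, request_tries, batch_size, batch_time):
        """Gather up to batch_size messages, unless retries are pending."""
        timeout = batch_time
        count = batch_size
        send_at = time.time() + timeout
        msgset = defaultdict(list)

        # Merging new messages into batches that are awaiting retry is not
        # handled, so skip collection entirely while request_tries is non-empty.
        if request_tries:
            count = 0
            log.debug('Skipping new batch collection to handle retries')
        else:
            log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))

        # Keep fetching until enough messages are gathered or the timeout passes.
        while count > 0 and timeout >= 0:
            try:
                topic_partition, msg, key = queue.get(timeout=max(timeout, 0))
            except Empty:
                break
            timeout = send_at - time.time()
            count -= 1
            msgset[topic_partition].append((msg, key))

        return msgset


    # Example: with a non-empty request_tries, the queued message is left alone.
    q = Queue()
    q.put((('my-topic', 0), b'payload', None))
    print(collect_batch(q, request_tries={'pending-batch': 1},
                        batch_size=20, batch_time=0.1))   # empty defaultdict
    print(collect_batch(q, request_tries={},
                        batch_size=20, batch_time=0.1))   # drains the message

Skipping collection while retries are outstanding keeps retried message sets and freshly batched messages from having to be merged, at the cost of briefly pausing new-message pickup during retry handling.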