diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-06 16:09:30 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 19:27:56 -0700 |
commit | 9712f613c9e7e4b0436f501b513249eab4edc4e9 (patch) | |
tree | 89fb8c87d0797f825f48b17c54d4bffab3565711 /kafka | |
parent | 794ab5bba4807888839c2030d9b97422bddc3cc9 (diff) | |
download | kafka-python-9712f613c9e7e4b0436f501b513249eab4edc4e9.tar.gz |
PR 331 fixup: do not attempt to get new messages if there are pending retries
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer/base.py | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 15768be..2f47d87 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -52,13 +52,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, while not stop_event.is_set(): timeout = batch_time - - # it's a simplification: we're comparing message sets and - # messages: each set can contain [1..batch_size] messages - count = batch_size - len(request_tries) + count = batch_size send_at = time.time() + timeout msgset = defaultdict(list) + # Merging messages will require a bit more work to manage correctly + # for now, dont look for new batches if we have old ones to retry + if request_tries: + count = 0 + log.debug('Skipping new batch collection to handle retries') + else: + log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout)) + # Keep fetching till we gather enough messages or a # timeout is reached while count > 0 and timeout >= 0: |