summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-12 14:41:22 -0800
committerDana Powers <dana.powers@rd.io>2016-01-12 14:41:22 -0800
commitcc3e1cc9a17de52a3ab7955548b8bae945777a97 (patch)
treeef3f0f2a6f3af952f4e7866a26a05c615fa36f80
parente58b447b8e9a7eaa307244b7a315c19ac00381a0 (diff)
downloadkafka-python-cc3e1cc9a17de52a3ab7955548b8bae945777a97.tar.gz
Attempt to pipeline fetchrequests in iterator
-rw-r--r--kafka/consumer/fetcher.py3
1 files changed, 2 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 6cafb65..f116bed 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -36,6 +36,7 @@ class Fetcher(six.Iterator):
'fetch_max_wait_ms': 500,
'max_partition_fetch_bytes': 1048576,
'check_crcs': True,
+ 'iterator_refetch_records': 1, # undocumented -- interface may change
}
def __init__(self, client, subscriptions, **configs):
@@ -369,7 +370,7 @@ class Fetcher(six.Iterator):
# Send additional FetchRequests when the internal queue is low
# this should enable moderate pipelining
- if len(self._records) == 1:
+ if len(self._records) <= self.config['iterator_refetch_records']:
self._init_fetches()
(fetch_offset, tp, messages) = self._records.popleft()