diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-12 14:41:22 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-12 14:41:22 -0800 |
commit | cc3e1cc9a17de52a3ab7955548b8bae945777a97 (patch) | |
tree | ef3f0f2a6f3af952f4e7866a26a05c615fa36f80 | |
parent | e58b447b8e9a7eaa307244b7a315c19ac00381a0 (diff) | |
download | kafka-python-cc3e1cc9a17de52a3ab7955548b8bae945777a97.tar.gz |
Attempt to pipeline fetchrequests in iterator
-rw-r--r-- | kafka/consumer/fetcher.py | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 6cafb65..f116bed 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -36,6 +36,7 @@ class Fetcher(six.Iterator): 'fetch_max_wait_ms': 500, 'max_partition_fetch_bytes': 1048576, 'check_crcs': True, + 'iterator_refetch_records': 1, # undocumented -- interface may change } def __init__(self, client, subscriptions, **configs): @@ -369,7 +370,7 @@ class Fetcher(six.Iterator): # Send additional FetchRequests when the internal queue is low # this should enable moderate pipelining - if len(self._records) == 1: + if len(self._records) <= self.config['iterator_refetch_records']: self._init_fetches() (fetch_offset, tp, messages) = self._records.popleft() |