Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--    kafka/consumer/fetcher.py    43
1 file changed, 36 insertions(+), 7 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 73daa36..2782057 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -40,6 +40,7 @@ class Fetcher(six.Iterator):
'value_deserializer': None,
'fetch_min_bytes': 1,
'fetch_max_wait_ms': 500,
+ 'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1048576,
'max_poll_records': sys.maxsize,
'check_crcs': True,
@@ -64,6 +65,15 @@ class Fetcher(six.Iterator):
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
+ fetch_max_bytes (int): The maximum amount of data the server should
+ return for a fetch request. This is not an absolute maximum; if
+ the first message in the first non-empty partition of the fetch
+ is larger than this value, the message will still be returned
+ to ensure that the consumer can make progress. NOTE: the consumer
+ performs fetches to multiple brokers in parallel, so memory
+ usage will depend on the number of brokers containing
+ partitions for the topic.
+ Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
max_partition_fetch_bytes (int): The maximum amount of data
per-partition the server will return. The maximum total memory
used for a request = #partitions * max_partition_fetch_bytes.
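
A minimal usage sketch for the new option, assuming it is plumbed through KafkaConsumer's config like the other fetch_* settings (the broker address and topic name are placeholders):

    from kafka import KafkaConsumer

    # Cap each fetch response at roughly 10 MB instead of the 50 MB default.
    # The cap is only honored by brokers that speak FetchRequest v3,
    # i.e. Kafka >= 0.10.1.0.
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        fetch_max_bytes=10 * 1024 * 1024,
    )
    for message in consumer:
        print(message.topic, message.partition, message.offset)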
@@ -617,7 +627,7 @@ class Fetcher(six.Iterator):
log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset)
elif error_type in (Errors.NotLeaderForPartitionError,
- Errors.UnknownTopicOrPartitionError):
+ Errors.UnknownTopicOrPartitionError):
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
@@ -664,7 +674,9 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
- if self.config['api_version'] >= (0, 10):
+ if self.config['api_version'] >= (0, 10, 1):
+ version = 3
+ elif self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
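
The api_version check above is a plain tuple comparison; the following standalone sketch mirrors the same mapping (the helper name is hypothetical, not part of the module):

    def _pick_fetch_version(api_version):
        # Brokers >= 0.10.1 understand FetchRequest v3, which adds
        # the request-level fetch_max_bytes field.
        if api_version >= (0, 10, 1):
            return 3
        elif api_version >= (0, 10):
            return 2
        elif api_version == (0, 9):
            return 1
        else:
            return 0

    assert _pick_fetch_version((0, 10, 1)) == 3
    assert _pick_fetch_version((0, 10, 0)) == 2
    assert _pick_fetch_version((0, 9)) == 1
    assert _pick_fetch_version((0, 8, 2)) == 0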
@@ -672,11 +684,28 @@ class Fetcher(six.Iterator):
version = 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
- requests[node_id] = FetchRequest[version](
- -1, # replica_id
- self.config['fetch_max_wait_ms'],
- self.config['fetch_min_bytes'],
- partition_data.items())
+ if version < 3:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ partition_data.items())
+ else:
+ # As of version 3, partitions are returned in the order they
+ # are requested, so to avoid starvation with the
+ # `fetch_max_bytes` option we need this shuffle.
+ # NOTE: partition_data already arrives in a quasi-random order
+ # due to the use of unordered structures like dicts, but that
+ # does not guarantee an even distribution, and starting with
+ # Python 3.6 dicts retain insertion order.
+ partition_data = list(partition_data.items())
+ random.shuffle(partition_data)
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ partition_data)
return requests
def _handle_fetch_response(self, request, send_time, response):
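
To see why the shuffle matters: with FetchRequest v3 the broker fills the response in the order partitions appear in the request, stopping once fetch_max_bytes is reached, so a fixed ordering could starve the partitions listed last. A simplified, self-contained sketch of the idea (the dict below is a stand-in, not the fetcher's actual partition_data layout):

    import random

    # Stand-in for one node's partition data, keyed by topic; each value is
    # a list of (partition, fetch_offset, max_partition_fetch_bytes) tuples.
    partition_data = {
        'my-topic': [(0, 123, 1048576), (1, 456, 1048576), (2, 789, 1048576)],
    }

    # Shuffling the items means no entry consistently heads the request,
    # so the fetch_max_bytes budget is spread fairly across fetches.
    entries = list(partition_data.items())
    random.shuffle(entries)
    print(entries)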