diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-03-07 00:59:26 +0200 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-03-06 14:59:26 -0800 |
commit | 9c19ea7cbe163b0c434ce9dd9c8c42471027cce5 (patch) | |
tree | 8802dfc07e053279d18be591af11a1a4edf4988c /kafka/consumer/fetcher.py | |
parent | ff6f7bf085b912090b436da1c99f6f8f4cf66f94 (diff) | |
download | kafka-python-9c19ea7cbe163b0c434ce9dd9c8c42471027cce5.tar.gz |
Added `max_bytes` option and FetchRequest_v3 usage. (#962)
* Added `max_bytes` option and FetchRequest_v3 usage.
* Add checks for versions above 0.10 based on ApiVersionResponse
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 43 |
1 file changed, 36 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 73daa36..2782057 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -40,6 +40,7 @@ class Fetcher(six.Iterator): 'value_deserializer': None, 'fetch_min_bytes': 1, 'fetch_max_wait_ms': 500, + 'fetch_max_bytes': 52428800, 'max_partition_fetch_bytes': 1048576, 'max_poll_records': sys.maxsize, 'check_crcs': True, @@ -64,6 +65,15 @@ class Fetcher(six.Iterator): the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch_min_bytes. Default: 500. + fetch_max_bytes (int): The maximum amount of data the server should + return for a fetch request. This is not an absolute maximum, if + the first message in the first non-empty partition of the fetch + is larger than this value, the message will still be returned + to ensure that the consumer can make progress. NOTE: consumer + performs fetches to multiple brokers in parallel so memory + usage will depend on the number of brokers containing + partitions for the topic. + Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 Mb). max_partition_fetch_bytes (int): The maximum amount of data per-partition the server will return. The maximum total memory used for a request = #partitions * max_partition_fetch_bytes. 
@@ -617,7 +627,7 @@ class Fetcher(six.Iterator): log.debug("Fetched offset %d for partition %s", offset, partition) future.success(offset) elif error_type in (Errors.NotLeaderForPartitionError, - Errors.UnknownTopicOrPartitionError): + Errors.UnknownTopicOrPartitionError): log.debug("Attempt to fetch offsets for partition %s failed due" " to obsolete leadership information, retrying.", partition) @@ -664,7 +674,9 @@ class Fetcher(six.Iterator): log.debug("Adding fetch request for partition %s at offset %d", partition, position) - if self.config['api_version'] >= (0, 10): + if self.config['api_version'] >= (0, 10, 1): + version = 3 + elif self.config['api_version'] >= (0, 10): version = 2 elif self.config['api_version'] == (0, 9): version = 1 @@ -672,11 +684,28 @@ class Fetcher(six.Iterator): version = 0 requests = {} for node_id, partition_data in six.iteritems(fetchable): - requests[node_id] = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - partition_data.items()) + if version < 3: + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + partition_data.items()) + else: + # As of version == 3 partitions will be returned in order as + # they are requested, so to avoid starvation with + # `fetch_max_bytes` option we need this shuffle + # NOTE: we do have partition_data in random order due to usage + # of unordered structures like dicts, but that does not + # guaranty equal distribution, and starting Python3.6 + # dicts retain insert order. + partition_data = list(partition_data.items()) + random.shuffle(partition_data) + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + partition_data) return requests def _handle_fetch_response(self, request, send_time, response): |