author    | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-03-07 00:59:26 +0200
committer | Dana Powers <dana.powers@gmail.com>     | 2017-03-06 14:59:26 -0800
commit    | 9c19ea7cbe163b0c434ce9dd9c8c42471027cce5 (patch)
tree      | 8802dfc07e053279d18be591af11a1a4edf4988c
parent    | ff6f7bf085b912090b436da1c99f6f8f4cf66f94 (diff)
Added `max_bytes` option and FetchRequest_v3 usage. (#962)
* Added `max_bytes` option and FetchRequest_v3 usage.
* Add checks for versions above 0.10 based on ApiVersionResponse
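For context, the new option rides along as an ordinary `KafkaConsumer` keyword argument. A minimal usage sketch, not part of this commit: the topic name and broker address are placeholders, and the byte cap only takes effect against brokers >= 0.10.1 (older brokers fall back to FetchRequest v2 and ignore it):

    from kafka import KafkaConsumer

    # Hypothetical topic/broker. fetch_max_bytes caps the total size of each
    # fetch response, while max_partition_fetch_bytes still caps each partition.
    consumer = KafkaConsumer(
        'example-topic',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        fetch_max_bytes=1 * 1024 * 1024,  # ~1 MB per fetch request
    )
    for message in consumer:
        print(message.offset, message.value)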
-rw-r--r-- | kafka/client_async.py             |  2
-rw-r--r-- | kafka/conn.py                     | 25
-rw-r--r-- | kafka/consumer/fetcher.py         | 43
-rw-r--r-- | kafka/consumer/group.py           |  9
-rw-r--r-- | test/test_consumer_integration.py | 46
-rw-r--r-- | test/test_fetcher.py              |  3
6 files changed, 119 insertions, 9 deletions
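One hunk below deserves a close read before the raw diff: fetcher.py shuffles partitions whenever FetchRequest v3 is used, because v3 brokers fill partitions in request order and stop once `fetch_max_bytes` is reached, so a fixed order would starve the partitions at the tail of the list. A standalone sketch of that idea (function name is illustrative, not the module's API):

    import random

    def order_partitions_fairly(partition_data):
        # With FetchRequest v3 the broker honors the request's partition order
        # and stops appending data once fetch_max_bytes is reached. Shuffling
        # keeps any single partition from always being the one that is cut off.
        items = list(partition_data.items())
        random.shuffle(items)
        return items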
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 85de90a..2913b43 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -156,6 +156,8 @@ class KafkaClient(object):
         'sasl_plain_password': None,
     }
     API_VERSIONS = [
+        (0, 10, 1),
+        (0, 10, 0),
         (0, 10),
         (0, 9),
         (0, 8, 2),
diff --git a/kafka/conn.py b/kafka/conn.py
index d88e97c..2f28ed7 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -18,6 +18,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.admin import SaslHandShakeRequest
 from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.metadata import MetadataRequest
 from kafka.protocol.types import Int32
 from kafka.version import __version__
@@ -760,6 +761,24 @@ class BrokerConnection(object):
         self._correlation_id = (self._correlation_id + 1) % 2**31
         return self._correlation_id
 
+    def _check_version_above_0_10(self, response):
+        test_cases = [
+            # format: (<broker version>, <needed struct>)
+            ((0, 10, 1), MetadataRequest[2])
+        ]
+
+        error_type = Errors.for_code(response.error_code)
+        assert error_type is Errors.NoError, "API version check failed"
+        max_versions = dict([
+            (api_key, max_version)
+            for api_key, _, max_version in response.api_versions
+        ])
+        # Get the best match of test cases
+        for broker_version, struct in test_cases:
+            if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
+                return broker_version
+        return (0, 10, 0)
+
     def check_version(self, timeout=2, strict=False):
         """Attempt to guess the broker version.
@@ -784,7 +803,6 @@ class BrokerConnection(object):
         # socket.error (32, 54, or 104)
         from .protocol.admin import ApiVersionRequest, ListGroupsRequest
         from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
-        from .protocol.metadata import MetadataRequest
 
         # Socket errors are logged as exceptions and can alarm users. Mute them
         from logging import Filter
@@ -798,6 +816,7 @@ class BrokerConnection(object):
         log.addFilter(log_filter)
 
         test_cases = [
+            # All cases starting from 0.10 will be based on ApiVersionResponse
             ((0, 10), ApiVersionRequest[0]()),
             ((0, 9), ListGroupsRequest[0]()),
             ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
@@ -838,6 +857,10 @@ class BrokerConnection(object):
             self._sock.setblocking(False)
 
             if f.succeeded():
+                if version == (0, 10):
+                    # Starting from broker 0.10 we determine the version
+                    # by looking at the ApiVersionResponse
+                    version = self._check_version_above_0_10(f.value)
                 log.info('Broker version identified as %s',
                          '.'.join(map(str, version)))
                 log.info('Set configuration api_version=%s to skip auto'
                          ' check_version requests on startup', version)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 73daa36..2782057 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -40,6 +40,7 @@ class Fetcher(six.Iterator):
         'value_deserializer': None,
         'fetch_min_bytes': 1,
         'fetch_max_wait_ms': 500,
+        'fetch_max_bytes': 52428800,
         'max_partition_fetch_bytes': 1048576,
         'max_poll_records': sys.maxsize,
         'check_crcs': True,
@@ -64,6 +65,15 @@ class Fetcher(six.Iterator):
             the server will block before answering the fetch request if
             there isn't sufficient data to immediately satisfy the
             requirement given by fetch_min_bytes. Default: 500.
+        fetch_max_bytes (int): The maximum amount of data the server should
+            return for a fetch request.
+            This is not an absolute maximum; if the first message in the
+            first non-empty partition of the fetch is larger than this
+            value, the message will still be returned to ensure that the
+            consumer can make progress. NOTE: the consumer performs fetches
+            to multiple brokers in parallel, so memory usage will depend on
+            the number of brokers containing partitions for the topic.
+            Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
         max_partition_fetch_bytes (int): The maximum amount of data
             per-partition the server will return. The maximum total memory
             used for a request = #partitions * max_partition_fetch_bytes.
@@ -617,7 +627,7 @@ class Fetcher(six.Iterator):
                 log.debug("Fetched offset %d for partition %s",
                           offset, partition)
                 future.success(offset)
             elif error_type in (Errors.NotLeaderForPartitionError,
-                    Errors.UnknownTopicOrPartitionError):
+                                Errors.UnknownTopicOrPartitionError):
                 log.debug("Attempt to fetch offsets for partition %s failed due"
                           " to obsolete leadership information, retrying.",
                           partition)
@@ -664,7 +674,9 @@ class Fetcher(six.Iterator):
             log.debug("Adding fetch request for partition %s at offset %d",
                       partition, position)
 
-        if self.config['api_version'] >= (0, 10):
+        if self.config['api_version'] >= (0, 10, 1):
+            version = 3
+        elif self.config['api_version'] >= (0, 10):
             version = 2
         elif self.config['api_version'] == (0, 9):
             version = 1
@@ -672,11 +684,28 @@ class Fetcher(six.Iterator):
             version = 0
         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
-            requests[node_id] = FetchRequest[version](
-                -1,  # replica_id
-                self.config['fetch_max_wait_ms'],
-                self.config['fetch_min_bytes'],
-                partition_data.items())
+            if version < 3:
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    partition_data.items())
+            else:
+                # As of version == 3 partitions will be returned in the order
+                # they are requested, so to avoid starvation with the
+                # `fetch_max_bytes` option we need this shuffle.
+                # NOTE: we do have partition_data in random order due to the
+                # use of unordered structures like dicts, but that does not
+                # guarantee equal distribution, and starting with Python 3.6
+                # dicts retain insert order.
+                partition_data = list(partition_data.items())
+                random.shuffle(partition_data)
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    self.config['fetch_max_bytes'],
+                    partition_data)
         return requests
 
     def _handle_fetch_response(self, request, send_time, response):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 89c946f..1addcc2 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -65,6 +65,14 @@ class KafkaConsumer(six.Iterator):
             the server will block before answering the fetch request if
             there isn't sufficient data to immediately satisfy the
             requirement given by fetch_min_bytes. Default: 500.
+        fetch_max_bytes (int): The maximum amount of data the server should
+            return for a fetch request. This is not an absolute maximum; if
+            the first message in the first non-empty partition of the fetch
+            is larger than this value, the message will still be returned to
+            ensure that the consumer can make progress. NOTE: the consumer
+            performs fetches to multiple brokers in parallel, so memory usage
+            will depend on the number of brokers containing partitions for
+            the topic. Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
         max_partition_fetch_bytes (int): The maximum amount of data
             per-partition the server will return. The maximum total memory
             used for a request = #partitions * max_partition_fetch_bytes.
@@ -212,6 +220,7 @@ class KafkaConsumer(six.Iterator):
         'value_deserializer': None,
         'fetch_max_wait_ms': 500,
         'fetch_min_bytes': 1,
+        'fetch_max_bytes': 52428800,
         'max_partition_fetch_bytes': 1 * 1024 * 1024,
         'request_timeout_ms': 40 * 1000,
         'retry_backoff_ms': 100,
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 998045f..9473691 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -2,6 +2,7 @@ import logging
 import os
 
 from six.moves import xrange
+import six
 
 from . import unittest
 from kafka import (
@@ -572,3 +573,47 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             output_msgs2.append(m)
         self.assert_message_count(output_msgs2, 20)
         self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
+
+    @kafka_versions('>=0.10.1')
+    def test_kafka_consumer_max_bytes_simple(self):
+        self.send_messages(0, range(100, 200))
+        self.send_messages(1, range(200, 300))
+
+        # Start a consumer
+        consumer = self.kafka_consumer(
+            auto_offset_reset='earliest', fetch_max_bytes=300)
+        fetched_size = 0
+        seen_partitions = set([])
+        for i in range(10):
+            poll_res = consumer.poll(timeout_ms=100)
+            for partition, msgs in six.iteritems(poll_res):
+                for msg in msgs:
+                    fetched_size += len(msg.value)
+                    seen_partitions.add(partition)
+
+        # Check that we fetched at least 1 message from both partitions
+        self.assertEqual(
+            seen_partitions, set([
+                TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
+        self.assertLess(fetched_size, 3000)
+
+    @kafka_versions('>=0.10.1')
+    def test_kafka_consumer_max_bytes_one_msg(self):
+        # We send to only 1 partition so we don't have parallel requests to 2
+        # nodes for data.
+        self.send_messages(0, range(100, 200))
+
+        # Start a consumer. FetchResponse_v3 should always include at least 1
+        # full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
+        consumer = self.kafka_consumer(
+            auto_offset_reset='earliest', fetch_max_bytes=1)
+        fetched_msgs = []
+        # A bit hacky, but we need this in order for message count to be exact
+        consumer._coordinator.ensure_active_group()
+        for i in range(10):
+            poll_res = consumer.poll(timeout_ms=2000)
+            for partition, msgs in six.iteritems(poll_res):
+                for msg in msgs:
+                    fetched_msgs.append(msg)
+
+        self.assertEqual(len(fetched_msgs), 10)
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 984de88..dcfba78 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -58,7 +58,8 @@ def test_send_fetches(fetcher, mocker):
 
 @pytest.mark.parametrize(("api_version", "fetch_version"), [
-    ((0, 10), 2),
+    ((0, 10, 1), 3),
+    ((0, 10, 0), 2),
     ((0, 9), 1),
     ((0, 8), 0)
 ])
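A closing note on the conn.py probe for readers who stop at the hunk: ApiVersionResponse reports, per API key, the maximum request version the broker accepts, and `_check_version_above_0_10` infers the broker version by checking whether a struct known only to newer brokers is supported (here `MetadataRequest[2]`; Metadata should be API key 3, and v2 arrived with broker 0.10.1). A self-contained sketch of the same lookup, with fabricated response data:

    # (api_key, min_version, max_version) triples as found in ApiVersionResponse;
    # the values below are made up for illustration.
    api_versions = [(0, 0, 2), (3, 0, 2)]  # e.g. Metadata (key 3) up to v2

    # Feature checks: (broker version, (api_key, request version that implies it))
    test_cases = [((0, 10, 1), (3, 2))]  # MetadataRequest v2 => broker >= 0.10.1

    max_versions = {api_key: max_v for api_key, _, max_v in api_versions}

    def infer_broker_version(default=(0, 10, 0)):
        # Return the first test case whose required version the broker supports,
        # otherwise fall back to the oldest version that answers ApiVersionRequest.
        for broker_version, (api_key, needed_version) in test_cases:
            if max_versions.get(api_key, -1) >= needed_version:
                return broker_version
        return default

    assert infer_broker_version() == (0, 10, 1)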