summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py25
-rw-r--r--kafka/consumer/fetcher.py43
-rw-r--r--kafka/consumer/group.py9
-rw-r--r--test/test_consumer_integration.py46
-rw-r--r--test/test_fetcher.py3
6 files changed, 119 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 85de90a..2913b43 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -156,6 +156,8 @@ class KafkaClient(object):
'sasl_plain_password': None,
}
API_VERSIONS = [
+ (0, 10, 1),
+ (0, 10, 0),
(0, 10),
(0, 9),
(0, 8, 2),
diff --git a/kafka/conn.py b/kafka/conn.py
index d88e97c..2f28ed7 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -18,6 +18,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -760,6 +761,24 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
+ def _check_version_above_0_10(self, response):
+ test_cases = [
+ # format (<broker verion>, <needed struct>)
+ ((0, 10, 1), MetadataRequest[2])
+ ]
+
+ error_type = Errors.for_code(response.error_code)
+ assert error_type is Errors.NoError, "API version check failed"
+ max_versions = dict([
+ (api_key, max_version)
+ for api_key, _, max_version in response.api_versions
+ ])
+ # Get the best match of test cases
+ for broker_version, struct in test_cases:
+ if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
+ return broker_version
+ return (0, 10, 0)
+
def check_version(self, timeout=2, strict=False):
"""Attempt to guess the broker version.
@@ -784,7 +803,6 @@ class BrokerConnection(object):
# socket.error (32, 54, or 104)
from .protocol.admin import ApiVersionRequest, ListGroupsRequest
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
- from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
@@ -798,6 +816,7 @@ class BrokerConnection(object):
log.addFilter(log_filter)
test_cases = [
+ # All cases starting from 0.10 will be based on ApiVersionResponse
((0, 10), ApiVersionRequest[0]()),
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
@@ -838,6 +857,10 @@ class BrokerConnection(object):
self._sock.setblocking(False)
if f.succeeded():
+ if version == (0, 10):
+ # Starting from 0.10 kafka broker we determine version
+ # by looking at ApiVersionResponse
+ version = self._check_version_above_0_10(f.value)
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 73daa36..2782057 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -40,6 +40,7 @@ class Fetcher(six.Iterator):
'value_deserializer': None,
'fetch_min_bytes': 1,
'fetch_max_wait_ms': 500,
+ 'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1048576,
'max_poll_records': sys.maxsize,
'check_crcs': True,
@@ -64,6 +65,15 @@ class Fetcher(six.Iterator):
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
+ fetch_max_bytes (int): The maximum amount of data the server should
+ return for a fetch request. This is not an absolute maximum, if
+ the first message in the first non-empty partition of the fetch
+ is larger than this value, the message will still be returned
+ to ensure that the consumer can make progress. NOTE: consumer
+ performs fetches to multiple brokers in parallel so memory
+ usage will depend on the number of brokers containing
+ partitions for the topic.
+ Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 Mb).
max_partition_fetch_bytes (int): The maximum amount of data
per-partition the server will return. The maximum total memory
used for a request = #partitions * max_partition_fetch_bytes.
@@ -617,7 +627,7 @@ class Fetcher(six.Iterator):
log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset)
elif error_type in (Errors.NotLeaderForPartitionError,
- Errors.UnknownTopicOrPartitionError):
+ Errors.UnknownTopicOrPartitionError):
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
@@ -664,7 +674,9 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
- if self.config['api_version'] >= (0, 10):
+ if self.config['api_version'] >= (0, 10, 1):
+ version = 3
+ elif self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
@@ -672,11 +684,28 @@ class Fetcher(six.Iterator):
version = 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
- requests[node_id] = FetchRequest[version](
- -1, # replica_id
- self.config['fetch_max_wait_ms'],
- self.config['fetch_min_bytes'],
- partition_data.items())
+ if version < 3:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ partition_data.items())
+ else:
+ # As of version == 3 partitions will be returned in order as
+ # they are requested, so to avoid starvation with
+ # `fetch_max_bytes` option we need this shuffle
+ # NOTE: we do have partition_data in random order due to usage
+ # of unordered structures like dicts, but that does not
+ # guaranty equal distribution, and starting Python3.6
+ # dicts retain insert order.
+ partition_data = list(partition_data.items())
+ random.shuffle(partition_data)
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ partition_data)
return requests
def _handle_fetch_response(self, request, send_time, response):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 89c946f..1addcc2 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -65,6 +65,14 @@ class KafkaConsumer(six.Iterator):
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
+ fetch_max_bytes (int): The maximum amount of data the server should
+ return for a fetch request. This is not an absolute maximum, if the
+ first message in the first non-empty partition of the fetch is
+ larger than this value, the message will still be returned to
+ ensure that the consumer can make progress. NOTE: consumer performs
+ fetches to multiple brokers in parallel so memory usage will depend
+ on the number of brokers containing partitions for the topic.
+ Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 Mb).
max_partition_fetch_bytes (int): The maximum amount of data
per-partition the server will return. The maximum total memory
used for a request = #partitions * max_partition_fetch_bytes.
@@ -212,6 +220,7 @@ class KafkaConsumer(six.Iterator):
'value_deserializer': None,
'fetch_max_wait_ms': 500,
'fetch_min_bytes': 1,
+ 'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1 * 1024 * 1024,
'request_timeout_ms': 40 * 1000,
'retry_backoff_ms': 100,
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 998045f..9473691 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -2,6 +2,7 @@ import logging
import os
from six.moves import xrange
+import six
from . import unittest
from kafka import (
@@ -572,3 +573,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_max_bytes_simple(self):
+ self.send_messages(0, range(100, 200))
+ self.send_messages(1, range(200, 300))
+
+ # Start a consumer
+ consumer = self.kafka_consumer(
+ auto_offset_reset='earliest', fetch_max_bytes=300)
+ fetched_size = 0
+ seen_partitions = set([])
+ for i in range(10):
+ poll_res = consumer.poll(timeout_ms=100)
+ for partition, msgs in six.iteritems(poll_res):
+ for msg in msgs:
+ fetched_size += len(msg.value)
+ seen_partitions.add(partition)
+
+ # Check that we fetched at least 1 message from both partitions
+ self.assertEqual(
+ seen_partitions, set([
+ TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
+ self.assertLess(fetched_size, 3000)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_max_bytes_one_msg(self):
+ # We send to only 1 partition so we don't have parallel requests to 2
+ # nodes for data.
+ self.send_messages(0, range(100, 200))
+
+ # Start a consumer. FetchResponse_v3 should always include at least 1
+ # full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
+ consumer = self.kafka_consumer(
+ auto_offset_reset='earliest', fetch_max_bytes=1)
+ fetched_msgs = []
+ # A bit hacky, but we need this in order for message count to be exact
+ consumer._coordinator.ensure_active_group()
+ for i in range(10):
+ poll_res = consumer.poll(timeout_ms=2000)
+ print(poll_res)
+ for partition, msgs in six.iteritems(poll_res):
+ for msg in msgs:
+ fetched_msgs.append(msg)
+
+ self.assertEqual(len(fetched_msgs), 10)
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 984de88..dcfba78 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -58,7 +58,8 @@ def test_send_fetches(fetcher, mocker):
@pytest.mark.parametrize(("api_version", "fetch_version"), [
- ((0, 10), 2),
+ ((0, 10, 1), 3),
+ ((0, 10, 0), 2),
((0, 9), 1),
((0, 8), 0)
])