summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-03-07 00:59:26 +0200
committerDana Powers <dana.powers@gmail.com>2017-03-06 14:59:26 -0800
commit9c19ea7cbe163b0c434ce9dd9c8c42471027cce5 (patch)
tree8802dfc07e053279d18be591af11a1a4edf4988c /kafka/conn.py
parentff6f7bf085b912090b436da1c99f6f8f4cf66f94 (diff)
downloadkafka-python-9c19ea7cbe163b0c434ce9dd9c8c42471027cce5.tar.gz
Added `max_bytes` option and FetchRequest_v3 usage. (#962)
* Added `max_bytes` option and FetchRequest_v3 usage. * Add checks for versions above 0.10 based on ApiVersionResponse
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py25
1 files changed, 24 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index d88e97c..2f28ed7 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -18,6 +18,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -760,6 +761,24 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
+ def _check_version_above_0_10(self, response):
+ test_cases = [
+ # format (<broker verion>, <needed struct>)
+ ((0, 10, 1), MetadataRequest[2])
+ ]
+
+ error_type = Errors.for_code(response.error_code)
+ assert error_type is Errors.NoError, "API version check failed"
+ max_versions = dict([
+ (api_key, max_version)
+ for api_key, _, max_version in response.api_versions
+ ])
+ # Get the best match of test cases
+ for broker_version, struct in test_cases:
+ if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
+ return broker_version
+ return (0, 10, 0)
+
def check_version(self, timeout=2, strict=False):
"""Attempt to guess the broker version.
@@ -784,7 +803,6 @@ class BrokerConnection(object):
# socket.error (32, 54, or 104)
from .protocol.admin import ApiVersionRequest, ListGroupsRequest
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
- from .protocol.metadata import MetadataRequest
# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
@@ -798,6 +816,7 @@ class BrokerConnection(object):
log.addFilter(log_filter)
test_cases = [
+ # All cases starting from 0.10 will be based on ApiVersionResponse
((0, 10), ApiVersionRequest[0]()),
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
@@ -838,6 +857,10 @@ class BrokerConnection(object):
self._sock.setblocking(False)
if f.succeeded():
+ if version == (0, 10):
+ # Starting from 0.10 kafka broker we determine version
+ # by looking at ApiVersionResponse
+ version = self._check_version_above_0_10(f.value)
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)