summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 10:29:37 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 12:07:26 -0700
commit2c4b9cdf2cc18ccd40d2489e74766c6b702c2725 (patch)
tree72be396969e5f04509787f496219c0c8b298336b /kafka/client_async.py
parent061cb4e83469166873912fca2aac62ca8376377f (diff)
downloadkafka-python-client_api_version.tar.gz
Add api_version config to KafkaClient, deprecate str in favor of tuplesclient_api_version
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py30
1 files changed, 28 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 8839dee..6fa9434 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import copy
import functools
@@ -61,7 +61,16 @@ class KafkaClient(object):
'ssl_keyfile': None,
'ssl_password': None,
'ssl_crlfile': None,
+ 'api_version': None,
+ 'api_version_auto_timeout_ms': 2000,
}
+ API_VERSIONS = [
+ (0, 10),
+ (0, 9),
+ (0, 8, 2),
+ (0, 8, 1),
+ (0, 8, 0)
+ ]
def __init__(self, **configs):
"""Initialize an asynchronous kafka client
@@ -118,12 +127,24 @@ class KafkaClient(object):
providing a file, only the leaf certificate will be checked against
this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
default: none.
+ api_version (tuple): specify which kafka API version to use. Accepted
+ values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10)
+ If None, KafkaClient will attempt to infer the broker
+ version by probing various APIs. Default: None
+ api_version_auto_timeout_ms (int): number of milliseconds to throw a
+ timeout exception from the constructor when checking the broker
+ api version. Only applies if api_version is None
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs[key]
+ if self.config['api_version'] is not None:
+ assert self.config['api_version'] in self.API_VERSIONS, (
+ 'api_version [{}] must be one of: {}'.format(
+ self.config['api_version'], str(self.API_VERSIONS)))
+
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
@@ -141,6 +162,11 @@ class KafkaClient(object):
self._closed = False
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+ # Check Broker Version if not set explicitly
+ if self.config['api_version'] is None:
+ check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
+ self.config['api_version'] = self.check_version(timeout=check_timeout)
+
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
@@ -683,7 +709,7 @@ class KafkaClient(object):
is down and the client enters a bootstrap backoff sleep.
This is only possible if node_id is None.
- Returns: version str, i.e. '0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'
+ Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
Raises:
NodeNotReadyError (if node_id is provided)