diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 10:29:37 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 12:07:26 -0700 |
commit | 2c4b9cdf2cc18ccd40d2489e74766c6b702c2725 (patch) | |
tree | 72be396969e5f04509787f496219c0c8b298336b /kafka/client_async.py | |
parent | 061cb4e83469166873912fca2aac62ca8376377f (diff) | |
download | kafka-python-client_api_version.tar.gz |
Add api_version config to KafkaClient, deprecate str in favor of tuplesclient_api_version
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 30 |
1 files changed, 28 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 8839dee..6fa9434 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import copy import functools @@ -61,7 +61,16 @@ class KafkaClient(object): 'ssl_keyfile': None, 'ssl_password': None, 'ssl_crlfile': None, + 'api_version': None, + 'api_version_auto_timeout_ms': 2000, } + API_VERSIONS = [ + (0, 10), + (0, 9), + (0, 8, 2), + (0, 8, 1), + (0, 8, 0) + ] def __init__(self, **configs): """Initialize an asynchronous kafka client @@ -118,12 +127,24 @@ class KafkaClient(object): providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. default: none. + api_version (tuple): specify which kafka API version to use. Accepted + values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10) + If None, KafkaClient will attempt to infer the broker + version by probing various APIs. Default: None + api_version_auto_timeout_ms (int): number of milliseconds to throw a + timeout exception from the constructor when checking the broker + api version. Only applies if api_version is None """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: self.config[key] = configs[key] + if self.config['api_version'] is not None: + assert self.config['api_version'] in self.API_VERSIONS, ( + 'api_version [{}] must be one of: {}'.format( + self.config['api_version'], str(self.API_VERSIONS))) + self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False @@ -141,6 +162,11 @@ class KafkaClient(object): self._closed = False self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) + # Check Broker Version if not set explicitly + if self.config['api_version'] is None: + check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 + self.config['api_version'] = self.check_version(timeout=check_timeout) + def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails @@ -683,7 +709,7 @@ class KafkaClient(object): is down and the client enters a bootstrap backoff sleep. This is only possible if node_id is None. - Returns: version str, i.e. '0.10', '0.9', '0.8.2', '0.8.1', '0.8.0' + Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ... Raises: NodeNotReadyError (if node_id is provided) |