diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-03 15:46:37 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 15:46:37 -0800 |
commit | 30fefa9b4f6922b97536b5641ec696dcc8257601 (patch) | |
tree | d690819f62950fd2b630f37f32e65a2088ff45ef /kafka/client_async.py | |
parent | 4dc1fbab30b7cbff13b8f12424aa4cac512995d8 (diff) | |
download | kafka-python-30fefa9b4f6922b97536b5641ec696dcc8257601.tar.gz |
Add KafkaClient.check_version() to guess broker version
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 8c61288..8a92159 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -471,6 +471,58 @@ class KafkaClient(object): """ self._delayed_tasks.remove(task) + def check_version(self, node_id=None): + """Attempt to guess the broker version""" + if node_id is None: + node_id = self.least_loaded_node() + + def connect(): + timeout = time.time() + 10 + # brokers < 0.9 do not return any broker metadata if there are no topics + # so we're left with a single bootstrap connection + while not self.ready(node_id): + if time.time() >= timeout: + raise Errors.NodeNotReadyError(node_id) + time.sleep(0.025) + + # kafka kills the connection when it doesnt recognize an API request + # so we can send a test request and then follow immediately with a + # vanilla MetadataRequest. If the server did not recognize the first + # request, both will be failed with a ConnectionError that wraps + # socket.error (32 or 54) + import socket + from .protocol.admin import ListGroupsRequest + from .protocol.commit import ( + OffsetFetchRequest_v0, GroupCoordinatorRequest) + from .protocol.metadata import MetadataRequest + + test_cases = [ + ('0.9', ListGroupsRequest()), + ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest([])), + ] + + + for version, request in test_cases: + connect() + f = self.send(node_id, request) + time.sleep(0.5) + self.send(node_id, MetadataRequest([])) + self.poll(future=f) + + assert f.is_done + + if f.succeeded(): + log.info('Broker version identifed as %s', version) + return version + + assert isinstance(f.exception.message, socket.error) + assert f.exception.message.errno in (32, 54) + log.info("Broker is not v%s -- it did not recognize %s", + version, request.__class__.__name__) + continue + class DelayedTaskQueue(object): # see https://docs.python.org/2/library/heapq.html |