diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 8c61288..8a92159 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -471,6 +471,58 @@ class KafkaClient(object): """ self._delayed_tasks.remove(task) + def check_version(self, node_id=None): + """Attempt to guess the broker version""" + if node_id is None: + node_id = self.least_loaded_node() + + def connect(): + timeout = time.time() + 10 + # brokers < 0.9 do not return any broker metadata if there are no topics + # so we're left with a single bootstrap connection + while not self.ready(node_id): + if time.time() >= timeout: + raise Errors.NodeNotReadyError(node_id) + time.sleep(0.025) + + # kafka kills the connection when it doesnt recognize an API request + # so we can send a test request and then follow immediately with a + # vanilla MetadataRequest. If the server did not recognize the first + # request, both will be failed with a ConnectionError that wraps + # socket.error (32 or 54) + import socket + from .protocol.admin import ListGroupsRequest + from .protocol.commit import ( + OffsetFetchRequest_v0, GroupCoordinatorRequest) + from .protocol.metadata import MetadataRequest + + test_cases = [ + ('0.9', ListGroupsRequest()), + ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest([])), + ] + + + for version, request in test_cases: + connect() + f = self.send(node_id, request) + time.sleep(0.5) + self.send(node_id, MetadataRequest([])) + self.poll(future=f) + + assert f.is_done + + if f.succeeded(): + log.info('Broker version identifed as %s', version) + return version + + assert isinstance(f.exception.message, socket.error) + assert f.exception.message.errno in (32, 54) + log.info("Broker is not v%s -- it did not recognize %s", + version, request.__class__.__name__) + continue + class DelayedTaskQueue(object): # see https://docs.python.org/2/library/heapq.html |