summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py52
1 files changed, 52 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 8c61288..8a92159 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -471,6 +471,58 @@ class KafkaClient(object):
"""
self._delayed_tasks.remove(task)
+ def check_version(self, node_id=None):
+ """Attempt to guess the broker version"""
+ if node_id is None:
+ node_id = self.least_loaded_node()
+
+ def connect():
+ timeout = time.time() + 10
+ # brokers < 0.9 do not return any broker metadata if there are no topics
+ # so we're left with a single bootstrap connection
+ while not self.ready(node_id):
+ if time.time() >= timeout:
+ raise Errors.NodeNotReadyError(node_id)
+ time.sleep(0.025)
+
+ # kafka kills the connection when it doesnt recognize an API request
+ # so we can send a test request and then follow immediately with a
+ # vanilla MetadataRequest. If the server did not recognize the first
+ # request, both will be failed with a ConnectionError that wraps
+ # socket.error (32 or 54)
+ import socket
+ from .protocol.admin import ListGroupsRequest
+ from .protocol.commit import (
+ OffsetFetchRequest_v0, GroupCoordinatorRequest)
+ from .protocol.metadata import MetadataRequest
+
+ test_cases = [
+ ('0.9', ListGroupsRequest()),
+ ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')),
+ ('0.8.1', OffsetFetchRequest_v0('kafka-python-default-group', [])),
+ ('0.8.0', MetadataRequest([])),
+ ]
+
+
+ for version, request in test_cases:
+ connect()
+ f = self.send(node_id, request)
+ time.sleep(0.5)
+ self.send(node_id, MetadataRequest([]))
+ self.poll(future=f)
+
+ assert f.is_done
+
+ if f.succeeded():
+ log.info('Broker version identifed as %s', version)
+ return version
+
+ assert isinstance(f.exception.message, socket.error)
+ assert f.exception.message.errno in (32, 54)
+ log.info("Broker is not v%s -- it did not recognize %s",
+ version, request.__class__.__name__)
+ continue
+
class DelayedTaskQueue(object):
# see https://docs.python.org/2/library/heapq.html