summaryrefslogtreecommitdiff
path: root/kafka/admin/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/admin/kafka.py')
-rw-r--r--kafka/admin/kafka.py134
1 files changed, 90 insertions, 44 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 5ce8630..950ae3d 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -4,9 +4,10 @@ import copy
import logging
import socket
from kafka.client_async import KafkaClient, selectors
+import kafka.errors as Errors
from kafka.errors import (
- IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
- NodeNotReadyError, NotControllerError)
+ IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+ UnrecognizedBrokerVersion)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -230,17 +231,22 @@ class KafkaAdmin(object):
return timeout_ms or self.config['request_timeout_ms']
def _refresh_controller_id(self):
- """Determine the kafka cluster controller
- """
- response = self._send_request_to_node(
- self._client.least_loaded_node(),
- MetadataRequest[1]([])
- )
- self._controller_id = response.controller_id
- version = self._client.check_version(self._controller_id)
- if version < (0, 10, 0):
- raise IncompatibleBrokerVersion(
- "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
+ """Determine the kafka cluster controller."""
+ version = self._matching_api_version(MetadataRequest)
+ if 1 <= version <= 6:
+ request = MetadataRequest[version]()
+ response = self._send_request_to_node(self._client.least_loaded_node(), request)
+ controller_id = response.controller_id
+ # verify the controller is new enough to support our requests
+ controller_version = self._client.check_version(controller_id)
+ if controller_version < (0, 10, 0):
+ raise IncompatibleBrokerVersion(
+ "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
+ .format(controller_version))
+ self._controller_id = controller_id
+ else:
+ raise UnrecognizedBrokerVersion(
+ "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))
def _send_request_to_node(self, node, request):
@@ -261,22 +267,34 @@ class KafkaAdmin(object):
else:
raise future.exception # pylint: disable-msg=raising-bad-type
- def _send(self, request):
- """Send a kafka protocol message to the cluster controller. Will block until the message result is received.
+ def _send_request_to_controller(self, request):
+ """Send a kafka protocol message to the cluster controller.
+
+ Will block until the message result is received.
:param request: The message to send
- :return The kafka protocol response for the message
- :exception NodeNotReadyError: If the controller connection can't be established
+ :return: The kafka protocol response for the message
"""
- remaining_tries = 2
- while remaining_tries > 0:
- remaining_tries = remaining_tries - 1
- try:
- return self._send_request_to_node(self._controller_id, request)
- except (NotControllerError, KafkaConnectionError) as e:
- # controller changed? refresh it
- self._refresh_controller_id()
- raise NodeNotReadyError(self._controller_id)
+ tries = 2 # in case our cached self._controller_id is outdated
+ while tries:
+ tries -= 1
+ response = self._send_request_to_node(self._controller_id, request)
+ # DeleteTopicsResponse returns topic_error_codes rather than topic_errors
+ for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
+ error_type = Errors.for_code(error_code)
+ if tries and isinstance(error_type, NotControllerError):
+ # No need to inspect the rest of the errors for
+ # non-retriable errors because NotControllerError should
+ # either be thrown for all errors or no errors.
+ self._refresh_controller_id()
+ break
+ elif error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ else:
+ return response
+ raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
@staticmethod
def _convert_new_topic_request(new_topic):
@@ -322,7 +340,7 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_controller(request)
def delete_topics(self, topics, timeout_ms=None):
"""Delete topics from the cluster
@@ -342,19 +360,25 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_controller(request)
# list topics functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the least_loaded_node()
# describe topics functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the controller
# describe cluster functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the least_loaded_node()
- # describe_acls protocol not implemented
+ # describe_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # create_acls protocol not implemented
+ # create_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # delete_acls protocol not implemented
+ # delete_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
@staticmethod
def _convert_describe_config_resource_request(config_resource):
@@ -394,7 +418,7 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
@staticmethod
def _convert_alter_config_resource_request(config_resource):
@@ -409,6 +433,12 @@ class KafkaAdmin(object):
def alter_configs(self, config_resources):
"""Alter configuration parameters of one or more kafka resources.
+ Warning:
+ This is currently broken for BROKER resources because those must be
+ sent to that specific broker, versus this always picks the
+ least-loaded node. See the comment in the source code for details.
+ We would happily accept a PR fixing this.
+
:param config_resources: An array of ConfigResource objects.
:return: Appropriate version of AlterConfigsResponse class
"""
@@ -421,11 +451,19 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ # TODO the Java client has the note:
+ # // We must make a separate AlterConfigs request for every BROKER resource we want to alter
+ # // and send the request to that specific broker. Other resources are grouped together into
+ # // a single request that may be sent to any broker.
+ #
+ # So this is currently broken as it always sends to the least_loaded_node()
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
- # alter replica logs dir protocol not implemented
+ # alter replica logs dir protocol not yet implemented
+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
- # describe log dirs protocol not implemented
+ # describe log dirs protocol not yet implemented
+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
@staticmethod
def _convert_create_partitions_request(topic_name, new_partitions):
@@ -458,17 +496,22 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_controller(request)
- # delete records protocol not implemented
+ # delete records protocol not yet implemented
+ # Note: send the request to the partition leaders
- # create delegation token protocol not implemented
+ # create delegation token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # renew delegation token protocol not implemented
+ # renew delegation token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # expire delegation_token protocol not implemented
+ # expire delegation_token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # describe delegation_token protocol not implemented
+ # describe delegation_token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
def describe_consumer_groups(self, group_ids):
"""Describe a set of consumer groups.
@@ -485,7 +528,8 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ # TODO this is completely broken, as it needs to send to the group coordinator
+ # return self._send(request)
def list_consumer_groups(self):
"""List all consumer groups known to the cluster.
@@ -499,6 +543,8 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ # TODO this is completely broken, as it needs to send to the group coordinator
+ # return self._send(request)
- # delete groups protocol not implemented
+ # delete groups protocol not yet implemented
+ # Note: send the request to the group's coordinator.