Diffstat (limited to 'kafka/admin/client.py')
 kafka/admin/client.py | 752 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 752 insertions(+), 0 deletions(-)
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
new file mode 100644
index 0000000..e25afe7
--- /dev/null
+++ b/kafka/admin/client.py
@@ -0,0 +1,752 @@
+from __future__ import absolute_import
+
+from collections import defaultdict
+import copy
+import logging
+import socket
+
+from kafka.vendor import six
+
+from kafka.client_async import KafkaClient, selectors
+import kafka.errors as Errors
+from kafka.errors import (
+ IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+ UnrecognizedBrokerVersion)
+from kafka.metrics import MetricConfig, Metrics
+from kafka.protocol.admin import (
+ CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
+ ListGroupsRequest, DescribeGroupsRequest)
+from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
+from kafka.protocol.metadata import MetadataRequest
+from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.version import __version__
+
+
+log = logging.getLogger(__name__)
+
+
+class KafkaAdminClient(object):
+ """A class for administering the Kafka cluster.
+
+ Warning:
+ This is an unstable interface that was recently added and is subject to
+ change without warning. In particular, many methods currently return
+ raw protocol tuples. In future releases, we plan to make these into
+ nicer, more pythonic objects. Unfortunately, this will likely break
+ those interfaces.
+
+ The KafkaAdminClient class will negotiate for the latest version of each message
+ protocol format supported by both the kafka-python client library and the
+ Kafka broker. Usage of optional fields from protocol versions that are not
+ supported by the broker will result in IncompatibleBrokerVersion exceptions.
+
+    Use of this class requires a minimum broker version of 0.10.0.0.
+
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+            strings) that the client should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ reconnect_backoff_max_ms (int): The maximum amount of time in
+ milliseconds to wait when reconnecting to a broker that has
+ repeatedly failed to connect. If provided, the backoff per host
+ will increase exponentially for each consecutive connection
+ failure, up to this maximum. To avoid connection storms, a
+ randomization factor of 0.2 will be applied to the backoff
+ resulting in a random range between 20% below and 20% above
+ the computed value. Default: 1000.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 30000.
+        connections_max_idle_ms (int): Close idle connections after the number of
+ milliseconds specified by this config. The broker closes idle
+ connections after connections.max.idle.ms, so this avoids hitting
+ unexpected socket disconnected errors on the client.
+ Default: 540000
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: None (relies on
+ system defaults). Java client defaults to 32768.
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). Java client defaults to 131072.
+ socket_options (list): List of tuple-arguments to socket.setsockopt
+ to apply to broker connection sockets. Default:
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
+ metadata_max_age_ms (int): The period of time in milliseconds after
+ which we force a refresh of metadata even if we haven't seen any
+ partition leadership changes to proactively discover any new
+ brokers or partitions. Default: 300000
+ security_protocol (str): Protocol used to communicate with brokers.
+            Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+            Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): Pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): Flag to configure whether SSL handshake
+ should verify that the certificate matches the broker's hostname.
+ Default: True.
+ ssl_cafile (str): Optional filename of CA file to use in certificate
+            verification. Default: None.
+ ssl_certfile (str): Optional filename of file in PEM format containing
+ the client certificate, as well as any CA certificates needed to
+ establish the certificate's authenticity. Default: None.
+ ssl_keyfile (str): Optional filename containing the client private key.
+ Default: None.
+ ssl_password (str): Optional password to be used when loading the
+ certificate chain. Default: None.
+ ssl_crlfile (str): Optional filename containing the CRL to check for
+ certificate expiration. By default, no CRL check is done. When
+ providing a file, only the leaf certificate will be checked against
+ this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
+ Default: None.
+ api_version (tuple): Specify which Kafka API version to use. If set
+ to None, KafkaClient will attempt to infer the broker version by
+ probing various APIs. Example: (0, 10, 2). Default: None
+        api_version_auto_timeout_ms (int): number of milliseconds to wait
+            while probing the broker api version before raising a timeout
+            exception from the constructor. Only applies if api_version is
+            None. Default: 2000
+ selector (selectors.BaseSelector): Provide a specific selector
+ implementation to use for I/O multiplexing.
+ Default: selectors.DefaultSelector
+        metric_reporters (list): A list of classes to use as metrics
+            reporters. Default: []
+        metrics_num_samples (int): The number of samples maintained to
+            compute metrics. Default: 2
+        metrics_sample_window_ms (int): The maximum age in milliseconds of
+            samples used to compute metrics. Default: 30000
+        sasl_mechanism (str): Authentication mechanism to use when
+            security_protocol is configured for SASL_PLAINTEXT or SASL_SSL.
+            Valid values are: PLAIN, GSSAPI. Default: None
+ sasl_plain_username (str): username for sasl PLAIN authentication.
+ Default: None
+ sasl_plain_password (str): password for sasl PLAIN authentication.
+ Default: None
+ sasl_kerberos_service_name (str): Service name to include in GSSAPI
+ sasl mechanism handshake. Default: 'kafka'
+
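+    Example:
+        A minimal sketch of typical usage (this assumes a broker is reachable
+        at localhost:9092 and, per the note above, that it is running
+        Kafka >= 0.10.0.0):
+
+            admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
+            try:
+                print(admin.list_consumer_groups())
+            finally:
+                admin.close()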
+ """
+ DEFAULT_CONFIG = {
+ # client configs
+ 'bootstrap_servers': 'localhost',
+ 'client_id': 'kafka-python-' + __version__,
+ 'request_timeout_ms': 30000,
+ 'connections_max_idle_ms': 9 * 60 * 1000,
+ 'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max_ms': 1000,
+ 'max_in_flight_requests_per_connection': 5,
+ 'receive_buffer_bytes': None,
+ 'send_buffer_bytes': None,
+ 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
+ 'retry_backoff_ms': 100,
+ 'metadata_max_age_ms': 300000,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
+ 'ssl_password': None,
+ 'ssl_crlfile': None,
+ 'api_version': None,
+ 'api_version_auto_timeout_ms': 2000,
+ 'selector': selectors.DefaultSelector,
+ 'sasl_mechanism': None,
+ 'sasl_plain_username': None,
+ 'sasl_plain_password': None,
+ 'sasl_kerberos_service_name': 'kafka',
+
+ # metrics configs
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
+ }
+
+ def __init__(self, **configs):
+ log.debug("Starting KafkaAdminClient with configuration: %s", configs)
+ extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
+ if extra_configs:
+ raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
+
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ self.config.update(configs)
+
+ # Configure metrics
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ self._metrics = Metrics(metric_config, reporters)
+
+ self._client = KafkaClient(metrics=self._metrics,
+ metric_group_prefix='admin',
+ **self.config)
+
+ # Get auto-discovered version from client if necessary
+ if self.config['api_version'] is None:
+ self.config['api_version'] = self._client.config['api_version']
+
+ self._closed = False
+ self._refresh_controller_id()
+ log.debug("KafkaAdminClient started.")
+
+ def close(self):
+ """Close the KafkaAdminClient connection to the Kafka broker."""
+ if not hasattr(self, '_closed') or self._closed:
+ log.info("KafkaAdminClient already closed.")
+ return
+
+ self._metrics.close()
+ self._client.close()
+ self._closed = True
+ log.debug("KafkaAdminClient is now closed.")
+
+ def _matching_api_version(self, operation):
+ """Find the latest version of the protocol operation supported by both
+ this library and the broker.
+
+ This resolves to the lesser of either the latest api version this
+ library supports, or the max version supported by the broker.
+
+ :param operation: A list of protocol operation versions from kafka.protocol.
+ :return: The max matching version number between client and broker.
+ """
+ version = min(len(operation) - 1,
+ self._client.get_api_versions()[operation[0].API_KEY][1])
+ if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
+ # max library version is less than min broker version. Currently,
+ # no Kafka versions specify a min msg version. Maybe in the future?
+ raise IncompatibleBrokerVersion(
+ "No version of the '{}' Kafka protocol is supported by both the client and broker."
+                .format(operation[0].__name__))
+ return version
+
+ def _validate_timeout(self, timeout_ms):
+ """Validate the timeout is set or use the configuration default.
+
+ :param timeout_ms: The timeout provided by api call, in milliseconds.
+ :return: The timeout to use for the operation.
+ """
+ return timeout_ms or self.config['request_timeout_ms']
+
+ def _refresh_controller_id(self):
+ """Determine the Kafka cluster controller."""
+ version = self._matching_api_version(MetadataRequest)
+ if 1 <= version <= 6:
+ request = MetadataRequest[version]()
+ response = self._send_request_to_node(self._client.least_loaded_node(), request)
+ controller_id = response.controller_id
+ # verify the controller is new enough to support our requests
+ controller_version = self._client.check_version(controller_id)
+ if controller_version < (0, 10, 0):
+ raise IncompatibleBrokerVersion(
+ "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
+ .format(controller_version))
+ self._controller_id = controller_id
+ else:
+ raise UnrecognizedBrokerVersion(
+ "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
+ .format(version))
+
+ def _find_group_coordinator_id(self, group_id):
+ """Find the broker node_id of the coordinator of the given group.
+
+ Sends a FindCoordinatorRequest message to the cluster. Will block until
+ the FindCoordinatorResponse is received. Any errors are immediately
+ raised.
+
+ :param group_id: The consumer group ID. This is typically the group
+ name as a string.
+ :return: The node_id of the broker that is the coordinator.
+ """
+ # Note: Java may change how this is implemented in KAFKA-6791.
+ #
+ # TODO add support for dynamically picking version of
+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
+ # When I experimented with this, GroupCoordinatorResponse_v1 didn't
+ # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
+ gc_request = GroupCoordinatorRequest[0](group_id)
+ gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+ # use the extra error checking in add_group_coordinator() rather than
+ # immediately returning the group coordinator.
+ success = self._client.cluster.add_group_coordinator(group_id, gc_response)
+ if not success:
+ error_type = Errors.for_code(gc_response.error_code)
+ assert error_type is not Errors.NoError
+ # Note: When error_type.retriable, Java will retry... see
+ # KafkaAdminClient's handleFindCoordinatorError method
+ raise error_type(
+ "Could not identify group coordinator for group_id '{}' from response '{}'."
+ .format(group_id, gc_response))
+ group_coordinator = self._client.cluster.coordinator_for_group(group_id)
+ # will be None if the coordinator was never populated, which should never happen here
+ assert group_coordinator is not None
+ # will be -1 if add_group_coordinator() failed... but by this point the
+ # error should have been raised.
+ assert group_coordinator != -1
+ return group_coordinator
+
+ def _send_request_to_node(self, node_id, request):
+ """Send a Kafka protocol message to a specific broker.
+
+ Will block until the message result is received.
+
+ :param node_id: The broker id to which to send the message.
+ :param request: The message to send.
+ :return: The Kafka protocol response for the message.
+ :exception: The exception if the message could not be sent.
+ """
+ while not self._client.ready(node_id):
+ # poll until the connection to broker is ready, otherwise send()
+ # will fail with NodeNotReadyError
+ self._client.poll()
+ future = self._client.send(node_id, request)
+ self._client.poll(future=future)
+ if future.succeeded():
+ return future.value
+ else:
+ raise future.exception # pylint: disable-msg=raising-bad-type
+
+ def _send_request_to_controller(self, request):
+ """Send a Kafka protocol message to the cluster controller.
+
+ Will block until the message result is received.
+
+ :param request: The message to send.
+ :return: The Kafka protocol response for the message.
+ """
+ tries = 2 # in case our cached self._controller_id is outdated
+ while tries:
+ tries -= 1
+ response = self._send_request_to_node(self._controller_id, request)
+            # DeleteTopicsResponse returns topic_error_codes rather than
+            # topic_errors, and CreateTopicsResponse >= v1 appends a trailing
+            # error_message field, so normalize to (topic, error_code) pairs.
+            topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
+                                  else response.topic_error_codes)
+            for topic, error_code in ((e[0], e[1]) for e in topic_error_tuples):
+ error_type = Errors.for_code(error_code)
+                if tries and error_type is NotControllerError:
+ # No need to inspect the rest of the errors for
+ # non-retriable errors because NotControllerError should
+ # either be thrown for all errors or no errors.
+ self._refresh_controller_id()
+ break
+ elif error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ else:
+ return response
+ raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
+
+ @staticmethod
+ def _convert_new_topic_request(new_topic):
+ return (
+ new_topic.name,
+ new_topic.num_partitions,
+ new_topic.replication_factor,
+ [
+ (partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items()
+ ],
+ [
+ (config_key, config_value) for config_key, config_value in new_topic.topic_configs.items()
+ ]
+ )
+
+ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
+ """Create new topics in the cluster.
+
+ :param new_topics: A list of NewTopic objects.
+ :param timeout_ms: Milliseconds to wait for new topics to be created
+ before the broker returns.
+ :param validate_only: If True, don't actually create new topics.
+ Not supported by all versions. Default: False
+        :return: Appropriate version of CreateTopicsResponse class.
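+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin;
+        assumes the NewTopic helper exposed alongside this class in
+        kafka.admin):
+
+            topic = NewTopic(name='example-topic', num_partitions=3,
+                             replication_factor=1)
+            admin.create_topics([topic], timeout_ms=10000)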
+ """
+ version = self._matching_api_version(CreateTopicsRequest)
+ timeout_ms = self._validate_timeout(timeout_ms)
+ if version == 0:
+ if validate_only:
+ raise IncompatibleBrokerVersion(
+ "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
+ .format(self.config['api_version']))
+ request = CreateTopicsRequest[version](
+ create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
+ timeout=timeout_ms
+ )
+ elif version <= 2:
+ request = CreateTopicsRequest[version](
+ create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
+ timeout=timeout_ms,
+ validate_only=validate_only
+ )
+ else:
+ raise NotImplementedError(
+ "Support for CreateTopics v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ # TODO convert structs to a more pythonic interface
+ # TODO raise exceptions if errors
+ return self._send_request_to_controller(request)
+
+ def delete_topics(self, topics, timeout_ms=None):
+ """Delete topics from the cluster.
+
+ :param topics: A list of topic name strings.
+ :param timeout_ms: Milliseconds to wait for topics to be deleted
+ before the broker returns.
+ :return: Appropriate version of DeleteTopicsResponse class.
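+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin):
+
+            admin.delete_topics(['example-topic'], timeout_ms=10000)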
+ """
+ version = self._matching_api_version(DeleteTopicsRequest)
+ timeout_ms = self._validate_timeout(timeout_ms)
+ if version <= 1:
+ request = DeleteTopicsRequest[version](
+ topics=topics,
+ timeout=timeout_ms
+ )
+ response = self._send_request_to_controller(request)
+ else:
+ raise NotImplementedError(
+ "Support for DeleteTopics v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return response
+
+ # list topics functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the least_loaded_node()
+
+ # describe topics functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the controller
+
+ # describe cluster functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the least_loaded_node()
+
+ # describe_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
+
+ # create_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
+
+ # delete_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
+
+ @staticmethod
+ def _convert_describe_config_resource_request(config_resource):
+ return (
+ config_resource.resource_type,
+ config_resource.name,
+ [
+ config_key for config_key, config_value in config_resource.configs.items()
+ ] if config_resource.configs else None
+ )
+
+ def describe_configs(self, config_resources, include_synonyms=False):
+ """Fetch configuration parameters for one or more Kafka resources.
+
+        :param config_resources: A list of ConfigResource objects.
+ Any keys in ConfigResource.configs dict will be used to filter the
+ result. Setting the configs dict to None will get all values. An
+ empty dict will get zero values (as per Kafka protocol).
+ :param include_synonyms: If True, return synonyms in response. Not
+ supported by all versions. Default: False.
+ :return: Appropriate version of DescribeConfigsResponse class.
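+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin;
+        assumes the ConfigResource helper exposed alongside this class in
+        kafka.admin, taking a resource type, a resource name, and an optional
+        configs dict):
+
+            resource = ConfigResource('TOPIC', 'example-topic')
+            response = admin.describe_configs([resource])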
+ """
+ version = self._matching_api_version(DescribeConfigsRequest)
+ if version == 0:
+ if include_synonyms:
+ raise IncompatibleBrokerVersion(
+ "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
+ .format(self.config['api_version']))
+ request = DescribeConfigsRequest[version](
+ resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
+ )
+ elif version == 1:
+ request = DescribeConfigsRequest[version](
+ resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
+ include_synonyms=include_synonyms
+ )
+ else:
+ raise NotImplementedError(
+ "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
+
+ @staticmethod
+ def _convert_alter_config_resource_request(config_resource):
+ return (
+ config_resource.resource_type,
+ config_resource.name,
+ [
+ (config_key, config_value) for config_key, config_value in config_resource.configs.items()
+ ]
+ )
+
+ def alter_configs(self, config_resources):
+ """Alter configuration parameters of one or more Kafka resources.
+
+ Warning:
+ This is currently broken for BROKER resources because those must be
+ sent to that specific broker, versus this always picks the
+ least-loaded node. See the comment in the source code for details.
+ We would happily accept a PR fixing this.
+
+ :param config_resources: A list of ConfigResource objects.
+ :return: Appropriate version of AlterConfigsResponse class.
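+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin
+        and assuming kafka.admin.ConfigResource as in describe_configs):
+
+            resource = ConfigResource('TOPIC', 'example-topic',
+                                      configs={'retention.ms': '3600000'})
+            response = admin.alter_configs([resource])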
+ """
+ version = self._matching_api_version(AlterConfigsRequest)
+ if version == 0:
+ request = AlterConfigsRequest[version](
+ resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
+ )
+ else:
+ raise NotImplementedError(
+ "Support for AlterConfigs v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ # TODO the Java client has the note:
+ # // We must make a separate AlterConfigs request for every BROKER resource we want to alter
+ # // and send the request to that specific broker. Other resources are grouped together into
+ # // a single request that may be sent to any broker.
+ #
+ # So this is currently broken as it always sends to the least_loaded_node()
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
+
+ # alter replica logs dir protocol not yet implemented
+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
+
+ # describe log dirs protocol not yet implemented
+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
+
+ @staticmethod
+ def _convert_create_partitions_request(topic_name, new_partitions):
+ return (
+ topic_name,
+ (
+ new_partitions.total_count,
+ new_partitions.new_assignments
+ )
+ )
+
+ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False):
+ """Create additional partitions for an existing topic.
+
+        :param topic_partitions: A map of topic name strings to NewPartitions objects.
+ :param timeout_ms: Milliseconds to wait for new partitions to be
+ created before the broker returns.
+ :param validate_only: If True, don't actually create new partitions.
+ Default: False
+ :return: Appropriate version of CreatePartitionsResponse class.
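+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin;
+        assumes the NewPartitions helper exposed alongside this class in
+        kafka.admin, which takes the desired total partition count):
+
+            admin.create_partitions({'example-topic': NewPartitions(total_count=6)})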
+ """
+ version = self._matching_api_version(CreatePartitionsRequest)
+ timeout_ms = self._validate_timeout(timeout_ms)
+ if version == 0:
+ request = CreatePartitionsRequest[version](
+ topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
+ timeout=timeout_ms,
+ validate_only=validate_only
+ )
+ else:
+ raise NotImplementedError(
+ "Support for CreatePartitions v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_controller(request)
+
+ # delete records protocol not yet implemented
+ # Note: send the request to the partition leaders
+
+ # create delegation token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
+
+ # renew delegation token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
+
+ # expire delegation_token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
+
+ # describe delegation_token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
+
+ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
+ """Describe a set of consumer groups.
+
+ Any errors are immediately raised.
+
+ :param group_ids: A list of consumer group IDs. These are typically the
+ group names as strings.
+ :param group_coordinator_id: The node_id of the groups' coordinator
+ broker. If set to None, it will query the cluster for each group to
+ find that group's coordinator. Explicitly specifying this can be
+ useful for avoiding extra network round trips if you already know
+ the group coordinator. This is only useful when all the group_ids
+ have the same coordinator, otherwise it will error. Default: None.
+ :return: A list of group descriptions. For now the group descriptions
+ are the raw results from the DescribeGroupsResponse. Long-term, we
+ plan to change this to return namedtuples as well as decoding the
+ partition assignments.
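+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin;
+        the returned items are currently raw DescribeGroupsResponse tuples):
+
+            descriptions = admin.describe_consumer_groups(['example-group'])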
+ """
+ group_descriptions = []
+ version = self._matching_api_version(DescribeGroupsRequest)
+ for group_id in group_ids:
+ if group_coordinator_id is not None:
+ this_groups_coordinator_id = group_coordinator_id
+ else:
+ this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
+ if version <= 1:
+ # Note: KAFKA-6788 A potential optimization is to group the
+ # request per coordinator and send one request with a list of
+ # all consumer groups. Java still hasn't implemented this
+ # because the error checking is hard to get right when some
+ # groups error and others don't.
+ request = DescribeGroupsRequest[version](groups=(group_id,))
+ response = self._send_request_to_node(this_groups_coordinator_id, request)
+ assert len(response.groups) == 1
+ # TODO need to implement converting the response tuple into
+ # a more accessible interface like a namedtuple and then stop
+ # hardcoding tuple indices here. Several Java examples,
+ # including KafkaAdminClient.java
+ group_description = response.groups[0]
+ error_code = group_description[0]
+ error_type = Errors.for_code(error_code)
+ # Java has the note: KAFKA-6789, we can retry based on the error code
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ # TODO Java checks the group protocol type, and if consumer
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
+ # the members' partition assignments... that hasn't yet been
+ # implemented here so just return the raw struct results
+ group_descriptions.append(group_description)
+ else:
+ raise NotImplementedError(
+ "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return group_descriptions
+
+ def list_consumer_groups(self, broker_ids=None):
+ """List all consumer groups known to the cluster.
+
+ This returns a list of Consumer Group tuples. The tuples are
+ composed of the consumer group name and the consumer group protocol
+ type.
+
+ Only consumer groups that store their offsets in Kafka are returned.
+ The protocol type will be an empty string for groups created using
+ Kafka < 0.9 APIs because, although they store their offsets in Kafka,
+ they don't use Kafka for group coordination. For groups created using
+ Kafka >= 0.9, the protocol type will typically be "consumer".
+
+ As soon as any error is encountered, it is immediately raised.
+
+ :param broker_ids: A list of broker node_ids to query for consumer
+ groups. If set to None, will query all brokers in the cluster.
+ Explicitly specifying broker(s) can be useful for determining which
+ consumer groups are coordinated by those broker(s). Default: None
+ :return list: List of tuples of Consumer Groups.
+ :exception GroupCoordinatorNotAvailableError: The coordinator is not
+ available, so cannot process requests.
+ :exception GroupLoadInProgressError: The coordinator is loading and
+ hence can't process requests.
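+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin):
+
+            groups = admin.list_consumer_groups()
+            # e.g. [('example-group', 'consumer')]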
+ """
+ # While we return a list, internally use a set to prevent duplicates
+ # because if a group coordinator fails after being queried, and its
+ # consumer groups move to new brokers that haven't yet been queried,
+ # then the same group could be returned by multiple brokers.
+ consumer_groups = set()
+ if broker_ids is None:
+ broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
+ version = self._matching_api_version(ListGroupsRequest)
+ if version <= 2:
+ request = ListGroupsRequest[version]()
+ for broker_id in broker_ids:
+ response = self._send_request_to_node(broker_id, request)
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ consumer_groups.update(response.groups)
+ else:
+ raise NotImplementedError(
+ "Support for ListGroups v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return list(consumer_groups)
+
+ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
+ partitions=None):
+ """Fetch Consumer Group Offsets.
+
+ Note:
+ This does not verify that the group_id or partitions actually exist
+ in the cluster.
+
+ As soon as any error is encountered, it is immediately raised.
+
+ :param group_id: The consumer group id name for which to fetch offsets.
+ :param group_coordinator_id: The node_id of the group's coordinator
+ broker. If set to None, will query the cluster to find the group
+ coordinator. Explicitly specifying this can be useful to prevent
+ that extra network round trip if you already know the group
+ coordinator. Default: None.
+ :param partitions: A list of TopicPartitions for which to fetch
+ offsets. On brokers >= 0.10.2, this can be set to None to fetch all
+ known offsets for the consumer group. Default: None.
+ :return dictionary: A dictionary with TopicPartition keys and
+            OffsetAndMetadata values. Partitions that are not specified and for
+ which the group_id does not have a recorded offset are omitted. An
+ offset value of `-1` indicates the group_id has no offset for that
+ TopicPartition. A `-1` can only happen for partitions that are
+ explicitly specified.
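+
+        Example (a minimal sketch, given a KafkaAdminClient instance admin;
+        assumes kafka.structs.TopicPartition, which this module also imports):
+
+            offsets = admin.list_consumer_group_offsets(
+                'example-group',
+                partitions=[TopicPartition('example-topic', 0)])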
+ """
+ group_offsets_listing = {}
+ if group_coordinator_id is None:
+ group_coordinator_id = self._find_group_coordinator_id(group_id)
+ version = self._matching_api_version(OffsetFetchRequest)
+ if version <= 3:
+ if partitions is None:
+ if version <= 1:
+                    raise ValueError(
+                        "OffsetFetchRequest_v{} requires specifying the partitions"
+                        " for which to fetch offsets. Omitting the partitions is"
+                        " only supported on brokers >= 0.10.2. For details, see"
+                        " KIP-88.".format(version))
+ topics_partitions = None
+ else:
+ # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
+ topics_partitions_dict = defaultdict(set)
+ for topic, partition in partitions:
+ topics_partitions_dict[topic].add(partition)
+ topics_partitions = list(six.iteritems(topics_partitions_dict))
+ request = OffsetFetchRequest[version](group_id, topics_partitions)
+ response = self._send_request_to_node(group_coordinator_id, request)
+ if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ # optionally we could retry if error_type.retriable
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ # transform response into a dictionary with TopicPartition keys and
+        # OffsetAndMetadata values--this is what the Java AdminClient returns
+ for topic, partitions in response.topics:
+ for partition, offset, metadata, error_code in partitions:
+ error_type = Errors.for_code(error_code)
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Unable to fetch offsets for group_id {}, topic {}, partition {}"
+ .format(group_id, topic, partition))
+ group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
+ else:
+ raise NotImplementedError(
+ "Support for OffsetFetch v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return group_offsets_listing
+
+ # delete groups protocol not yet implemented
+ # Note: send the request to the group's coordinator.