path: root/kafka/admin/kafka.py
author     Jeff Widman <jeff@jeffwidman.com>  2018-11-20 09:03:50 -0800
committer  Jeff Widman <jeff@jeffwidman.com>  2018-11-20 09:24:50 -0800
commit     fcc800f96f14192c44b09d1d37108377dcaed245 (patch)
tree       40ebfb2b5d6766ac21f74895652e412088e5f160 /kafka/admin/kafka.py
parent     45196e31d5cbd4da02a81f0c459faee1f8165306 (diff)
download   kafka-python-fcc800f96f14192c44b09d1d37108377dcaed245.tar.gz
Rename KafkaAdmin to KafkaAdminClient
Diffstat (limited to 'kafka/admin/kafka.py')
-rw-r--r--  kafka/admin/kafka.py  752
1 file changed, 0 insertions, 752 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
deleted file mode 100644
index c8abb4e..0000000
--- a/kafka/admin/kafka.py
+++ /dev/null
@@ -1,752 +0,0 @@
-from __future__ import absolute_import
-
-from collections import defaultdict
-import copy
-import logging
-import socket
-
-from kafka.vendor import six
-
-from kafka.client_async import KafkaClient, selectors
-import kafka.errors as Errors
-from kafka.errors import (
- IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
- UnrecognizedBrokerVersion)
-from kafka.metrics import MetricConfig, Metrics
-from kafka.protocol.admin import (
- CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
- ListGroupsRequest, DescribeGroupsRequest)
-from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
-from kafka.protocol.metadata import MetadataRequest
-from kafka.structs import TopicPartition, OffsetAndMetadata
-from kafka.version import __version__
-
-
-log = logging.getLogger(__name__)
-
-
-class KafkaAdmin(object):
- """A class for administering the Kafka cluster.
-
- Warning:
- This is an unstable interface that was recently added and is subject to
- change without warning. In particular, many methods currently return
- raw protocol tuples. In future releases, we plan to make these into
- nicer, more pythonic objects. Unfortunately, this will likely break
- those interfaces.
-
- The KafkaAdmin class will negotiate for the latest version of each message
- protocol format supported by both the kafka-python client library and the
- Kafka broker. Usage of optional fields from protocol versions that are not
- supported by the broker will result in IncompatibleBrokerVersion exceptions.
-
- Use of this class requires a broker version of 0.10.0.0 or later.
-
- Keyword Arguments:
- bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
- strings) that the consumer should contact to bootstrap initial
- cluster metadata. This does not have to be the full node list.
- It just needs to have at least one broker that will respond to a
- Metadata API Request. Default port is 9092. If no servers are
- specified, will default to localhost:9092.
- client_id (str): a name for this client. This string is passed in
- each request to servers and can be used to identify specific
- server-side log entries that correspond to this client. Also
- submitted to GroupCoordinator for logging with respect to
- consumer group administration. Default: 'kafka-python-{version}'
- reconnect_backoff_ms (int): The amount of time in milliseconds to
- wait before attempting to reconnect to a given host.
- Default: 50.
- reconnect_backoff_max_ms (int): The maximum amount of time in
- milliseconds to wait when reconnecting to a broker that has
- repeatedly failed to connect. If provided, the backoff per host
- will increase exponentially for each consecutive connection
- failure, up to this maximum. To avoid connection storms, a
- randomization factor of 0.2 will be applied to the backoff
- resulting in a random range between 20% below and 20% above
- the computed value. Default: 1000.
- request_timeout_ms (int): Client request timeout in milliseconds.
- Default: 30000.
- connections_max_idle_ms: Close idle connections after the number of
- milliseconds specified by this config. The broker closes idle
- connections after connections.max.idle.ms, so this avoids hitting
- unexpected socket disconnected errors on the client.
- Default: 540000
- retry_backoff_ms (int): Milliseconds to backoff when retrying on
- errors. Default: 100.
- max_in_flight_requests_per_connection (int): Requests are pipelined
- to kafka brokers up to this number of maximum requests per
- broker connection. Default: 5.
- receive_buffer_bytes (int): The size of the TCP receive buffer
- (SO_RCVBUF) to use when reading data. Default: None (relies on
- system defaults). Java client defaults to 32768.
- send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: None (relies on
- system defaults). Java client defaults to 131072.
- socket_options (list): List of tuple-arguments to socket.setsockopt
- to apply to broker connection sockets. Default:
- [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
- metadata_max_age_ms (int): The period of time in milliseconds after
- which we force a refresh of metadata even if we haven't seen any
- partition leadership changes to proactively discover any new
- brokers or partitions. Default: 300000
- security_protocol (str): Protocol used to communicate with brokers.
- Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
- Default: PLAINTEXT.
- ssl_context (ssl.SSLContext): Pre-configured SSLContext for wrapping
- socket connections. If provided, all other ssl_* configurations
- will be ignored. Default: None.
- ssl_check_hostname (bool): Flag to configure whether SSL handshake
- should verify that the certificate matches the broker's hostname.
- Default: True.
- ssl_cafile (str): Optional filename of CA file to use in certificate
- verification. Default: None.
- ssl_certfile (str): Optional filename of file in PEM format containing
- the client certificate, as well as any CA certificates needed to
- establish the certificate's authenticity. Default: None.
- ssl_keyfile (str): Optional filename containing the client private key.
- Default: None.
- ssl_password (str): Optional password to be used when loading the
- certificate chain. Default: None.
- ssl_crlfile (str): Optional filename containing the CRL to check for
- certificate expiration. By default, no CRL check is done. When
- providing a file, only the leaf certificate will be checked against
- this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
- Default: None.
- api_version (tuple): Specify which Kafka API version to use. If set
- to None, KafkaClient will attempt to infer the broker version by
- probing various APIs. Example: (0, 10, 2). Default: None
- api_version_auto_timeout_ms (int): number of milliseconds to wait when
- checking the broker api version before raising a timeout exception
- from the constructor. Only applies if api_version is None.
- selector (selectors.BaseSelector): Provide a specific selector
- implementation to use for I/O multiplexing.
- Default: selectors.DefaultSelector
- metrics (kafka.metrics.Metrics): Optionally provide a metrics
- instance for capturing network IO stats. Default: None.
- metric_group_prefix (str): Prefix for metric names. Default: ''
- sasl_mechanism (str): the SASL mechanism to use when security_protocol
- is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
- Default: None
- sasl_plain_username (str): username for sasl PLAIN authentication.
- Default: None
- sasl_plain_password (str): password for sasl PLAIN authentication.
- Default: None
- sasl_kerberos_service_name (str): Service name to include in GSSAPI
- sasl mechanism handshake. Default: 'kafka'
-
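- Example:
- An illustrative sketch only; it assumes a broker reachable on
- localhost:9092 and the KafkaAdmin export from kafka.admin::
-
-     from kafka.admin import KafkaAdmin
-
-     admin = KafkaAdmin(bootstrap_servers='localhost:9092', client_id='my-admin')
-     # ... issue admin requests here, e.g. admin.create_topics(...) ...
-     admin.close()
-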
- """
- DEFAULT_CONFIG = {
- # client configs
- 'bootstrap_servers': 'localhost',
- 'client_id': 'kafka-python-' + __version__,
- 'request_timeout_ms': 30000,
- 'connections_max_idle_ms': 9 * 60 * 1000,
- 'reconnect_backoff_ms': 50,
- 'reconnect_backoff_max_ms': 1000,
- 'max_in_flight_requests_per_connection': 5,
- 'receive_buffer_bytes': None,
- 'send_buffer_bytes': None,
- 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
- 'sock_chunk_bytes': 4096, # undocumented experimental option
- 'sock_chunk_buffer_count': 1000, # undocumented experimental option
- 'retry_backoff_ms': 100,
- 'metadata_max_age_ms': 300000,
- 'security_protocol': 'PLAINTEXT',
- 'ssl_context': None,
- 'ssl_check_hostname': True,
- 'ssl_cafile': None,
- 'ssl_certfile': None,
- 'ssl_keyfile': None,
- 'ssl_password': None,
- 'ssl_crlfile': None,
- 'api_version': None,
- 'api_version_auto_timeout_ms': 2000,
- 'selector': selectors.DefaultSelector,
- 'sasl_mechanism': None,
- 'sasl_plain_username': None,
- 'sasl_plain_password': None,
- 'sasl_kerberos_service_name': 'kafka',
-
- # metrics configs
- 'metric_reporters': [],
- 'metrics_num_samples': 2,
- 'metrics_sample_window_ms': 30000,
- }
-
- def __init__(self, **configs):
- log.debug("Starting KafkaAdmin interface.")
- extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
- if extra_configs:
- raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
-
- self.config = copy.copy(self.DEFAULT_CONFIG)
- self.config.update(configs)
-
- # Configure metrics
- metrics_tags = {'client-id': self.config['client_id']}
- metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
- time_window_ms=self.config['metrics_sample_window_ms'],
- tags=metrics_tags)
- reporters = [reporter() for reporter in self.config['metric_reporters']]
- self._metrics = Metrics(metric_config, reporters)
-
- self._client = KafkaClient(metrics=self._metrics,
- metric_group_prefix='admin',
- **self.config)
-
- # Get auto-discovered version from client if necessary
- if self.config['api_version'] is None:
- self.config['api_version'] = self._client.config['api_version']
-
- self._closed = False
- self._refresh_controller_id()
- log.debug("KafkaAdmin interface started.")
-
- def close(self):
- """Close the KafkaAdmin connection to the Kafka broker."""
- if not hasattr(self, '_closed') or self._closed:
- log.info("KafkaAdmin interface already closed.")
- return
-
- self._metrics.close()
- self._client.close()
- self._closed = True
- log.debug("KafkaAdmin interface has closed.")
-
- def _matching_api_version(self, operation):
- """Find the latest version of the protocol operation supported by both
- this library and the broker.
-
- This resolves to the lesser of either the latest api version this
- library supports, or the max version supported by the broker.
-
- :param operation: A list of protocol operation versions from kafka.protocol.
- :return: The max matching version number between client and broker.
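-
- For example, if this library implements versions 0 through 2 of an
- operation and the broker advertises support for versions 0 through 1,
- this resolves to version 1.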
- """
- version = min(len(operation) - 1,
- self._client.get_api_versions()[operation[0].API_KEY][1])
- if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
- # max library version is less than min broker version. Currently,
- # no Kafka versions specify a min msg version. Maybe in the future?
- raise IncompatibleBrokerVersion(
- "No version of the '{}' Kafka protocol is supported by both the client and broker."
- .format(operation.__name__))
- return version
-
- def _validate_timeout(self, timeout_ms):
- """Validate the timeout is set or use the configuration default.
-
- :param timeout_ms: The timeout provided by api call, in milliseconds.
- :return: The timeout to use for the operation.
- """
- return timeout_ms or self.config['request_timeout_ms']
-
- def _refresh_controller_id(self):
- """Determine the Kafka cluster controller."""
- version = self._matching_api_version(MetadataRequest)
- if 1 <= version <= 6:
- request = MetadataRequest[version]()
- response = self._send_request_to_node(self._client.least_loaded_node(), request)
- controller_id = response.controller_id
- # verify the controller is new enough to support our requests
- controller_version = self._client.check_version(controller_id)
- if controller_version < (0, 10, 0):
- raise IncompatibleBrokerVersion(
- "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
- .format(controller_version))
- self._controller_id = controller_id
- else:
- raise UnrecognizedBrokerVersion(
- "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
- .format(version))
-
- def _find_group_coordinator_id(self, group_id):
- """Find the broker node_id of the coordinator of the given group.
-
- Sends a FindCoordinatorRequest message to the cluster. Will block until
- the FindCoordinatorResponse is received. Any errors are immediately
- raised.
-
- :param group_id: The consumer group ID. This is typically the group
- name as a string.
- :return: The node_id of the broker that is the coordinator.
- """
- # Note: Java may change how this is implemented in KAFKA-6791.
- #
- # TODO add support for dynamically picking version of
- # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
- # When I experimented with this, GroupCoordinatorResponse_v1 didn't
- # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
- gc_request = GroupCoordinatorRequest[0](group_id)
- gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
- # use the extra error checking in add_group_coordinator() rather than
- # immediately returning the group coordinator.
- success = self._client.cluster.add_group_coordinator(group_id, gc_response)
- if not success:
- error_type = Errors.for_code(gc_response.error_code)
- assert error_type is not Errors.NoError
- # Note: When error_type.retriable, Java will retry... see
- # KafkaAdminClient's handleFindCoordinatorError method
- raise error_type(
- "Could not identify group coordinator for group_id '{}' from response '{}'."
- .format(group_id, gc_response))
- group_coordinator = self._client.cluster.coordinator_for_group(group_id)
- # will be None if the coordinator was never populated, which should never happen here
- assert group_coordinator is not None
- # will be -1 if add_group_coordinator() failed... but by this point the
- # error should have been raised.
- assert group_coordinator != -1
- return group_coordinator
-
- def _send_request_to_node(self, node_id, request):
- """Send a Kafka protocol message to a specific broker.
-
- Will block until the message result is received.
-
- :param node_id: The broker id to which to send the message.
- :param request: The message to send.
- :return: The Kafka protocol response for the message.
- :exception: The exception if the message could not be sent.
- """
- while not self._client.ready(node_id):
- # poll until the connection to broker is ready, otherwise send()
- # will fail with NodeNotReadyError
- self._client.poll()
- future = self._client.send(node_id, request)
- self._client.poll(future=future)
- if future.succeeded():
- return future.value
- else:
- raise future.exception # pylint: disable-msg=raising-bad-type
-
- def _send_request_to_controller(self, request):
- """Send a Kafka protocol message to the cluster controller.
-
- Will block until the message result is received.
-
- :param request: The message to send.
- :return: The Kafka protocol response for the message.
- """
- tries = 2 # in case our cached self._controller_id is outdated
- while tries:
- tries -= 1
- response = self._send_request_to_node(self._controller_id, request)
- # CreateTopicsResponse uses topic_errors; DeleteTopicsResponse uses topic_error_codes.
- # Newer CreateTopicsResponse versions also append an error_message field, so only the
- # first two fields of each entry are unpacked here.
- topic_error_tuples = response.topic_errors if hasattr(response, 'topic_errors') else response.topic_error_codes
- for topic, error_code in map(lambda e: e[:2], topic_error_tuples):
- error_type = Errors.for_code(error_code)
- if tries and error_type is NotControllerError:
- # No need to inspect the rest of the errors for
- # non-retriable errors because NotControllerError should
- # either be thrown for all errors or no errors.
- self._refresh_controller_id()
- break
- elif error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- else:
- return response
- raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
-
- @staticmethod
- def _convert_new_topic_request(new_topic):
- return (
- new_topic.name,
- new_topic.num_partitions,
- new_topic.replication_factor,
- [
- (partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items()
- ],
- [
- (config_key, config_value) for config_key, config_value in new_topic.topic_configs.items()
- ]
- )
-
- def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
- """Create new topics in the cluster.
-
- :param new_topics: A list of NewTopic objects.
- :param timeout_ms: Milliseconds to wait for new topics to be created
- before the broker returns.
- :param validate_only: If True, don't actually create new topics.
- Not supported by all versions. Default: False
- :return: Appropriate version of CreateTopicsResponse class.
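-
- Example (an illustrative sketch; assumes an existing ``admin = KafkaAdmin(...)``
- instance and the NewTopic helper exported from kafka.admin)::
-
-     topic = NewTopic(name='example-topic', num_partitions=3, replication_factor=1)
-     response = admin.create_topics([topic], timeout_ms=10000)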
- """
- version = self._matching_api_version(CreateTopicsRequest)
- timeout_ms = self._validate_timeout(timeout_ms)
- if version == 0:
- if validate_only:
- raise IncompatibleBrokerVersion(
- "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
- .format(self.config['api_version']))
- request = CreateTopicsRequest[version](
- create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
- timeout=timeout_ms
- )
- elif version <= 2:
- request = CreateTopicsRequest[version](
- create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
- timeout=timeout_ms,
- validate_only=validate_only
- )
- else:
- raise NotImplementedError(
- "Support for CreateTopics v{} has not yet been added to KafkaAdmin."
- .format(version))
- # TODO convert structs to a more pythonic interface
- # TODO raise exceptions if errors
- return self._send_request_to_controller(request)
-
- def delete_topics(self, topics, timeout_ms=None):
- """Delete topics from the cluster.
-
- :param topics: A list of topic name strings.
- :param timeout_ms: Milliseconds to wait for topics to be deleted
- before the broker returns.
- :return: Appropriate version of DeleteTopicsResponse class.
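-
- Example (an illustrative sketch; assumes an existing ``admin`` instance)::
-
-     response = admin.delete_topics(['example-topic'], timeout_ms=10000)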
- """
- version = self._matching_api_version(DeleteTopicsRequest)
- timeout_ms = self._validate_timeout(timeout_ms)
- if version <= 1:
- request = DeleteTopicsRequest[version](
- topics=topics,
- timeout=timeout_ms
- )
- response = self._send_request_to_controller(request)
- else:
- raise NotImplementedError(
- "Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
- .format(version))
- return response
-
- # list topics functionality is in ClusterMetadata
- # Note: if implemented here, send the request to the least_loaded_node()
-
- # describe topics functionality is in ClusterMetadata
- # Note: if implemented here, send the request to the controller
-
- # describe cluster functionality is in ClusterMetadata
- # Note: if implemented here, send the request to the least_loaded_node()
-
- # describe_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
-
- # create_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
-
- # delete_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
-
- @staticmethod
- def _convert_describe_config_resource_request(config_resource):
- return (
- config_resource.resource_type,
- config_resource.name,
- [
- config_key for config_key, config_value in config_resource.configs.items()
- ] if config_resource.configs else None
- )
-
- def describe_configs(self, config_resources, include_synonyms=False):
- """Fetch configuration parameters for one or more Kafka resources.
-
- :param config_resources: A list of ConfigResource objects.
- Any keys in ConfigResource.configs dict will be used to filter the
- result. Setting the configs dict to None will get all values. An
- empty dict will get zero values (as per Kafka protocol).
- :param include_synonyms: If True, return synonyms in response. Not
- supported by all versions. Default: False.
- :return: Appropriate version of DescribeConfigsResponse class.
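-
- Example (an illustrative sketch; assumes an existing ``admin`` instance and the
- ConfigResource and ConfigResourceType helpers exported from kafka.admin)::
-
-     resource = ConfigResource(ConfigResourceType.TOPIC, 'example-topic')
-     response = admin.describe_configs([resource])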
- """
- version = self._matching_api_version(DescribeConfigsRequest)
- if version == 0:
- if include_synonyms:
- raise IncompatibleBrokerVersion(
- "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
- .format(self.config['api_version']))
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
- )
- elif version == 1:
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
- include_synonyms=include_synonyms
- )
- else:
- raise NotImplementedError(
- "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
- .format(version))
- return self._send_request_to_node(self._client.least_loaded_node(), request)
-
- @staticmethod
- def _convert_alter_config_resource_request(config_resource):
- return (
- config_resource.resource_type,
- config_resource.name,
- [
- (config_key, config_value) for config_key, config_value in config_resource.configs.items()
- ]
- )
-
- def alter_configs(self, config_resources):
- """Alter configuration parameters of one or more Kafka resources.
-
- Warning:
- This is currently broken for BROKER resources because those must be
- sent to that specific broker, versus this always picks the
- least-loaded node. See the comment in the source code for details.
- We would happily accept a PR fixing this.
-
- :param config_resources: A list of ConfigResource objects.
- :return: Appropriate version of AlterConfigsResponse class.
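-
- Example (an illustrative sketch; assumes an existing ``admin`` instance and the
- ConfigResource and ConfigResourceType helpers exported from kafka.admin)::
-
-     resource = ConfigResource(ConfigResourceType.TOPIC, 'example-topic', configs={'retention.ms': '3600000'})
-     response = admin.alter_configs([resource])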
- """
- version = self._matching_api_version(AlterConfigsRequest)
- if version == 0:
- request = AlterConfigsRequest[version](
- resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
- )
- else:
- raise NotImplementedError(
- "Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
- .format(version))
- # TODO the Java client has the note:
- # // We must make a separate AlterConfigs request for every BROKER resource we want to alter
- # // and send the request to that specific broker. Other resources are grouped together into
- # // a single request that may be sent to any broker.
- #
- # So this is currently broken as it always sends to the least_loaded_node()
- return self._send_request_to_node(self._client.least_loaded_node(), request)
-
- # alter replica logs dir protocol not yet implemented
- # Note: have to lookup the broker with the replica assignment and send the request to that broker
-
- # describe log dirs protocol not yet implemented
- # Note: have to lookup the broker with the replica assignment and send the request to that broker
-
- @staticmethod
- def _convert_create_partitions_request(topic_name, new_partitions):
- return (
- topic_name,
- (
- new_partitions.total_count,
- new_partitions.new_assignments
- )
- )
-
- def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False):
- """Create additional partitions for an existing topic.
-
- :param topic_partitions: A map of topic name strings to NewPartition objects.
- :param timeout_ms: Milliseconds to wait for new partitions to be
- created before the broker returns.
- :param validate_only: If True, don't actually create new partitions.
- Default: False
- :return: Appropriate version of CreatePartitionsResponse class.
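-
- Example (an illustrative sketch; assumes an existing ``admin`` instance and the
- NewPartitions helper exported from kafka.admin)::
-
-     response = admin.create_partitions({'example-topic': NewPartitions(total_count=6)})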
- """
- version = self._matching_api_version(CreatePartitionsRequest)
- timeout_ms = self._validate_timeout(timeout_ms)
- if version == 0:
- request = CreatePartitionsRequest[version](
- topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
- timeout=timeout_ms,
- validate_only=validate_only
- )
- else:
- raise NotImplementedError(
- "Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
- .format(version))
- return self._send_request_to_controller(request)
-
- # delete records protocol not yet implemented
- # Note: send the request to the partition leaders
-
- # create delegation token protocol not yet implemented
- # Note: send the request to the least_loaded_node()
-
- # renew delegation token protocol not yet implemented
- # Note: send the request to the least_loaded_node()
-
- # expire delegation_token protocol not yet implemented
- # Note: send the request to the least_loaded_node()
-
- # describe delegation_token protocol not yet implemented
- # Note: send the request to the least_loaded_node()
-
- def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
- """Describe a set of consumer groups.
-
- Any errors are immediately raised.
-
- :param group_ids: A list of consumer group IDs. These are typically the
- group names as strings.
- :param group_coordinator_id: The node_id of the groups' coordinator
- broker. If set to None, it will query the cluster for each group to
- find that group's coordinator. Explicitly specifying this can be
- useful for avoiding extra network round trips if you already know
- the group coordinator. This is only useful when all the group_ids
- have the same coordinator, otherwise it will error. Default: None.
- :return: A list of group descriptions. For now the group descriptions
- are the raw results from the DescribeGroupsResponse. Long-term, we
- plan to change this to return namedtuples as well as decoding the
- partition assignments.
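-
- Example (an illustrative sketch; assumes an existing ``admin`` instance)::
-
-     descriptions = admin.describe_consumer_groups(['example-group'])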
- """
- group_descriptions = []
- version = self._matching_api_version(DescribeGroupsRequest)
- for group_id in group_ids:
- if group_coordinator_id is not None:
- this_groups_coordinator_id = group_coordinator_id
- else:
- this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
- if version <= 1:
- # Note: KAFKA-6788 A potential optimization is to group the
- # request per coordinator and send one request with a list of
- # all consumer groups. Java still hasn't implemented this
- # because the error checking is hard to get right when some
- # groups error and others don't.
- request = DescribeGroupsRequest[version](groups=(group_id,))
- response = self._send_request_to_node(this_groups_coordinator_id, request)
- assert len(response.groups) == 1
- # TODO need to implement converting the response tuple into
- # a more accessible interface like a namedtuple and then stop
- # hardcoding tuple indices here. Several Java examples,
- # including KafkaAdminClient.java
- group_description = response.groups[0]
- error_code = group_description[0]
- error_type = Errors.for_code(error_code)
- # Java has the note: KAFKA-6789, we can retry based on the error code
- if error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- # TODO Java checks the group protocol type, and if consumer
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
- # the members' partition assignments... that hasn't yet been
- # implemented here so just return the raw struct results
- group_descriptions.append(group_description)
- else:
- raise NotImplementedError(
- "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
- .format(version))
- return group_descriptions
-
- def list_consumer_groups(self, broker_ids=None):
- """List all consumer groups known to the cluster.
-
- This returns a list of Consumer Group tuples. The tuples are
- composed of the consumer group name and the consumer group protocol
- type.
-
- Only consumer groups that store their offsets in Kafka are returned.
- The protocol type will be an empty string for groups created using
- Kafka < 0.9 APIs because, although they store their offsets in Kafka,
- they don't use Kafka for group coordination. For groups created using
- Kafka >= 0.9, the protocol type will typically be "consumer".
-
- As soon as any error is encountered, it is immediately raised.
-
- :param broker_ids: A list of broker node_ids to query for consumer
- groups. If set to None, will query all brokers in the cluster.
- Explicitly specifying broker(s) can be useful for determining which
- consumer groups are coordinated by those broker(s). Default: None
- :return list: List of tuples of Consumer Groups.
- :exception GroupCoordinatorNotAvailableError: The coordinator is not
- available and so cannot process requests.
- :exception GroupLoadInProgressError: The coordinator is still loading
- and hence cannot process requests.
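-
- Example (an illustrative sketch; assumes an existing ``admin`` instance)::
-
-     groups = admin.list_consumer_groups()
-     # e.g. [('example-group', 'consumer'), ...]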
- """
- # While we return a list, internally use a set to prevent duplicates
- # because if a group coordinator fails after being queried, and its
- # consumer groups move to new brokers that haven't yet been queried,
- # then the same group could be returned by multiple brokers.
- consumer_groups = set()
- if broker_ids is None:
- broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
- version = self._matching_api_version(ListGroupsRequest)
- if version <= 2:
- request = ListGroupsRequest[version]()
- for broker_id in broker_ids:
- response = self._send_request_to_node(broker_id, request)
- error_type = Errors.for_code(response.error_code)
- if error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- consumer_groups.update(response.groups)
- else:
- raise NotImplementedError(
- "Support for ListGroups v{} has not yet been added to KafkaAdmin."
- .format(version))
- return list(consumer_groups)
-
- def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
- partitions=None):
- """Fetch Consumer Group Offsets.
-
- Note:
- This does not verify that the group_id or partitions actually exist
- in the cluster.
-
- As soon as any error is encountered, it is immediately raised.
-
- :param group_id: The consumer group id name for which to fetch offsets.
- :param group_coordinator_id: The node_id of the group's coordinator
- broker. If set to None, will query the cluster to find the group
- coordinator. Explicitly specifying this can be useful to prevent
- that extra network round trip if you already know the group
- coordinator. Default: None.
- :param partitions: A list of TopicPartitions for which to fetch
- offsets. On brokers >= 0.10.2, this can be set to None to fetch all
- known offsets for the consumer group. Default: None.
- :return dictionary: A dictionary with TopicPartition keys and
- OffsetAndMetadata values. Partitions that are not specified and for
- which the group_id does not have a recorded offset are omitted. An
- offset value of `-1` indicates the group_id has no offset for that
- TopicPartition. A `-1` can only happen for partitions that are
- explicitly specified.
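-
- Example (an illustrative sketch; assumes an existing ``admin`` instance and
- brokers >= 0.10.2 so that partitions may be omitted)::
-
-     offsets = admin.list_consumer_group_offsets('example-group')
-     # e.g. {TopicPartition(topic='example-topic', partition=0): OffsetAndMetadata(offset=5, metadata='')}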
- """
- group_offsets_listing = {}
- if group_coordinator_id is None:
- group_coordinator_id = self._find_group_coordinator_id(group_id)
- version = self._matching_api_version(OffsetFetchRequest)
- if version <= 3:
- if partitions is None:
- if version <= 1:
- raise ValueError(
- """OffsetFetchRequest_v{} requires specifying the
- partitions for which to fetch offsets. Omitting the
- partitions is only supported on brokers >= 0.10.2.
- For details, see KIP-88.""".format(version))
- topics_partitions = None
- else:
- # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
- topics_partitions_dict = defaultdict(set)
- for topic, partition in partitions:
- topics_partitions_dict[topic].add(partition)
- topics_partitions = list(six.iteritems(topics_partitions_dict))
- request = OffsetFetchRequest[version](group_id, topics_partitions)
- response = self._send_request_to_node(group_coordinator_id, request)
- if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
- error_type = Errors.for_code(response.error_code)
- if error_type is not Errors.NoError:
- # optionally we could retry if error_type.retriable
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- # transform response into a dictionary with TopicPartition keys and
- # OffsetAndMetadata values -- this is what the Java AdminClient returns
- for topic, partitions in response.topics:
- for partition, offset, metadata, error_code in partitions:
- error_type = Errors.for_code(error_code)
- if error_type is not Errors.NoError:
- raise error_type(
- "Unable to fetch offsets for group_id {}, topic {}, partition {}"
- .format(group_id, topic, partition))
- group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
- else:
- raise NotImplementedError(
- "Support for OffsetFetch v{} has not yet been added to KafkaAdmin."
- .format(version))
- return group_offsets_listing
-
- # delete groups protocol not yet implemented
- # Note: send the request to the group's coordinator.