summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/kafka.py63
1 files changed, 51 insertions, 12 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index b997ea2..674ddf5 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -4,6 +4,7 @@ import copy
import logging
import socket
from kafka.client_async import KafkaClient, selectors
+import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
@@ -509,22 +510,60 @@ class KafkaAdmin(object):
# describe delegation_token protocol not implemented
- def describe_consumer_groups(self, group_ids):
+ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
"""Describe a set of consumer groups.
- :param group_ids: A list of consumer group id names
- :return: Appropriate version of DescribeGroupsResponse class
+ Any errors are immediately raised.
+
+ :param group_ids: A list of consumer group IDs. These are typically the
+ group names as strings.
+ :param group_coordinator_id: The node_id of the groups' coordinator
+ broker. If set to None, it will query the cluster for each group to
+ find that group's coordinator. Explicitly specifying this can be
+ useful for avoiding extra network round trips if you already know
+ the group coordinator. This is only useful when all the group_ids
+ have the same coordinator, otherwise it will error. Default: None.
+ :return: A list of group descriptions. For now the group descriptions
+ are the raw results from the DescribeGroupsResponse. Long-term, we
+ plan to change this to return namedtuples as well as decoding the
+ partition assignments.
"""
+ group_descriptions = []
version = self._matching_api_version(DescribeGroupsRequest)
- if version <= 1:
- request = DescribeGroupsRequest[version](
- groups = group_ids
- )
- else:
- raise NotImplementedError(
- "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
- .format(version))
- return self._send(request)
+ for group_id in group_ids:
+ if group_coordinator_id is None:
+ this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
+ if version <= 1:
+ # Note: KAFKA-6788 A potential optimization is to group the
+ # request per coordinator and send one request with a list of
+ # all consumer groups. Java still hasn't implemented this
+ # because the error checking is hard to get right when some
+ # groups error and others don't.
+ request = DescribeGroupsRequest[version](groups=(group_id,))
+ response = self._send_request_to_node(this_groups_coordinator_id, request)
+ assert len(response.groups) == 1
+ # TODO need to implement converting the response tuple into
+ # a more accessible interface like a namedtuple and then stop
+ # hardcoding tuple indices here. Several Java examples,
+ # including KafkaAdminClient.java
+ group_description = response.groups[0]
+ error_code = group_description[0]
+ error_type = Errors.for_code(error_code)
+ # Java has the note: KAFKA-6789, we can retry based on the error code
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ # TODO Java checks the group protocol type, and if consumer
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
+ # the members' partition assignments... that hasn't yet been
+ # implemented here so just return the raw struct results
+ group_descriptions.append(group_description)
+ else:
+ raise NotImplementedError(
+ "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
+ .format(version))
+ return group_descriptions
def list_consumer_groups(self):
"""List all consumer groups known to the cluster.