summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/kafka.py44
1 files changed, 39 insertions, 5 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 224a660..05f2873 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -575,20 +575,54 @@ class KafkaAdmin(object):
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)
- def list_consumer_groups(self):
+ def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.
- :return: Appropriate version of ListGroupsResponse class
+ This returns a list of Consumer Group tuples. The tuples are
+ composed of the consumer group name and the consumer group protocol
+ type.
+
+ Only consumer groups that store their offsets in Kafka are returned.
+ The protocol type will be an empty string for groups created using
+ Kafka < 0.9 APIs because, although they store their offsets in Kafka,
+ they don't use Kafka for group coordination. For groups created using
+ Kafka >= 0.9, the protocol type will typically be "consumer".
+
+ As soon as any error is encountered, it is immediately raised.
+
+ :param broker_ids: A list of broker node_ids to query for consumer
+ groups. If set to None, will query all brokers in the cluster.
+ Explicitly specifying broker(s) can be useful for determining which
+ consumer groups are coordinated by those broker(s). Default: None
+ :return list: List of tuples of Consumer Groups.
+ :exception GroupCoordinatorNotAvailableError: The coordinator is not
+ available, so cannot process requests.
+ :exception GroupLoadInProgressError: The coordinator is loading and
+ hence can't process requests.
"""
+ # While we return a list, internally use a set to prevent duplicates
+ # because if a group coordinator fails after being queried, and its
+ # consumer groups move to new brokers that haven't yet been queried,
+ # then the same group could be returned by multiple brokers.
+ consumer_groups = set()
+ if broker_ids is None:
+ broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
version = self._matching_api_version(ListGroupsRequest)
- if version <= 1:
+ if version <= 2:
request = ListGroupsRequest[version]()
+ for broker_id in broker_ids:
+ response = self._send_request_to_node(broker_id, request)
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ consumer_groups.update(response.groups)
else:
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
- # TODO this is completely broken, as it needs to send to the group coordinator
- # return self._send(request)
+ return list(consumer_groups)
def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
partitions=None):