diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-11-17 02:53:08 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-11-18 08:41:59 -0800 |
commit | 3eba52d65734da76858a5511aab19c6ed5e2bfe2 (patch) | |
tree | 55815d7d6628341db02026d6e90e668c2e0d39c8 | |
parent | 09c1bcd7e27f2d59f3ba2a7a1e31a8eb82d27feb (diff) | |
download | kafka-python-add-list-consumer-groups-offsets.tar.gz |
Add list_consumer_group_offsets()add-list-consumer-groups-offsets
Support fetching the offsets of a consumer group.
Note: As far as I can tell (the Java code is a little inscrutable), the
Java AdminClient doesn't allow specifying the `coordinator_id` or the
`partitions`.
But I decided to include them because they provide a lot of additional
flexibility:
1. allowing users to specify the partitions allows this method to be used even for
older brokers that don't support the OffsetFetchRequest_v2
2. allowing users to specify the coordinator ID gives them a way to
bypass a network round trip. This method will frequently be used for
monitoring, and if you've got 1,000 consumer groups that are being
monitored once a minute, that's ~1.5M requests a day that are
unnecessarily duplicated as the coordinator doesn't change unless
there's an error.
-rw-r--r-- | kafka/admin/kafka.py | 81 | ||||
-rw-r--r-- | kafka/structs.py | 1 |
2 files changed, 80 insertions, 2 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index b997ea2..714669e 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -1,9 +1,14 @@ from __future__ import absolute_import +from collections import defaultdict import copy import logging import socket + +from kafka.vendor import six + from kafka.client_async import KafkaClient, selectors +import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError, NodeNotReadyError, NotControllerError) @@ -11,8 +16,9 @@ from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest) -from kafka.protocol.commit import GroupCoordinatorRequest +from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest +from kafka.structs import TopicPartition, OffsetAndMetadata from kafka.version import __version__ log = logging.getLogger(__name__) @@ -540,4 +546,75 @@ class KafkaAdmin(object): .format(version)) return self._send(request) - # delete groups protocol not implemented + def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, + partitions=None): + """Fetch Consumer Group Offsets. + + Note: + This does not verify that the group_id or partitions actually exist + in the cluster. + + As soon as any error is encountered, it is immediately raised. + + :param group_id: The consumer group id name for which to fetch offsets. + :param group_coordinator_id: The node_id of the group's coordinator + broker. If set to None, will query the cluster to find the group + coordinator. Explicitly specifying this can be useful to prevent + that extra network round trip if you already know the group + coordinator. Default: None. + :param partitions: A list of TopicPartitions for which to fetch + offsets. On brokers >= 0.10.2, this can be set to None to fetch all + known offsets for the consumer group. Default: None. + :return dictionary: A dictionary with TopicPartition keys and + OffsetAndMetada values. Partitions that are not specified and for + which the group_id does not have a recorded offset are omitted. An + offset value of `-1` indicates the group_id has no offset for that + TopicPartition. A `-1` can only happen for partitions that are + explicitly specified. + """ + group_offsets_listing = {} + if group_coordinator_id is None: + group_coordinator_id = self._find_group_coordinator_id(group_id) + version = self._matching_api_version(OffsetFetchRequest) + if version <= 3: + if partitions is None: + if version <= 1: + raise ValueError( + """OffsetFetchRequest_v{} requires specifying the + partitions for which to fetch offsets. Omitting the + partitions is only supported on brokers >= 0.10.2. + For details, see KIP-88.""".format(version)) + topics_partitions = None + else: + # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])] + topics_partitions_dict = defaultdict(set) + for topic, partition in partitions: + topics_partitions_dict[topic].add(partition) + topics_partitions = list(six.iteritems(topics_partitions_dict)) + request = OffsetFetchRequest[version](group_id, topics_partitions) + response = self._send_request_to_node(group_coordinator_id, request) + if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + # optionally we could retry if error_type.retriable + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + # transform response into a dictionary with TopicPartition keys and + # OffsetAndMetada values--this is what the Java AdminClient returns + for topic, partitions in response.topics: + for partition, offset, metadata, error_code in partitions: + error_type = Errors.for_code(error_code) + if error_type is not Errors.NoError: + raise error_type( + "Unable to fetch offsets for group_id {}, topic {}, partition {}" + .format(group_id, topic, partition)) + group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata) + else: + raise NotImplementedError( + "Support for OffsetFetch v{} has not yet been added to KafkaAdmin." + .format(version)) + return group_offsets_listing + + # delete groups protocol not yet implemented + # Note: send the request to the group's coordinator. diff --git a/kafka/structs.py b/kafka/structs.py index e15e92e..baacbcd 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -72,6 +72,7 @@ PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr", "error"]) OffsetAndMetadata = namedtuple("OffsetAndMetadata", + # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata) ["offset", "metadata"]) OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", |