summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/client.py249
1 files changed, 155 insertions, 94 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 155ad21..5082f4d 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -349,7 +349,7 @@ class KafkaAdminClient(object):
# one of these attributes and that they always unpack into
# (topic, error_code) tuples.
topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
- else response.topic_error_codes)
+ else response.topic_error_codes)
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
@@ -501,8 +501,8 @@ class KafkaAdminClient(object):
future = self._send_request_to_node(self._client.least_loaded_node(), request)
self._wait_for_futures([future])
-
- return future.value
+ response = future.value
+ return response
@staticmethod
def _convert_alter_config_resource_request(config_resource):
@@ -544,8 +544,8 @@ class KafkaAdminClient(object):
future = self._send_request_to_node(self._client.least_loaded_node(), request)
self._wait_for_futures([future])
-
- return future.value
+ response = future.value
+ return response
# alter replica logs dir protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -602,6 +602,54 @@ class KafkaAdminClient(object):
# describe delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()
+ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id):
+ """Send a DescribeGroupsRequest to the group's coordinator.
+
+ :param group_id: The group name as a string
+ :param group_coordinator_id: The node_id of the groups' coordinator
+ broker.
+ :return: A message future.
+ """
+ version = self._matching_api_version(DescribeGroupsRequest)
+ if version <= 1:
+ # Note: KAFKA-6788 A potential optimization is to group the
+ # request per coordinator and send one request with a list of
+ # all consumer groups. Java still hasn't implemented this
+ # because the error checking is hard to get right when some
+ # groups error and others don't.
+ request = DescribeGroupsRequest[version](groups=(group_id,))
+ else:
+ raise NotImplementedError(
+ "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(group_coordinator_id, request)
+
+ def _describe_consumer_groups_process_response(self, response):
+ """Process a DescribeGroupsResponse into a group description."""
+ if response.API_VERSION <= 1:
+ assert len(response.groups) == 1
+ # TODO need to implement converting the response tuple into
+ # a more accessible interface like a namedtuple and then stop
+ # hardcoding tuple indices here. Several Java examples,
+ # including KafkaAdminClient.java
+ group_description = response.groups[0]
+ error_code = group_description[0]
+ error_type = Errors.for_code(error_code)
+ # Java has the note: KAFKA-6789, we can retry based on the error code
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "DescribeGroupsResponse failed with response '{}'."
+ .format(response))
+ # TODO Java checks the group protocol type, and if consumer
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
+ # the members' partition assignments... that hasn't yet been
+ # implemented here so just return the raw struct results
+ else:
+ raise NotImplementedError(
+ "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return group_description
+
def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
"""Describe a set of consumer groups.
@@ -622,51 +670,52 @@ class KafkaAdminClient(object):
"""
group_descriptions = []
futures = []
- version = self._matching_api_version(DescribeGroupsRequest)
for group_id in group_ids:
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
-
- if version <= 1:
- # Note: KAFKA-6788 A potential optimization is to group the
- # request per coordinator and send one request with a list of
- # all consumer groups. Java still hasn't implemented this
- # because the error checking is hard to get right when some
- # groups error and others don't.
- request = DescribeGroupsRequest[version](groups=(group_id,))
- futures.append(self._send_request_to_node(this_groups_coordinator_id, request))
- else:
- raise NotImplementedError(
- "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
- .format(version))
+ f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
+ futures.append(f)
self._wait_for_futures(futures)
for future in futures:
response = future.value
- assert len(response.groups) == 1
- # TODO need to implement converting the response tuple into
- # a more accessible interface like a namedtuple and then stop
- # hardcoding tuple indices here. Several Java examples,
- # including KafkaAdminClient.java
- group_description = response.groups[0]
- error_code = group_description[0]
- error_type = Errors.for_code(error_code)
- # Java has the note: KAFKA-6789, we can retry based on the error code
- if error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- # TODO Java checks the group protocol type, and if consumer
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
- # the members' partition assignments... that hasn't yet been
- # implemented here so just return the raw struct results
+ group_description = self._describe_consumer_groups_process_response(response)
group_descriptions.append(group_description)
return group_descriptions
+ def _list_consumer_groups_send_request(self, broker_id):
+ """Send a ListGroupsRequest to a broker.
+
+ :param broker_id: The broker's node_id.
+ :return: A message future
+ """
+ version = self._matching_api_version(ListGroupsRequest)
+ if version <= 2:
+ request = ListGroupsRequest[version]()
+ else:
+ raise NotImplementedError(
+ "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(broker_id, request)
+
+ def _list_consumer_groups_process_response(self, response):
+ """Process a ListGroupsResponse into a list of groups."""
+ if response.API_VERSION <= 2:
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "ListGroupsRequest failed with response '{}'."
+ .format(response))
+ else:
+ raise NotImplementedError(
+ "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return response.groups
+
def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.
@@ -697,60 +746,24 @@ class KafkaAdminClient(object):
# consumer groups move to new brokers that haven't yet been queried,
# then the same group could be returned by multiple brokers.
consumer_groups = set()
- futures = []
if broker_ids is None:
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
- version = self._matching_api_version(ListGroupsRequest)
- if version <= 2:
- request = ListGroupsRequest[version]()
- for broker_id in broker_ids:
- futures.append(self._send_request_to_node(broker_id, request))
-
- self._wait_for_futures(futures)
-
- for future in futures:
- response = future.value
- error_type = Errors.for_code(response.error_code)
- if error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- consumer_groups.update(response.groups)
- else:
- raise NotImplementedError(
- "Support for ListGroups v{} has not yet been added to KafkaAdminClient."
- .format(version))
+ futures = [self._list_consumer_groups_send_request(b) for b in broker_ids]
+ self._wait_for_futures(futures)
+ for f in futures:
+ response = f.value
+ consumer_groups.update(self._list_consumer_groups_process_response(response))
return list(consumer_groups)
- def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
- partitions=None):
- """Fetch Consumer Group Offsets.
-
- Note:
- This does not verify that the group_id or partitions actually exist
- in the cluster.
-
- As soon as any error is encountered, it is immediately raised.
+ def _list_consumer_group_offsets_send_request(self, group_id,
+ group_coordinator_id, partitions=None):
+ """Send an OffsetFetchRequest to a broker.
:param group_id: The consumer group id name for which to fetch offsets.
:param group_coordinator_id: The node_id of the group's coordinator
- broker. If set to None, will query the cluster to find the group
- coordinator. Explicitly specifying this can be useful to prevent
- that extra network round trip if you already know the group
- coordinator. Default: None.
- :param partitions: A list of TopicPartitions for which to fetch
- offsets. On brokers >= 0.10.2, this can be set to None to fetch all
- known offsets for the consumer group. Default: None.
- :return dictionary: A dictionary with TopicPartition keys and
- OffsetAndMetada values. Partitions that are not specified and for
- which the group_id does not have a recorded offset are omitted. An
- offset value of `-1` indicates the group_id has no offset for that
- TopicPartition. A `-1` can only happen for partitions that are
- explicitly specified.
+ broker.
+ :return: A message future
"""
- group_offsets_listing = {}
- if group_coordinator_id is None:
- group_coordinator_id = self._find_group_coordinator_id(group_id)
version = self._matching_api_version(OffsetFetchRequest)
if version <= 3:
if partitions is None:
@@ -768,32 +781,80 @@ class KafkaAdminClient(object):
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
request = OffsetFetchRequest[version](group_id, topics_partitions)
- future = self._send_request_to_node(group_coordinator_id, request)
- self._wait_for_futures([future])
- response = future.value
+ else:
+ raise NotImplementedError(
+ "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(group_coordinator_id, request)
+
+ def _list_consumer_group_offsets_process_response(self, response):
+ """Process an OffsetFetchResponse.
- if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
+ :param response: an OffsetFetchResponse.
+ :return: A dictionary composed of TopicPartition keys and
+ OffsetAndMetada values.
+ """
+ if response.API_VERSION <= 3:
+
+ # OffsetFetchResponse_v1 lacks a top-level error_code
+ if response.API_VERSION > 1:
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
# optionally we could retry if error_type.retriable
raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
+ "OffsetFetchResponse failed with response '{}'."
+ .format(response))
+
# transform response into a dictionary with TopicPartition keys and
# OffsetAndMetada values--this is what the Java AdminClient returns
+ offsets = {}
for topic, partitions in response.topics:
for partition, offset, metadata, error_code in partitions:
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
raise error_type(
- "Unable to fetch offsets for group_id {}, topic {}, partition {}"
- .format(group_id, topic, partition))
- group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
+ "Unable to fetch consumer group offsets for topic {}, partition {}"
+ .format(topic, partition))
+ offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
else:
raise NotImplementedError(
- "Support for OffsetFetch v{} has not yet been added to KafkaAdminClient."
- .format(version))
- return group_offsets_listing
+ "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return offsets
+
+ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
+ partitions=None):
+ """Fetch Consumer Offsets for a single consumer group.
+
+ Note:
+ This does not verify that the group_id or partitions actually exist
+ in the cluster.
+
+ As soon as any error is encountered, it is immediately raised.
+
+ :param group_id: The consumer group id name for which to fetch offsets.
+ :param group_coordinator_id: The node_id of the group's coordinator
+ broker. If set to None, will query the cluster to find the group
+ coordinator. Explicitly specifying this can be useful to prevent
+ that extra network round trip if you already know the group
+ coordinator. Default: None.
+ :param partitions: A list of TopicPartitions for which to fetch
+ offsets. On brokers >= 0.10.2, this can be set to None to fetch all
+ known offsets for the consumer group. Default: None.
+ :return dictionary: A dictionary with TopicPartition keys and
+ OffsetAndMetada values. Partitions that are not specified and for
+ which the group_id does not have a recorded offset are omitted. An
+ offset value of `-1` indicates the group_id has no offset for that
+ TopicPartition. A `-1` can only happen for partitions that are
+ explicitly specified.
+ """
+ if group_coordinator_id is None:
+ group_coordinator_id = self._find_group_coordinator_id(group_id)
+ future = self._list_consumer_group_offsets_send_request(
+ group_id, group_coordinator_id, partitions)
+ self._wait_for_futures([future])
+ response = future.value
+ return self._list_consumer_group_offsets_process_response(response)
# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.