diff options
-rw-r--r-- | kafka/admin/client.py | 249 |
1 files changed, 155 insertions, 94 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 155ad21..5082f4d 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -349,7 +349,7 @@ class KafkaAdminClient(object): # one of these attributes and that they always unpack into # (topic, error_code) tuples. topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors') - else response.topic_error_codes) + else response.topic_error_codes) # Also small py2/py3 compatibility -- py3 can ignore extra values # during unpack via: for x, y, *rest in list_of_values. py2 cannot. # So for now we have to map across the list and explicitly drop any @@ -501,8 +501,8 @@ class KafkaAdminClient(object): future = self._send_request_to_node(self._client.least_loaded_node(), request) self._wait_for_futures([future]) - - return future.value + response = future.value + return response @staticmethod def _convert_alter_config_resource_request(config_resource): @@ -544,8 +544,8 @@ class KafkaAdminClient(object): future = self._send_request_to_node(self._client.least_loaded_node(), request) self._wait_for_futures([future]) - - return future.value + response = future.value + return response # alter replica logs dir protocol not yet implemented # Note: have to lookup the broker with the replica assignment and send the request to that broker @@ -602,6 +602,54 @@ class KafkaAdminClient(object): # describe delegation_token protocol not yet implemented # Note: send the request to the least_loaded_node() + def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id): + """Send a DescribeGroupsRequest to the group's coordinator. + + :param group_id: The group name as a string + :param group_coordinator_id: The node_id of the groups' coordinator + broker. + :return: A message future. + """ + version = self._matching_api_version(DescribeGroupsRequest) + if version <= 1: + # Note: KAFKA-6788 A potential optimization is to group the + # request per coordinator and send one request with a list of + # all consumer groups. Java still hasn't implemented this + # because the error checking is hard to get right when some + # groups error and others don't. + request = DescribeGroupsRequest[version](groups=(group_id,)) + else: + raise NotImplementedError( + "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(group_coordinator_id, request) + + def _describe_consumer_groups_process_response(self, response): + """Process a DescribeGroupsResponse into a group description.""" + if response.API_VERSION <= 1: + assert len(response.groups) == 1 + # TODO need to implement converting the response tuple into + # a more accessible interface like a namedtuple and then stop + # hardcoding tuple indices here. Several Java examples, + # including KafkaAdminClient.java + group_description = response.groups[0] + error_code = group_description[0] + error_type = Errors.for_code(error_code) + # Java has the note: KAFKA-6789, we can retry based on the error code + if error_type is not Errors.NoError: + raise error_type( + "DescribeGroupsResponse failed with response '{}'." + .format(response)) + # TODO Java checks the group protocol type, and if consumer + # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes + # the members' partition assignments... that hasn't yet been + # implemented here so just return the raw struct results + else: + raise NotImplementedError( + "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + return group_description + def describe_consumer_groups(self, group_ids, group_coordinator_id=None): """Describe a set of consumer groups. @@ -622,51 +670,52 @@ class KafkaAdminClient(object): """ group_descriptions = [] futures = [] - version = self._matching_api_version(DescribeGroupsRequest) for group_id in group_ids: if group_coordinator_id is not None: this_groups_coordinator_id = group_coordinator_id else: this_groups_coordinator_id = self._find_group_coordinator_id(group_id) - - if version <= 1: - # Note: KAFKA-6788 A potential optimization is to group the - # request per coordinator and send one request with a list of - # all consumer groups. Java still hasn't implemented this - # because the error checking is hard to get right when some - # groups error and others don't. - request = DescribeGroupsRequest[version](groups=(group_id,)) - futures.append(self._send_request_to_node(this_groups_coordinator_id, request)) - else: - raise NotImplementedError( - "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient." - .format(version)) + f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id) + futures.append(f) self._wait_for_futures(futures) for future in futures: response = future.value - assert len(response.groups) == 1 - # TODO need to implement converting the response tuple into - # a more accessible interface like a namedtuple and then stop - # hardcoding tuple indices here. Several Java examples, - # including KafkaAdminClient.java - group_description = response.groups[0] - error_code = group_description[0] - error_type = Errors.for_code(error_code) - # Java has the note: KAFKA-6789, we can retry based on the error code - if error_type is not Errors.NoError: - raise error_type( - "Request '{}' failed with response '{}'." - .format(request, response)) - # TODO Java checks the group protocol type, and if consumer - # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes - # the members' partition assignments... that hasn't yet been - # implemented here so just return the raw struct results + group_description = self._describe_consumer_groups_process_response(response) group_descriptions.append(group_description) return group_descriptions + def _list_consumer_groups_send_request(self, broker_id): + """Send a ListGroupsRequest to a broker. + + :param broker_id: The broker's node_id. + :return: A message future + """ + version = self._matching_api_version(ListGroupsRequest) + if version <= 2: + request = ListGroupsRequest[version]() + else: + raise NotImplementedError( + "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(broker_id, request) + + def _list_consumer_groups_process_response(self, response): + """Process a ListGroupsResponse into a list of groups.""" + if response.API_VERSION <= 2: + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "ListGroupsRequest failed with response '{}'." + .format(response)) + else: + raise NotImplementedError( + "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + return response.groups + def list_consumer_groups(self, broker_ids=None): """List all consumer groups known to the cluster. @@ -697,60 +746,24 @@ class KafkaAdminClient(object): # consumer groups move to new brokers that haven't yet been queried, # then the same group could be returned by multiple brokers. consumer_groups = set() - futures = [] if broker_ids is None: broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()] - version = self._matching_api_version(ListGroupsRequest) - if version <= 2: - request = ListGroupsRequest[version]() - for broker_id in broker_ids: - futures.append(self._send_request_to_node(broker_id, request)) - - self._wait_for_futures(futures) - - for future in futures: - response = future.value - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - raise error_type( - "Request '{}' failed with response '{}'." - .format(request, response)) - consumer_groups.update(response.groups) - else: - raise NotImplementedError( - "Support for ListGroups v{} has not yet been added to KafkaAdminClient." - .format(version)) + futures = [self._list_consumer_groups_send_request(b) for b in broker_ids] + self._wait_for_futures(futures) + for f in futures: + response = f.value + consumer_groups.update(self._list_consumer_groups_process_response(response)) return list(consumer_groups) - def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, - partitions=None): - """Fetch Consumer Group Offsets. - - Note: - This does not verify that the group_id or partitions actually exist - in the cluster. - - As soon as any error is encountered, it is immediately raised. + def _list_consumer_group_offsets_send_request(self, group_id, + group_coordinator_id, partitions=None): + """Send an OffsetFetchRequest to a broker. :param group_id: The consumer group id name for which to fetch offsets. :param group_coordinator_id: The node_id of the group's coordinator - broker. If set to None, will query the cluster to find the group - coordinator. Explicitly specifying this can be useful to prevent - that extra network round trip if you already know the group - coordinator. Default: None. - :param partitions: A list of TopicPartitions for which to fetch - offsets. On brokers >= 0.10.2, this can be set to None to fetch all - known offsets for the consumer group. Default: None. - :return dictionary: A dictionary with TopicPartition keys and - OffsetAndMetada values. Partitions that are not specified and for - which the group_id does not have a recorded offset are omitted. An - offset value of `-1` indicates the group_id has no offset for that - TopicPartition. A `-1` can only happen for partitions that are - explicitly specified. + broker. + :return: A message future """ - group_offsets_listing = {} - if group_coordinator_id is None: - group_coordinator_id = self._find_group_coordinator_id(group_id) version = self._matching_api_version(OffsetFetchRequest) if version <= 3: if partitions is None: @@ -768,32 +781,80 @@ class KafkaAdminClient(object): topics_partitions_dict[topic].add(partition) topics_partitions = list(six.iteritems(topics_partitions_dict)) request = OffsetFetchRequest[version](group_id, topics_partitions) - future = self._send_request_to_node(group_coordinator_id, request) - self._wait_for_futures([future]) - response = future.value + else: + raise NotImplementedError( + "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(group_coordinator_id, request) + + def _list_consumer_group_offsets_process_response(self, response): + """Process an OffsetFetchResponse. - if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code + :param response: an OffsetFetchResponse. + :return: A dictionary composed of TopicPartition keys and + OffsetAndMetada values. + """ + if response.API_VERSION <= 3: + + # OffsetFetchResponse_v1 lacks a top-level error_code + if response.API_VERSION > 1: error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: # optionally we could retry if error_type.retriable raise error_type( - "Request '{}' failed with response '{}'." - .format(request, response)) + "OffsetFetchResponse failed with response '{}'." + .format(response)) + # transform response into a dictionary with TopicPartition keys and # OffsetAndMetada values--this is what the Java AdminClient returns + offsets = {} for topic, partitions in response.topics: for partition, offset, metadata, error_code in partitions: error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: raise error_type( - "Unable to fetch offsets for group_id {}, topic {}, partition {}" - .format(group_id, topic, partition)) - group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata) + "Unable to fetch consumer group offsets for topic {}, partition {}" + .format(topic, partition)) + offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata) else: raise NotImplementedError( - "Support for OffsetFetch v{} has not yet been added to KafkaAdminClient." - .format(version)) - return group_offsets_listing + "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + return offsets + + def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, + partitions=None): + """Fetch Consumer Offsets for a single consumer group. + + Note: + This does not verify that the group_id or partitions actually exist + in the cluster. + + As soon as any error is encountered, it is immediately raised. + + :param group_id: The consumer group id name for which to fetch offsets. + :param group_coordinator_id: The node_id of the group's coordinator + broker. If set to None, will query the cluster to find the group + coordinator. Explicitly specifying this can be useful to prevent + that extra network round trip if you already know the group + coordinator. Default: None. + :param partitions: A list of TopicPartitions for which to fetch + offsets. On brokers >= 0.10.2, this can be set to None to fetch all + known offsets for the consumer group. Default: None. + :return dictionary: A dictionary with TopicPartition keys and + OffsetAndMetada values. Partitions that are not specified and for + which the group_id does not have a recorded offset are omitted. An + offset value of `-1` indicates the group_id has no offset for that + TopicPartition. A `-1` can only happen for partitions that are + explicitly specified. + """ + if group_coordinator_id is None: + group_coordinator_id = self._find_group_coordinator_id(group_id) + future = self._list_consumer_group_offsets_send_request( + group_id, group_coordinator_id, partitions) + self._wait_for_futures([future]) + response = future.value + return self._list_consumer_group_offsets_process_response(response) # delete groups protocol not yet implemented # Note: send the request to the group's coordinator. |