diff options
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r-- | kafka/admin/client.py | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 8afe95b..accbf14 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -435,7 +435,7 @@ class KafkaAdminClient(object): create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout=timeout_ms ) - elif version <= 2: + elif version <= 3: request = CreateTopicsRequest[version]( create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout=timeout_ms, @@ -459,7 +459,7 @@ class KafkaAdminClient(object): """ version = self._matching_api_version(DeleteTopicsRequest) timeout_ms = self._validate_timeout(timeout_ms) - if version <= 1: + if version <= 3: request = DeleteTopicsRequest[version]( topics=topics, timeout=timeout_ms @@ -803,7 +803,7 @@ class KafkaAdminClient(object): DescribeConfigsRequest[version](resources=topic_resources) )) - elif version == 1: + elif version <= 2: if len(broker_resources) > 0: for broker_resource in broker_resources: try: @@ -853,7 +853,7 @@ class KafkaAdminClient(object): :return: Appropriate version of AlterConfigsResponse class. """ version = self._matching_api_version(AlterConfigsRequest) - if version == 0: + if version <= 1: request = AlterConfigsRequest[version]( resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] ) @@ -901,7 +901,7 @@ class KafkaAdminClient(object): """ version = self._matching_api_version(CreatePartitionsRequest) timeout_ms = self._validate_timeout(timeout_ms) - if version == 0: + if version <= 1: request = CreatePartitionsRequest[version]( topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], timeout=timeout_ms, @@ -928,7 +928,7 @@ class KafkaAdminClient(object): # describe delegation_token protocol not yet implemented # Note: send the request to the least_loaded_node() - def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id): + def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False): """Send a DescribeGroupsRequest to the group's coordinator. :param group_id: The group name as a string @@ -937,13 +937,24 @@ class KafkaAdminClient(object): :return: A message future. """ version = self._matching_api_version(DescribeGroupsRequest) - if version <= 1: + if version <= 2: + if include_authorized_operations: + raise IncompatibleBrokerVersion( + "include_authorized_operations requests " + "DescribeGroupsRequest >= v3, which is not " + "supported by Kafka {}".format(version) + ) # Note: KAFKA-6788 A potential optimization is to group the # request per coordinator and send one request with a list of # all consumer groups. Java still hasn't implemented this # because the error checking is hard to get right when some # groups error and others don't. request = DescribeGroupsRequest[version](groups=(group_id,)) + elif version <= 3: + request = DescribeGroupsRequest[version]( + groups=(group_id,), + include_authorized_operations=include_authorized_operations + ) else: raise NotImplementedError( "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient." @@ -952,7 +963,7 @@ class KafkaAdminClient(object): def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" - if response.API_VERSION <= 1: + if response.API_VERSION <= 3: assert len(response.groups) == 1 # TODO need to implement converting the response tuple into # a more accessible interface like a namedtuple and then stop @@ -976,7 +987,7 @@ class KafkaAdminClient(object): .format(response.API_VERSION)) return group_description - def describe_consumer_groups(self, group_ids, group_coordinator_id=None): + def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): """Describe a set of consumer groups. Any errors are immediately raised. @@ -989,6 +1000,9 @@ class KafkaAdminClient(object): useful for avoiding extra network round trips if you already know the group coordinator. This is only useful when all the group_ids have the same coordinator, otherwise it will error. Default: None. + :param include_authorized_operatoins: Whether or not to include + information about the operations a group is allowed to perform. + Only supported on API version >= v3. Default: False. :return: A list of group descriptions. For now the group descriptions are the raw results from the DescribeGroupsResponse. Long-term, we plan to change this to return namedtuples as well as decoding the @@ -1001,7 +1015,10 @@ class KafkaAdminClient(object): this_groups_coordinator_id = group_coordinator_id else: this_groups_coordinator_id = self._find_coordinator_id(group_id) - f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id) + f = self._describe_consumer_groups_send_request( + group_id, + this_groups_coordinator_id, + include_authorized_operations) futures.append(f) self._wait_for_futures(futures) |