summaryrefslogtreecommitdiff
path: root/kafka/admin/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r--kafka/admin/client.py55
1 files changed, 41 insertions, 14 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index d0fa845..e820587 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1,6 +1,6 @@
from __future__ import absolute_import
-from collections import defaultdict
+from collections import defaultdict, namedtuple
import copy
import logging
import socket
@@ -8,7 +8,10 @@ import socket
from . import ConfigResourceType
from kafka.vendor import six
+from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
+ ACLResourcePatternType
from kafka.client_async import KafkaClient, selectors
+from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
@@ -19,9 +22,8 @@ from kafka.protocol.admin import (
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
-from kafka.structs import TopicPartition, OffsetAndMetadata
-from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
- ACLResourcePatternType
+from kafka.protocol.types import Array
+from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
from kafka.version import __version__
@@ -1000,22 +1002,47 @@ class KafkaAdminClient(object):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 3:
assert len(response.groups) == 1
- # TODO need to implement converting the response tuple into
- # a more accessible interface like a namedtuple and then stop
- # hardcoding tuple indices here. Several Java examples,
- # including KafkaAdminClient.java
- group_description = response.groups[0]
- error_code = group_description[0]
+ for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
+ if isinstance(response_field, Array):
+ described_groups = response.__dict__[response_name]
+ described_groups_field_schema = response_field.array_of
+ described_group = response.__dict__[response_name][0]
+ described_group_information_list = []
+ protocol_type_is_consumer = False
+ for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
+ if group_information_name == 'protocol_type':
+ protocol_type = described_group_information
+ protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
+ if isinstance(group_information_field, Array):
+ member_information_list = []
+ member_schema = group_information_field.array_of
+ for members in described_group_information:
+ member_information = []
+ for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
+ if protocol_type_is_consumer:
+ if member_name == 'member_metadata' and member:
+ member_information.append(ConsumerProtocolMemberMetadata.decode(member))
+ elif member_name == 'member_assignment' and member:
+ member_information.append(ConsumerProtocolMemberAssignment.decode(member))
+ else:
+ member_information.append(member)
+ member_info_tuple = MemberInformation._make(member_information)
+ member_information_list.append(member_info_tuple)
+ described_group_information_list.append(member_information_list)
+ else:
+ described_group_information_list.append(described_group_information)
+ # Version 3 of the DescribeGroups API introduced the "authorized_operations" field. This will cause the namedtuple to fail
+ # Therefore, appending a placeholder of None in it.
+ if response.API_VERSION <=2:
+ described_group_information_list.append(None)
+ group_description = GroupInformation._make(described_group_information_list)
+ error_code = group_description.error_code
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"DescribeGroupsResponse failed with response '{}'."
.format(response))
- # TODO Java checks the group protocol type, and if consumer
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
- # the members' partition assignments... that hasn't yet been
- # implemented here so just return the raw struct results
else:
raise NotImplementedError(
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."