summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/client.py79
1 files changed, 47 insertions, 32 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 4fd8a1b..433ea98 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -271,7 +271,48 @@ class KafkaAdminClient(object):
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))
- def _find_group_coordinator_id(self, group_id):
+ def _find_coordinator_id_send_request(self, group_id):
+ """Send a FindCoordinatorRequest to a broker.
+
+ :param group_id: The consumer group ID. This is typically the group
+ name as a string.
+ :return: A message future
+ """
+ # TODO add support for dynamically picking version of
+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
+ # When I experimented with this, the coordinator value returned in
+ # GroupCoordinatorResponse_v1 didn't match the value returned by
+ # GroupCoordinatorResponse_v0 and I couldn't figure out why.
+ version = 0 # version = self._matching_api_version(GroupCoordinatorRequest)
+ if version <= 0:
+ request = GroupCoordinatorRequest[version](group_id)
+ else:
+ raise NotImplementedError(
+ "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
+
+ def _find_coordinator_id_process_response(self, response):
+ """Process a FindCoordinatorResponse.
+
+ :param response: a FindCoordinatorResponse.
+ :return: The node_id of the broker that is the coordinator.
+ """
+ if response.API_VERSION <= 0:
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ # Note: When error_type.retriable, Java will retry... see
+ # KafkaAdminClient's handleFindCoordinatorError method
+ raise error_type(
+ "FindCoordinatorRequest failed with response '{}'."
+ .format(response))
+ else:
+ raise NotImplementedError(
+ "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return response.coordinator_id
+
+ def _find_coordinator_id(self, group_id):
"""Find the broker node_id of the coordinator of the given group.
Sends a FindCoordinatorRequest message to the cluster. Will block until
@@ -283,35 +324,10 @@ class KafkaAdminClient(object):
:return: The node_id of the broker that is the coordinator.
"""
# Note: Java may change how this is implemented in KAFKA-6791.
- #
- # TODO add support for dynamically picking version of
- # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
- # When I experimented with this, GroupCoordinatorResponse_v1 didn't
- # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
- gc_request = GroupCoordinatorRequest[0](group_id)
- future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
-
+ future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])
-
- gc_response = future.value
- # use the extra error checking in add_group_coordinator() rather than
- # immediately returning the group coordinator.
- success = self._client.cluster.add_group_coordinator(group_id, gc_response)
- if not success:
- error_type = Errors.for_code(gc_response.error_code)
- assert error_type is not Errors.NoError
- # Note: When error_type.retriable, Java will retry... see
- # KafkaAdminClient's handleFindCoordinatorError method
- raise error_type(
- "Could not identify group coordinator for group_id '{}' from response '{}'."
- .format(group_id, gc_response))
- group_coordinator = self._client.cluster.coordinator_for_group(group_id)
- # will be None if the coordinator was never populated, which should never happen here
- assert group_coordinator is not None
- # will be -1 if add_group_coordinator() failed... but by this point the
- # error should have been raised.
- assert group_coordinator != -1
- return group_coordinator
+ response = future.value
+ return self._find_coordinator_id_process_response(response)
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
@@ -329,7 +345,6 @@ class KafkaAdminClient(object):
self._client.poll()
return self._client.send(node_id, request)
-
def _send_request_to_controller(self, request):
"""Send a Kafka protocol message to the cluster controller.
@@ -678,7 +693,7 @@ class KafkaAdminClient(object):
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
- this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
+ this_groups_coordinator_id = self._find_coordinator_id(group_id)
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
futures.append(f)
@@ -853,7 +868,7 @@ class KafkaAdminClient(object):
explicitly specified.
"""
if group_coordinator_id is None:
- group_coordinator_id = self._find_group_coordinator_id(group_id)
+ group_coordinator_id = self._find_coordinator_id(group_id)
future = self._list_consumer_group_offsets_send_request(
group_id, group_coordinator_id, partitions)
self._wait_for_futures([future])