summaryrefslogtreecommitdiff
path: root/kafka/admin/kafka.py
diff options
context:
space:
mode:
author: Jeff Widman <jeff@jeffwidman.com> 2018-11-17 02:24:26 -0800
committer: Jeff Widman <jeff@jeffwidman.com> 2018-11-18 08:39:40 -0800
commit: 98ea78875f04fa8a655780f77a701f409c510696 (patch)
tree: 894a91eab760b2d91bf174d5d15133a91faa3e1e /kafka/admin/kafka.py
parent: d67157cb9a032a6f0493cea128bbcd0528f3e640 (diff)
download: kafka-python-add-group-coordinator-lookup.tar.gz
Add group coordinator lookup (branch: add-group-coordinator-lookup)
We need a way to send a request to the group coordinator. I spent a day and a half trying to implement a `_send_request_to_group_coordinator()` that included: 1. caching the value of the group coordinator so that it wouldn't have to be repeatedly looked up on every call. This is particularly important because the `list_consumer_groups()`, `list_consumer_group_offsets()`, and `describe_consumer_groups()` will frequently be used by monitoring scripts. I know across the production clusters that I support, using a cached value will save ~1M calls per day. 2. clean and consistent error handling. This is difficult because the responses are inconsistent about error codes. Some have a top-level error code, some bury it within the description of the actual item. 3. Avoiding tight coupling between this method and the request/response classes... the custom parsing logic for errors etc, given that it's non-standard, should live in the callers, not here. So finally I gave up and just went with this simpler solution and made it so the callers can optionally bypass this if they somehow already know the group coordinator.
Diffstat (limited to 'kafka/admin/kafka.py')
-rw-r--r-- kafka/admin/kafka.py | 40
1 file changed, 40 insertions(+), 0 deletions(-)
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 5ce8630..3dc2e44 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -4,6 +4,7 @@ import copy
import logging
import socket
from kafka.client_async import KafkaClient, selectors
+import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
@@ -11,6 +12,7 @@ from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest)
+from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.version import __version__
@@ -243,6 +245,44 @@ class KafkaAdmin(object):
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(version))
+ def _find_group_coordinator_id(self, group_id):
+ """Find the broker node_id of the coordinator of the given group.
+
+ Sends a FindCoordinatorRequest message to the cluster. Will block until
+ the FindCoordinatorResponse is received. Any errors are immediately
+ raised.
+
+ :param group_id: The consumer group ID. This is typically the group
+ name as a string.
+ :return: The node_id of the broker that is the coordinator.
+ """
+ # Note: Java may change how this is implemented in KAFKA-6791.
+ #
+ # TODO add support for dynamically picking version of
+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
+ # When I experimented with this, GroupCoordinatorResponse_v1 didn't
+ # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
+ gc_request = GroupCoordinatorRequest[0](group_id)
+ gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+ # use the extra error checking in add_group_coordinator() rather than
+ # immediately returning the group coordinator.
+ success = self._client.cluster.add_group_coordinator(group_id, gc_response)
+ if not success:
+ error_type = Errors.for_code(gc_response.error_code)
+ assert error_type is not Errors.NoError
+ # Note: When error_type.retriable, Java will retry... see
+ # KafkaAdminClient's handleFindCoordinatorError method
+ raise error_type(
+ "Could not identify group coordinator for group_id '{}' from response '{}'."
+ .format(group_id, gc_response))
+ group_coordinator = self._client.cluster.coordinator_for_group(group_id)
+ # will be None if the coordinator was never populated, which should never happen here
+ assert group_coordinator is not None
+ # will be -1 if add_group_coordinator() failed... but by this point the
+ # error should have been raised.
+ assert group_coordinator != -1
+ return group_coordinator
+
def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.