diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-11-17 02:24:26 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-11-18 08:39:40 -0800 |
commit | 98ea78875f04fa8a655780f77a701f409c510696 (patch) | |
tree | 894a91eab760b2d91bf174d5d15133a91faa3e1e /kafka/admin/kafka.py | |
parent | d67157cb9a032a6f0493cea128bbcd0528f3e640 (diff) | |
download | kafka-python-add-group-coordinator-lookup.tar.gz |
Add group coordinator lookup (branch: add-group-coordinator-lookup)
We need a way to send a request to the group coordinator.
I spent a day and a half trying to implement a `_send_request_to_group_coordinator()`
that included:
1. caching the value of the group coordinator so that it wouldn't have
to be repeatedly looked up on every call. This is particularly important
because the `list_consumer_groups()`, `list_consumer_group_offsets()`,
and `describe_consumer_groups()` will frequently be used by monitoring
scripts. I know across the production clusters that I support, using a
cached value will save ~1M calls per day.
2. clean and consistent error handling. This is difficult because the
responses are inconsistent about error codes. Some have a top-level
error code, some bury it within the description of the actual item.
3. Avoiding tight coupling between this method and the request/response
classes... the custom parsing logic for errors etc, given that it's
non-standard, should live in the callers, not here.
So finally I gave up and just went with this simpler solution and made
it so the callers can optionally bypass this if they somehow already
know the group coordinator.
Diffstat (limited to 'kafka/admin/kafka.py')
-rw-r--r-- | kafka/admin/kafka.py | 40 |
1 file changed, 40 insertions, 0 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index 5ce8630..3dc2e44 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -4,6 +4,7 @@ import copy import logging import socket from kafka.client_async import KafkaClient, selectors +import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError, NodeNotReadyError, NotControllerError) @@ -11,6 +12,7 @@ from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest) +from kafka.protocol.commit import GroupCoordinatorRequest from kafka.protocol.metadata import MetadataRequest from kafka.version import __version__ @@ -243,6 +245,44 @@ class KafkaAdmin(object): "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0." .format(version)) + def _find_group_coordinator_id(self, group_id): + """Find the broker node_id of the coordinator of the given group. + + Sends a FindCoordinatorRequest message to the cluster. Will block until + the FindCoordinatorResponse is received. Any errors are immediately + raised. + + :param group_id: The consumer group ID. This is typically the group + name as a string. + :return: The node_id of the broker that is the coordinator. + """ + # Note: Java may change how this is implemented in KAFKA-6791. + # + # TODO add support for dynamically picking version of + # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. + # When I experimented with this, GroupCoordinatorResponse_v1 didn't + # match GroupCoordinatorResponse_v0 and I couldn't figure out why. 
+ gc_request = GroupCoordinatorRequest[0](group_id) + gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request) + # use the extra error checking in add_group_coordinator() rather than + # immediately returning the group coordinator. + success = self._client.cluster.add_group_coordinator(group_id, gc_response) + if not success: + error_type = Errors.for_code(gc_response.error_code) + assert error_type is not Errors.NoError + # Note: When error_type.retriable, Java will retry... see + # KafkaAdminClient's handleFindCoordinatorError method + raise error_type( + "Could not identify group coordinator for group_id '{}' from response '{}'." + .format(group_id, gc_response)) + group_coordinator = self._client.cluster.coordinator_for_group(group_id) + # will be None if the coordinator was never populated, which should never happen here + assert group_coordinator is not None + # will be -1 if add_group_coordinator() failed... but by this point the + # error should have been raised. + assert group_coordinator != -1 + return group_coordinator + def _send_request_to_node(self, node, request): """Send a kafka protocol message to a specific broker. Will block until the message result is received. |