author    | Lou-Cipher <dmheitman@gmail.com>  | 2019-05-21 14:33:30 -0600
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-05-21 13:33:30 -0700
commit    | f145e37c1992da71d5c65c0d86ae971cda62e058 (patch)
tree      | f0bb22315da263a9797f09d77e5bb674005b4073
parent    | 1b6c9c76bcf753c4dd28dc2d865d6d43246cf4af (diff)
download  | kafka-python-f145e37c1992da71d5c65c0d86ae971cda62e058.tar.gz
Use futures to parallelize calls to _send_request_to_node() (#1807)
Use `futures` to parallelize calls to `_send_request_to_node()`
This allows queries that need to go to multiple brokers to be run in parallel.
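
Before this change, `_send_request_to_node()` blocked on each request, so admin operations touching several brokers or groups ran strictly serially. After it, the method hands back the client's future and a new `_wait_for_futures()` helper polls a whole batch at once. A minimal, self-contained sketch of that fire-then-wait pattern (the `FakeClient`/`FakeFuture` stand-ins below are illustrations only, not kafka-python's real `KafkaClient` and `kafka.future.Future`):

```python
# Illustration only: FakeClient and FakeFuture stand in for kafka-python's
# KafkaClient and kafka.future.Future so the pattern can run without a broker.

class FakeFuture(object):
    def __init__(self, result):
        self._result = result
        self._done = False
        self.value = None
        self.exception = None

    def succeeded(self):
        return self._done and self.exception is None

    def failed(self):
        return self._done and self.exception is not None


class FakeClient(object):
    def send(self, node_id, request):
        # New behavior of _send_request_to_node(): return the future
        # immediately instead of polling it to completion.
        return FakeFuture('%s -> node %d' % (request, node_id))

    def poll(self, future=None):
        # Pretend the I/O loop delivered the response for this future.
        if future is not None and not future._done:
            future.value = future._result
            future._done = True


client = FakeClient()

# Fan out: issue one request per broker before waiting on any of them...
futures = [client.send(node_id, 'ListGroupsRequest') for node_id in (0, 1, 2)]

# ...then wait for the whole batch, the way _wait_for_futures() does.
while not all(f.succeeded() for f in futures):
    for f in futures:
        client.poll(future=f)
        if f.failed():
            raise f.exception

print([f.value for f in futures])
```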
-rw-r--r-- | kafka/admin/client.py | 109
1 file changed, 75 insertions, 34 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index e4219e9..155ad21 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -249,7 +249,11 @@ class KafkaAdminClient(object):
         version = self._matching_api_version(MetadataRequest)
         if 1 <= version <= 6:
             request = MetadataRequest[version]()
-            response = self._send_request_to_node(self._client.least_loaded_node(), request)
+            future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+            self._wait_for_futures([future])
+
+            response = future.value
             controller_id = response.controller_id
             # verify the controller is new enough to support our requests
             controller_version = self._client.check_version(controller_id)
@@ -281,7 +285,11 @@ class KafkaAdminClient(object):
         # When I experimented with this, GroupCoordinatorResponse_v1 didn't
         # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
         gc_request = GroupCoordinatorRequest[0](group_id)
-        gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+        future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+
+        self._wait_for_futures([future])
+
+        gc_response = future.value
         # use the extra error checking in add_group_coordinator() rather than
         # immediately returning the group coordinator.
         success = self._client.cluster.add_group_coordinator(group_id, gc_response)
@@ -304,23 +312,19 @@ class KafkaAdminClient(object):
     def _send_request_to_node(self, node_id, request):
         """Send a Kafka protocol message to a specific broker.
 
-        Will block until the message result is received.
+        Returns a future that may be polled for status and results.
 
         :param node_id: The broker id to which to send the message.
         :param request: The message to send.
-        :return: The Kafka protocol response for the message.
+        :return: A future object that may be polled for status and results.
         :exception: The exception if the message could not be sent.
         """
         while not self._client.ready(node_id):
             # poll until the connection to broker is ready, otherwise send()
             # will fail with NodeNotReadyError
             self._client.poll()
-        future = self._client.send(node_id, request)
-        self._client.poll(future=future)
-        if future.succeeded():
-            return future.value
-        else:
-            raise future.exception # pylint: disable-msg=raising-bad-type
+        return self._client.send(node_id, request)
+
 
     def _send_request_to_controller(self, request):
         """Send a Kafka protocol message to the cluster controller.
@@ -333,7 +337,11 @@ class KafkaAdminClient(object):
         tries = 2  # in case our cached self._controller_id is outdated
         while tries:
             tries -= 1
-            response = self._send_request_to_node(self._controller_id, request)
+            future = self._send_request_to_node(self._controller_id, request)
+
+            self._wait_for_futures([future])
+
+            response = future.value
             # In Java, the error fieldname is inconsistent:
             # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
             # - DeleteTopicsResponse uses topic_error_codes
@@ -490,7 +498,11 @@ class KafkaAdminClient(object):
             raise NotImplementedError(
                 "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
                 .format(version))
-        return self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+        self._wait_for_futures([future])
+
+        return future.value
 
     @staticmethod
     def _convert_alter_config_resource_request(config_resource):
@@ -529,7 +541,11 @@ class KafkaAdminClient(object):
         # // a single request that may be sent to any broker.
         #
         # So this is currently broken as it always sends to the least_loaded_node()
-        return self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+        self._wait_for_futures([future])
+
+        return future.value
 
     # alter replica logs dir protocol not yet implemented
     # Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -605,12 +621,14 @@ class KafkaAdminClient(object):
             partition assignments.
         """
         group_descriptions = []
+        futures = []
         version = self._matching_api_version(DescribeGroupsRequest)
         for group_id in group_ids:
             if group_coordinator_id is not None:
                 this_groups_coordinator_id = group_coordinator_id
             else:
                 this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
+
             if version <= 1:
                 # Note: KAFKA-6788 A potential optimization is to group the
                 # request per coordinator and send one request with a list of
@@ -618,29 +636,35 @@
                 # because the error checking is hard to get right when some
                 # groups error and others don't.
                 request = DescribeGroupsRequest[version](groups=(group_id,))
-                response = self._send_request_to_node(this_groups_coordinator_id, request)
-                assert len(response.groups) == 1
-                # TODO need to implement converting the response tuple into
-                # a more accessible interface like a namedtuple and then stop
-                # hardcoding tuple indices here. Several Java examples,
-                # including KafkaAdminClient.java
-                group_description = response.groups[0]
-                error_code = group_description[0]
-                error_type = Errors.for_code(error_code)
-                # Java has the note: KAFKA-6789, we can retry based on the error code
-                if error_type is not Errors.NoError:
-                    raise error_type(
-                        "Request '{}' failed with response '{}'."
-                        .format(request, response))
-                # TODO Java checks the group protocol type, and if consumer
-                # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
-                # the members' partition assignments... that hasn't yet been
-                # implemented here so just return the raw struct results
-                group_descriptions.append(group_description)
+                futures.append(self._send_request_to_node(this_groups_coordinator_id, request))
             else:
                 raise NotImplementedError(
                     "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
                     .format(version))
+
+        self._wait_for_futures(futures)
+
+        for future in futures:
+            response = future.value
+            assert len(response.groups) == 1
+            # TODO need to implement converting the response tuple into
+            # a more accessible interface like a namedtuple and then stop
+            # hardcoding tuple indices here. Several Java examples,
+            # including KafkaAdminClient.java
+            group_description = response.groups[0]
+            error_code = group_description[0]
+            error_type = Errors.for_code(error_code)
+            # Java has the note: KAFKA-6789, we can retry based on the error code
+            if error_type is not Errors.NoError:
+                raise error_type(
+                    "Request '{}' failed with response '{}'."
+                    .format(request, response))
+            # TODO Java checks the group protocol type, and if consumer
+            # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
+            # the members' partition assignments... that hasn't yet been
+            # implemented here so just return the raw struct results
+            group_descriptions.append(group_description)
+
         return group_descriptions
 
     def list_consumer_groups(self, broker_ids=None):
@@ -673,13 +697,19 @@ class KafkaAdminClient(object):
         # consumer groups move to new brokers that haven't yet been queried,
         # then the same group could be returned by multiple brokers.
        consumer_groups = set()
+        futures = []
         if broker_ids is None:
             broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
         version = self._matching_api_version(ListGroupsRequest)
         if version <= 2:
             request = ListGroupsRequest[version]()
             for broker_id in broker_ids:
-                response = self._send_request_to_node(broker_id, request)
+                futures.append(self._send_request_to_node(broker_id, request))
+
+            self._wait_for_futures(futures)
+
+            for future in futures:
+                response = future.value
                 error_type = Errors.for_code(response.error_code)
                 if error_type is not Errors.NoError:
                     raise error_type(
@@ -738,7 +768,10 @@ class KafkaAdminClient(object):
                     topics_partitions_dict[topic].add(partition)
                 topics_partitions = list(six.iteritems(topics_partitions_dict))
             request = OffsetFetchRequest[version](group_id, topics_partitions)
-            response = self._send_request_to_node(group_coordinator_id, request)
+            future = self._send_request_to_node(group_coordinator_id, request)
+            self._wait_for_futures([future])
+            response = future.value
+
             if version > 1:  # OffsetFetchResponse_v1 lacks a top-level error_code
                 error_type = Errors.for_code(response.error_code)
                 if error_type is not Errors.NoError:
@@ -764,3 +797,11 @@ class KafkaAdminClient(object):
 
     # delete groups protocol not yet implemented
     # Note: send the request to the group's coordinator.
+
+    def _wait_for_futures(self, futures):
+        while not all(future.succeeded() for future in futures):
+            for future in futures:
+                self._client.poll(future=future)
+
+                if future.failed():
+                    raise future.exception  # pylint: disable-msg=raising-bad-type
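
With the diff applied, `list_consumer_groups()` collects one future per broker and `describe_consumer_groups()` one future per group before waiting on the whole batch via `_wait_for_futures()`. A short usage sketch of the affected public methods (assumes a broker reachable at `localhost:9092`; the groups returned depend on the cluster):

```python
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# ListGroupsRequests now go out to all brokers before any response is awaited;
# the result is a list of (group_id, protocol_type) tuples merged across brokers.
groups = admin.list_consumer_groups()

# One DescribeGroupsRequest per group, dispatched up front and then collected
# together via _wait_for_futures().
descriptions = admin.describe_consumer_groups([group for group, _protocol in groups])
for description in descriptions:
    print(description)

admin.close()
```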