author    Lou-Cipher <dmheitman@gmail.com>    2019-05-21 14:33:30 -0600
committer Jeff Widman <jeff@jeffwidman.com>   2019-05-21 13:33:30 -0700
commit    f145e37c1992da71d5c65c0d86ae971cda62e058 (patch)
tree      f0bb22315da263a9797f09d77e5bb674005b4073
parent    1b6c9c76bcf753c4dd28dc2d865d6d43246cf4af (diff)
download  kafka-python-f145e37c1992da71d5c65c0d86ae971cda62e058.tar.gz
Use futures to parallelize calls to _send_request_to_node() (#1807)
This allows queries that need to go to multiple brokers to be run in parallel.
-rw-r--r--  kafka/admin/client.py  |  109
1 file changed, 75 insertions, 34 deletions
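
To make the new pattern concrete, here is a rough sketch (not code from this commit; the bootstrap address is a placeholder) of how a request can now be fanned out to every broker using the private helpers touched by this diff:

# Hedged illustration of the fan-out pattern this commit enables.
from kafka.admin import KafkaAdminClient
from kafka.protocol.metadata import MetadataRequest

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')  # placeholder address

version = admin._matching_api_version(MetadataRequest)
futures = []
for broker in admin._client.cluster.brokers():
    # each call now returns a future immediately instead of blocking
    futures.append(admin._send_request_to_node(broker.nodeId, MetadataRequest[version]()))

# poll the client until every in-flight request has resolved, then read the results
admin._wait_for_futures(futures)
responses = [f.value for f in futures]
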
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index e4219e9..155ad21 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -249,7 +249,11 @@ class KafkaAdminClient(object):
version = self._matching_api_version(MetadataRequest)
if 1 <= version <= 6:
request = MetadataRequest[version]()
- response = self._send_request_to_node(self._client.least_loaded_node(), request)
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+ self._wait_for_futures([future])
+
+ response = future.value
controller_id = response.controller_id
# verify the controller is new enough to support our requests
controller_version = self._client.check_version(controller_id)
@@ -281,7 +285,11 @@ class KafkaAdminClient(object):
# When I experimented with this, GroupCoordinatorResponse_v1 didn't
# match GroupCoordinatorResponse_v0 and I couldn't figure out why.
gc_request = GroupCoordinatorRequest[0](group_id)
- gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+ future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+
+ self._wait_for_futures([future])
+
+ gc_response = future.value
# use the extra error checking in add_group_coordinator() rather than
# immediately returning the group coordinator.
success = self._client.cluster.add_group_coordinator(group_id, gc_response)
@@ -304,23 +312,19 @@ class KafkaAdminClient(object):
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
- Will block until the message result is received.
+ Returns a future that may be polled for status and results.
:param node_id: The broker id to which to send the message.
:param request: The message to send.
- :return: The Kafka protocol response for the message.
+ :return: A future object that may be polled for status and results.
:exception: The exception if the message could not be sent.
"""
while not self._client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
# will fail with NodeNotReadyError
self._client.poll()
- future = self._client.send(node_id, request)
- self._client.poll(future=future)
- if future.succeeded():
- return future.value
- else:
- raise future.exception # pylint: disable-msg=raising-bad-type
+ return self._client.send(node_id, request)
+
def _send_request_to_controller(self, request):
"""Send a Kafka protocol message to the cluster controller.
@@ -333,7 +337,11 @@ class KafkaAdminClient(object):
tries = 2 # in case our cached self._controller_id is outdated
while tries:
tries -= 1
- response = self._send_request_to_node(self._controller_id, request)
+ future = self._send_request_to_node(self._controller_id, request)
+
+ self._wait_for_futures([future])
+
+ response = future.value
# In Java, the error fieldname is inconsistent:
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
# - DeleteTopicsResponse uses topic_error_codes
@@ -490,7 +498,11 @@ class KafkaAdminClient(object):
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
.format(version))
- return self._send_request_to_node(self._client.least_loaded_node(), request)
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+ self._wait_for_futures([future])
+
+ return future.value
@staticmethod
def _convert_alter_config_resource_request(config_resource):
@@ -529,7 +541,11 @@ class KafkaAdminClient(object):
# // a single request that may be sent to any broker.
#
# So this is currently broken as it always sends to the least_loaded_node()
- return self._send_request_to_node(self._client.least_loaded_node(), request)
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+ self._wait_for_futures([future])
+
+ return future.value
# alter replica logs dir protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -605,12 +621,14 @@ class KafkaAdminClient(object):
partition assignments.
"""
group_descriptions = []
+ futures = []
version = self._matching_api_version(DescribeGroupsRequest)
for group_id in group_ids:
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
+
if version <= 1:
# Note: KAFKA-6788 A potential optimization is to group the
# request per coordinator and send one request with a list of
@@ -618,29 +636,35 @@ class KafkaAdminClient(object):
# because the error checking is hard to get right when some
# groups error and others don't.
request = DescribeGroupsRequest[version](groups=(group_id,))
- response = self._send_request_to_node(this_groups_coordinator_id, request)
- assert len(response.groups) == 1
- # TODO need to implement converting the response tuple into
- # a more accessible interface like a namedtuple and then stop
- # hardcoding tuple indices here. Several Java examples,
- # including KafkaAdminClient.java
- group_description = response.groups[0]
- error_code = group_description[0]
- error_type = Errors.for_code(error_code)
- # Java has the note: KAFKA-6789, we can retry based on the error code
- if error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- # TODO Java checks the group protocol type, and if consumer
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
- # the members' partition assignments... that hasn't yet been
- # implemented here so just return the raw struct results
- group_descriptions.append(group_description)
+ futures.append(self._send_request_to_node(this_groups_coordinator_id, request))
else:
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
.format(version))
+
+ self._wait_for_futures(futures)
+
+ for future in futures:
+ response = future.value
+ assert len(response.groups) == 1
+ # TODO need to implement converting the response tuple into
+ # a more accessible interface like a namedtuple and then stop
+ # hardcoding tuple indices here. Several Java examples,
+ # including KafkaAdminClient.java
+ group_description = response.groups[0]
+ error_code = group_description[0]
+ error_type = Errors.for_code(error_code)
+ # Java has the note: KAFKA-6789, we can retry based on the error code
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ # TODO Java checks the group protocol type, and if consumer
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
+ # the members' partition assignments... that hasn't yet been
+ # implemented here so just return the raw struct results
+ group_descriptions.append(group_description)
+
return group_descriptions
def list_consumer_groups(self, broker_ids=None):
@@ -673,13 +697,19 @@ class KafkaAdminClient(object):
# consumer groups move to new brokers that haven't yet been queried,
# then the same group could be returned by multiple brokers.
consumer_groups = set()
+ futures = []
if broker_ids is None:
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
version = self._matching_api_version(ListGroupsRequest)
if version <= 2:
request = ListGroupsRequest[version]()
for broker_id in broker_ids:
- response = self._send_request_to_node(broker_id, request)
+ futures.append(self._send_request_to_node(broker_id, request))
+
+ self._wait_for_futures(futures)
+
+ for future in futures:
+ response = future.value
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
@@ -738,7 +768,10 @@ class KafkaAdminClient(object):
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
request = OffsetFetchRequest[version](group_id, topics_partitions)
- response = self._send_request_to_node(group_coordinator_id, request)
+ future = self._send_request_to_node(group_coordinator_id, request)
+ self._wait_for_futures([future])
+ response = future.value
+
if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
@@ -764,3 +797,11 @@ class KafkaAdminClient(object):
# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.
+
+ def _wait_for_futures(self, futures):
+ while not all(future.succeeded() for future in futures):
+ for future in futures:
+ self._client.poll(future=future)
+
+ if future.failed():
+ raise future.exception # pylint: disable-msg=raising-bad-type
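
For comparison, a single future returned by the new _send_request_to_node() can also be driven to completion by hand, mirroring what _wait_for_futures() does internally (admin, node_id and request below are placeholders, not identifiers from this commit):

# Hedged sketch: manually polling one future from _send_request_to_node().
future = admin._send_request_to_node(node_id, request)  # placeholders
while not future.is_done:
    admin._client.poll(future=future)
if future.failed():
    raise future.exception  # same exception the old blocking helper raised
response = future.value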