diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 5824e7a..c0cdc43 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -333,17 +333,19 @@ class KafkaClient(object): conn.connect() return conn.connected() - def ready(self, node_id): + def ready(self, node_id, metadata_priority=True): """Check whether a node is connected and ok to send more requests. Arguments: node_id (int): the id of the node to check + metadata_priority (bool): Mark node as not-ready if a metadata + refresh is required. Default: True Returns: bool: True if we are ready to send to the given node """ self._maybe_connect(node_id) - return self.is_ready(node_id) + return self.is_ready(node_id, metadata_priority=metadata_priority) def connected(self, node_id): """Return True iff the node_id is connected.""" @@ -414,7 +416,7 @@ class KafkaClient(object): else: return 999999999 - def is_ready(self, node_id): + def is_ready(self, node_id, metadata_priority=True): """Check whether a node is ready to send more requests. In addition to connection-level checks, this method also is used to @@ -422,16 +424,23 @@ class KafkaClient(object): Arguments: node_id (int): id of the node to check + metadata_priority (bool): Mark node as not-ready if a metadata + refresh is required. Default: True Returns: bool: True if the node is ready and metadata is not refreshing """ + if not self._can_send_request(node_id): + return False + # if we need to update our metadata now declare all requests unready to # make metadata requests first priority - if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0: - if self._can_send_request(node_id): - return True - return False + if metadata_priority: + if self._metadata_refresh_in_progress: + return False + if self.cluster.ttl() == 0: + return False + return True def _can_send_request(self, node_id): if node_id not in self._conns: |