diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-24 08:44:46 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-24 08:44:46 -0700 |
commit | d388b48951327955a9a9031a229f02880e2c6f05 (patch) | |
tree | a0f7fb764114f9573a24d7e91a1aa4ba103be928 | |
parent | ce9c1d2e2b8d85b2f6c3b2a2ebd280246cfea07f (diff) | |
download | kafka-python-d388b48951327955a9a9031a229f02880e2c6f05.tar.gz |
Dont do client wakeup when sending from sender thread (#1761)
-rw-r--r-- | kafka/client_async.py | 11 | ||||
-rw-r--r-- | kafka/producer/sender.py | 5 |
2 files changed, 10 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 369dc3e..682fd7c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -321,14 +321,15 @@ class KafkaClient(object): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() - def maybe_connect(self, node_id): + def maybe_connect(self, node_id, wakeup=True): """Queues a node for asynchronous connection during the next .poll()""" if self._can_connect(node_id): self._connecting.add(node_id) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return True return False @@ -499,7 +500,7 @@ class KafkaClient(object): return False return conn.connected() and conn.can_send_more() - def send(self, node_id, request): + def send(self, node_id, request, wakeup=True): """Send a request to a specific node. Bytes are placed on an internal per-connection send-queue. Actual network I/O will be triggered in a subsequent call to .poll() @@ -507,6 +508,7 @@ class KafkaClient(object): Arguments: node_id (int): destination node request (Struct): request object (not-encoded) + wakeup (bool): optional flag to disable thread-wakeup Raises: AssertionError: if node_id is not in current cluster metadata @@ -526,7 +528,8 @@ class KafkaClient(object): # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return future diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 895045d..064fee4 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -105,8 +105,9 @@ class Sender(threading.Thread): # remove any nodes we aren't ready to send to not_ready_timeout = float('inf') for node in list(ready_nodes): - if not self._client.ready(node): + if not self._client.is_ready(node): log.debug('Node %s not ready; delaying produce of accumulated batch', node) + self._client.maybe_connect(node, wakeup=False) ready_nodes.remove(node) not_ready_timeout = min(not_ready_timeout, self._client.connection_delay(node)) @@ -144,7 +145,7 @@ class Sender(threading.Thread): for node_id, request in six.iteritems(requests): batches = batches_by_node[node_id] log.debug('Sending Produce Request: %r', request) - (self._client.send(node_id, request) + (self._client.send(node_id, request, wakeup=False) .add_callback( self._handle_produce_response, node_id, time.time(), batches) .add_errback( |