summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-24 08:44:46 -0700
committerGitHub <noreply@github.com>2019-03-24 08:44:46 -0700
commitd388b48951327955a9a9031a229f02880e2c6f05 (patch)
treea0f7fb764114f9573a24d7e91a1aa4ba103be928
parentce9c1d2e2b8d85b2f6c3b2a2ebd280246cfea07f (diff)
downloadkafka-python-d388b48951327955a9a9031a229f02880e2c6f05.tar.gz
Dont do client wakeup when sending from sender thread (#1761)
-rw-r--r--kafka/client_async.py11
-rw-r--r--kafka/producer/sender.py5
2 files changed, 10 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 369dc3e..682fd7c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -321,14 +321,15 @@ class KafkaClient(object):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
- def maybe_connect(self, node_id):
+ def maybe_connect(self, node_id, wakeup=True):
"""Queues a node for asynchronous connection during the next .poll()"""
if self._can_connect(node_id):
self._connecting.add(node_id)
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
- self.wakeup()
+ if wakeup:
+ self.wakeup()
return True
return False
@@ -499,7 +500,7 @@ class KafkaClient(object):
return False
return conn.connected() and conn.can_send_more()
- def send(self, node_id, request):
+ def send(self, node_id, request, wakeup=True):
"""Send a request to a specific node. Bytes are placed on an
internal per-connection send-queue. Actual network I/O will be
triggered in a subsequent call to .poll()
@@ -507,6 +508,7 @@ class KafkaClient(object):
Arguments:
node_id (int): destination node
request (Struct): request object (not-encoded)
+ wakeup (bool): optional flag to disable thread-wakeup
Raises:
AssertionError: if node_id is not in current cluster metadata
@@ -526,7 +528,8 @@ class KafkaClient(object):
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
- self.wakeup()
+ if wakeup:
+ self.wakeup()
return future
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 895045d..064fee4 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -105,8 +105,9 @@ class Sender(threading.Thread):
# remove any nodes we aren't ready to send to
not_ready_timeout = float('inf')
for node in list(ready_nodes):
- if not self._client.ready(node):
+ if not self._client.is_ready(node):
log.debug('Node %s not ready; delaying produce of accumulated batch', node)
+ self._client.maybe_connect(node, wakeup=False)
ready_nodes.remove(node)
not_ready_timeout = min(not_ready_timeout,
self._client.connection_delay(node))
@@ -144,7 +145,7 @@ class Sender(threading.Thread):
for node_id, request in six.iteritems(requests):
batches = batches_by_node[node_id]
log.debug('Sending Produce Request: %r', request)
- (self._client.send(node_id, request)
+ (self._client.send(node_id, request, wakeup=False)
.add_callback(
self._handle_produce_response, node_id, time.time(), batches)
.add_errback(