summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-22 08:58:17 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-23 08:55:26 -0700
commit3c5dfef52da589250e069e82ff3f12e013ead4db (patch)
tree1623ebcfb76e52fe6581c9df8a2b7afbcacd994d /kafka/client_async.py
parent8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2 (diff)
downloadkafka-python-3c5dfef52da589250e069e82ff3f12e013ead4db.tar.gz
Dont do client wakeup when sending from sender / same thread
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ebd4af7..cff172f 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -499,7 +499,7 @@ class KafkaClient(object):
return False
return conn.connected() and conn.can_send_more()
- def send(self, node_id, request):
+ def send(self, node_id, request, wakeup=True):
"""Send a request to a specific node. Bytes are placed on an
internal per-connection send-queue. Actual network I/O will be
triggered in a subsequent call to .poll()
@@ -507,6 +507,7 @@ class KafkaClient(object):
Arguments:
node_id (int): destination node
request (Struct): request object (not-encoded)
+ wakeup (bool): optional flag to disable thread-wakeup
Raises:
AssertionError: if node_id is not in current cluster metadata
@@ -526,7 +527,8 @@ class KafkaClient(object):
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
- self.wakeup()
+ if wakeup:
+ self.wakeup()
return future