summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-22 08:58:17 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-23 08:55:26 -0700
commit3c5dfef52da589250e069e82ff3f12e013ead4db (patch)
tree1623ebcfb76e52fe6581c9df8a2b7afbcacd994d
parent8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2 (diff)
downloadkafka-python-3c5dfef52da589250e069e82ff3f12e013ead4db.tar.gz
Dont do client wakeup when sending from sender / same thread
-rw-r--r--kafka/client_async.py6
-rw-r--r--kafka/producer/sender.py2
2 files changed, 5 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ebd4af7..cff172f 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -499,7 +499,7 @@ class KafkaClient(object):
return False
return conn.connected() and conn.can_send_more()
- def send(self, node_id, request):
+ def send(self, node_id, request, wakeup=True):
"""Send a request to a specific node. Bytes are placed on an
internal per-connection send-queue. Actual network I/O will be
triggered in a subsequent call to .poll()
@@ -507,6 +507,7 @@ class KafkaClient(object):
Arguments:
node_id (int): destination node
request (Struct): request object (not-encoded)
+ wakeup (bool): optional flag to disable thread-wakeup
Raises:
AssertionError: if node_id is not in current cluster metadata
@@ -526,7 +527,8 @@ class KafkaClient(object):
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
- self.wakeup()
+ if wakeup:
+ self.wakeup()
return future
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 895045d..17d6255 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -144,7 +144,7 @@ class Sender(threading.Thread):
for node_id, request in six.iteritems(requests):
batches = batches_by_node[node_id]
log.debug('Sending Produce Request: %r', request)
- (self._client.send(node_id, request)
+ (self._client.send(node_id, request, wakeup=False)
.add_callback(
self._handle_produce_response, node_id, time.time(), batches)
.add_errback(