diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-22 08:58:17 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-23 08:55:26 -0700 |
commit | 3c5dfef52da589250e069e82ff3f12e013ead4db (patch) | |
tree | 1623ebcfb76e52fe6581c9df8a2b7afbcacd994d | |
parent | 8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2 (diff) | |
download | kafka-python-3c5dfef52da589250e069e82ff3f12e013ead4db.tar.gz |
Dont do client wakeup when sending from sender / same thread
-rw-r--r-- | kafka/client_async.py | 6 | ||||
-rw-r--r-- | kafka/producer/sender.py | 2 |
2 files changed, 5 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ebd4af7..cff172f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -499,7 +499,7 @@ class KafkaClient(object): return False return conn.connected() and conn.can_send_more() - def send(self, node_id, request): + def send(self, node_id, request, wakeup=True): """Send a request to a specific node. Bytes are placed on an internal per-connection send-queue. Actual network I/O will be triggered in a subsequent call to .poll() @@ -507,6 +507,7 @@ class KafkaClient(object): Arguments: node_id (int): destination node request (Struct): request object (not-encoded) + wakeup (bool): optional flag to disable thread-wakeup Raises: AssertionError: if node_id is not in current cluster metadata @@ -526,7 +527,8 @@ class KafkaClient(object): # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return future diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 895045d..17d6255 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -144,7 +144,7 @@ class Sender(threading.Thread): for node_id, request in six.iteritems(requests): batches = batches_by_node[node_id] log.debug('Sending Produce Request: %r', request) - (self._client.send(node_id, request) + (self._client.send(node_id, request, wakeup=False) .add_callback( self._handle_produce_response, node_id, time.time(), batches) .add_errback( |