diff options
| -rw-r--r-- | kafka/client_async.py | 6 | ||||
| -rw-r--r-- | kafka/producer/sender.py | 2 |
2 files changed, 5 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ebd4af7..cff172f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -499,7 +499,7 @@ class KafkaClient(object): return False return conn.connected() and conn.can_send_more() - def send(self, node_id, request): + def send(self, node_id, request, wakeup=True): """Send a request to a specific node. Bytes are placed on an internal per-connection send-queue. Actual network I/O will be triggered in a subsequent call to .poll() @@ -507,6 +507,7 @@ class KafkaClient(object): Arguments: node_id (int): destination node request (Struct): request object (not-encoded) + wakeup (bool): optional flag to disable thread-wakeup Raises: AssertionError: if node_id is not in current cluster metadata @@ -526,7 +527,8 @@ class KafkaClient(object): # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return future diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 895045d..17d6255 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -144,7 +144,7 @@ class Sender(threading.Thread): for node_id, request in six.iteritems(requests): batches = batches_by_node[node_id] log.debug('Sending Produce Request: %r', request) - (self._client.send(node_id, request) + (self._client.send(node_id, request, wakeup=False) .add_callback( self._handle_produce_response, node_id, time.time(), batches) .add_errback( |
