diff options
| author | Dana Powers <dana.powers@gmail.com> | 2019-03-22 08:58:17 -0700 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2019-03-23 08:55:26 -0700 |
| commit | 3c5dfef52da589250e069e82ff3f12e013ead4db (patch) | |
| tree | 1623ebcfb76e52fe6581c9df8a2b7afbcacd994d /kafka/client_async.py | |
| parent | 8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2 (diff) | |
| download | kafka-python-3c5dfef52da589250e069e82ff3f12e013ead4db.tar.gz | |
Dont do client wakeup when sending from sender / same thread
Diffstat (limited to 'kafka/client_async.py')
| -rw-r--r-- | kafka/client_async.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ebd4af7..cff172f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -499,7 +499,7 @@ class KafkaClient(object): return False return conn.connected() and conn.can_send_more() - def send(self, node_id, request): + def send(self, node_id, request, wakeup=True): """Send a request to a specific node. Bytes are placed on an internal per-connection send-queue. Actual network I/O will be triggered in a subsequent call to .poll() @@ -507,6 +507,7 @@ class KafkaClient(object): Arguments: node_id (int): destination node request (Struct): request object (not-encoded) + wakeup (bool): optional flag to disable thread-wakeup Raises: AssertionError: if node_id is not in current cluster metadata @@ -526,7 +527,8 @@ class KafkaClient(object): # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return future |
