summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-06 06:24:59 -0800
committerJeff Widman <jeff@jeffwidman.com>2019-03-07 16:52:04 -0800
commit492b7d27f5cc4ecfde7af1e380162c5fd9e4206a (patch)
tree1fb9bee8bfba75a441d0533681b1440c481a2ca0
parent3c873794f04d968895b0452a26ac061862686abf (diff)
downloadkafka-python-492b7d27f5cc4ecfde7af1e380162c5fd9e4206a.tar.gz
Send network requests during KafkaClient.poll() rather than in KafkaClient.send()
-rw-r--r--kafka/client_async.py14
1 files changed, 13 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e2bdda9..e9d5919 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -522,7 +522,17 @@ class KafkaClient(object):
if not self._maybe_connect(node_id):
return Future().failure(Errors.NodeNotReadyError(node_id))
- return self._conns[node_id].send(request)
+ # conn.send will queue the request internally
+ # we will need to call send_pending_requests()
+ # to trigger network I/O
+ future = self._conns[node_id].send(request, blocking=False)
+
+ # Wakeup signal is useful in case another thread is
+ # blocked waiting for incoming network traffic while holding
+ # the client lock in poll().
+ self.wakeup()
+
+ return future
def poll(self, timeout_ms=None, future=None):
"""Try to read and write to sockets.
@@ -640,6 +650,8 @@ class KafkaClient(object):
conn.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
conn.config['request_timeout_ms']))
+ else:
+ conn.send_pending_requests()
if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)