diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-06 06:24:59 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-03-07 16:52:04 -0800 |
commit | 492b7d27f5cc4ecfde7af1e380162c5fd9e4206a (patch) | |
tree | 1fb9bee8bfba75a441d0533681b1440c481a2ca0 | |
parent | 3c873794f04d968895b0452a26ac061862686abf (diff) | |
download | kafka-python-492b7d27f5cc4ecfde7af1e380162c5fd9e4206a.tar.gz |
Send network requests during KafkaClient.poll() rather than in KafkaClient.send()
-rw-r--r-- | kafka/client_async.py | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e2bdda9..e9d5919 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -522,7 +522,17 @@ class KafkaClient(object): if not self._maybe_connect(node_id): return Future().failure(Errors.NodeNotReadyError(node_id)) - return self._conns[node_id].send(request) + # conn.send will queue the request internally + # we will need to call send_pending_requests() + # to trigger network I/O + future = self._conns[node_id].send(request, blocking=False) + + # Wakeup signal is useful in case another thread is + # blocked waiting for incoming network traffic while holding + # the client lock in poll(). + self.wakeup() + + return future def poll(self, timeout_ms=None, future=None): """Try to read and write to sockets. @@ -640,6 +650,8 @@ class KafkaClient(object): conn.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % conn.config['request_timeout_ms'])) + else: + conn.send_pending_requests() if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) |