diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-24 21:52:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-24 21:52:56 -0700 |
commit | b865d107638e1e57e3f88bb4a06826bb098610dd (patch) | |
tree | 3388f0cf664219155b18b8a216e99eb6fa227fdb /kafka | |
parent | 4213d53d4ccfd239addc1db07b5b3913b4c6547c (diff) | |
download | kafka-python-conn_send_timeout.tar.gz |
Use socket timeout of request_timeout_ms to prevent blocking forever on sendconn_send_timeout
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/conn.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index c1c4fbb..51a007c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import collections import copy @@ -491,7 +491,7 @@ class BrokerConnection(object): self.config['sasl_mechanism'])) def _send_bytes_blocking(self, data): - self._sock.setblocking(True) + self._sock.settimeout(self.config['request_timeout_ms'] / 1000) total_sent = 0 try: while total_sent < len(data): @@ -501,10 +501,10 @@ class BrokerConnection(object): raise ConnectionError('Buffer overrun during socket send') return total_sent finally: - self._sock.setblocking(False) + self._sock.settimeout(0.0) def _recv_bytes_blocking(self, n): - self._sock.setblocking(True) + self._sock.settimeout(self.config['request_timeout_ms'] / 1000) try: data = b'' while len(data) < n: @@ -514,7 +514,7 @@ class BrokerConnection(object): data += fragment return data finally: - self._sock.setblocking(False) + self._sock.settimeout(0.0) def _try_authenticate_plain(self, future): if self.config['security_protocol'] == 'SASL_PLAINTEXT': @@ -696,6 +696,7 @@ class BrokerConnection(object): # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block # sending each request payload + sent_time = time.time() total_bytes = self._send_bytes_blocking(data) if self._sensors: self._sensors.bytes_sent.record(total_bytes) @@ -707,7 +708,7 @@ class BrokerConnection(object): log.debug('%s Request %d: %s', self, correlation_id, request) if request.expect_response(): - ifr = (correlation_id, future, time.time()) + ifr = (correlation_id, future, sent_time) self.in_flight_requests.append(ifr) else: future.success(None) |