diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/conn.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index c1c4fbb..51a007c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import collections import copy @@ -491,7 +491,7 @@ class BrokerConnection(object): self.config['sasl_mechanism'])) def _send_bytes_blocking(self, data): - self._sock.setblocking(True) + self._sock.settimeout(self.config['request_timeout_ms'] / 1000) total_sent = 0 try: while total_sent < len(data): @@ -501,10 +501,10 @@ class BrokerConnection(object): raise ConnectionError('Buffer overrun during socket send') return total_sent finally: - self._sock.setblocking(False) + self._sock.settimeout(0.0) def _recv_bytes_blocking(self, n): - self._sock.setblocking(True) + self._sock.settimeout(self.config['request_timeout_ms'] / 1000) try: data = b'' while len(data) < n: @@ -514,7 +514,7 @@ class BrokerConnection(object): data += fragment return data finally: - self._sock.setblocking(False) + self._sock.settimeout(0.0) def _try_authenticate_plain(self, future): if self.config['security_protocol'] == 'SASL_PLAINTEXT': @@ -696,6 +696,7 @@ class BrokerConnection(object): # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block # sending each request payload + sent_time = time.time() total_bytes = self._send_bytes_blocking(data) if self._sensors: self._sensors.bytes_sent.record(total_bytes) @@ -707,7 +708,7 @@ class BrokerConnection(object): log.debug('%s Request %d: %s', self, correlation_id, request) if request.expect_response(): - ifr = (correlation_id, future, time.time()) + ifr = (correlation_id, future, sent_time) self.in_flight_requests.append(ifr) else: future.success(None) |