summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index c1c4fbb..51a007c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import copy
@@ -491,7 +491,7 @@ class BrokerConnection(object):
self.config['sasl_mechanism']))
def _send_bytes_blocking(self, data):
- self._sock.setblocking(True)
+ self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
total_sent = 0
try:
while total_sent < len(data):
@@ -501,10 +501,10 @@ class BrokerConnection(object):
raise ConnectionError('Buffer overrun during socket send')
return total_sent
finally:
- self._sock.setblocking(False)
+ self._sock.settimeout(0.0)
def _recv_bytes_blocking(self, n):
- self._sock.setblocking(True)
+ self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
try:
data = b''
while len(data) < n:
@@ -514,7 +514,7 @@ class BrokerConnection(object):
data += fragment
return data
finally:
- self._sock.setblocking(False)
+ self._sock.settimeout(0.0)
def _try_authenticate_plain(self, future):
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
@@ -696,6 +696,7 @@ class BrokerConnection(object):
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
# sending each request payload
+ sent_time = time.time()
total_bytes = self._send_bytes_blocking(data)
if self._sensors:
self._sensors.bytes_sent.record(total_bytes)
@@ -707,7 +708,7 @@ class BrokerConnection(object):
log.debug('%s Request %d: %s', self, correlation_id, request)
if request.expect_response():
- ifr = (correlation_id, future, time.time())
+ ifr = (correlation_id, future, sent_time)
self.in_flight_requests.append(ifr)
else:
future.success(None)