summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py32
1 files changed, 29 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 12bd08d..687b748 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,7 +5,7 @@ import copy
import errno
import logging
import io
-from random import shuffle
+from random import randint, shuffle
import socket
import time
import traceback
@@ -140,6 +140,7 @@ class BrokerConnection(object):
'node_id': 0,
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
@@ -199,6 +200,7 @@ class BrokerConnection(object):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
self.state = ConnectionStates.DISCONNECTED
+ self._reset_reconnect_backoff()
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
@@ -305,6 +307,7 @@ class BrokerConnection(object):
else:
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
# Connection failed
@@ -340,6 +343,7 @@ class BrokerConnection(object):
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
return self.state
@@ -475,11 +479,19 @@ class BrokerConnection(object):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
- backoff = self.config['reconnect_backoff_ms'] / 1000.0
- if time.time() < self.last_attempt + backoff:
+ if time.time() < self.last_attempt + self._reconnect_backoff:
return True
return False
+ def connection_delay(self):
+ time_waited_ms = time.time() - (self.last_attempt or 0)
+ if conn.state is ConnectionStates.DISCONNECTED:
+ return max(self._reconnect_backoff - time_waited_ms, 0)
+ elif conn.connecting():
+ return 0
+ else:
+ return 999999999
+
def connected(self):
"""Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED
@@ -495,6 +507,19 @@ class BrokerConnection(object):
"""Return True iff socket is closed"""
return self.state is ConnectionStates.DISCONNECTED
+ def _reset_reconnect_backoff(self):
+ self._failures = 0
+ self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0
+
+ def _update_reconnect_backoff(self):
+ if self.config['reconnect_backoff_max'] > self.config['reconnect_backoff_ms']:
+ self._failures += 1
+ self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
+ self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max'])
+ self._reconnect_backoff = randint(self.config['reconnect_backoff_ms'], self._reconnect_backoff)
+ self._reconnect_backoff /= 1000.0
+ log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
+
def close(self, error=None):
"""Close socket and fail all in-flight-requests.
@@ -512,6 +537,7 @@ class BrokerConnection(object):
log.info('%s: Closing connection. %s', self, error or '')
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
+ self._update_reconnect_backoff()
if self._sock:
self._sock.close()
self._sock = None