summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-06-19 10:16:02 -0700
committerGitHub <noreply@github.com>2017-06-19 10:16:02 -0700
commitb1cc966439a65f8be1b3973b16753dfba2b51c37 (patch)
treefb25aec1147ef9dfc6758c72fcc87130e557b53d /kafka/conn.py
parentcceaf4ae0982a78bdaef39ce1c9635e260bff709 (diff)
downloadkafka-python-b1cc966439a65f8be1b3973b16753dfba2b51c37.tar.gz
KIP-144: Exponential backoff for broker reconnections (#1124)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py40
1 files changed, 37 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 12bd08d..f118345 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,7 +5,7 @@ import copy
import errno
import logging
import io
-from random import shuffle
+from random import shuffle, uniform
import socket
import time
import traceback
@@ -78,6 +78,14 @@ class BrokerConnection(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
+ reconnect_backoff_max_ms (int): The maximum amount of time in
+ milliseconds to wait when reconnecting to a broker that has
+ repeatedly failed to connect. If provided, the backoff per host
+ will increase exponentially for each consecutive connection
+ failure, up to this maximum. To avoid connection storms, a
+ randomization factor of 0.2 will be applied to the backoff
+ resulting in a random range between 20% below and 20% above
+ the computed value. Default: 1000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
max_in_flight_requests_per_connection (int): Requests are pipelined
@@ -140,6 +148,7 @@ class BrokerConnection(object):
'node_id': 0,
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
@@ -199,6 +208,7 @@ class BrokerConnection(object):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
self.state = ConnectionStates.DISCONNECTED
+ self._reset_reconnect_backoff()
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
@@ -305,6 +315,7 @@ class BrokerConnection(object):
else:
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
# Connection failed
@@ -340,6 +351,7 @@ class BrokerConnection(object):
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
return self.state
@@ -475,11 +487,19 @@ class BrokerConnection(object):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
- backoff = self.config['reconnect_backoff_ms'] / 1000.0
- if time.time() < self.last_attempt + backoff:
+ if time.time() < self.last_attempt + self._reconnect_backoff:
return True
return False
+ def connection_delay(self):
+ time_waited_ms = time.time() - (self.last_attempt or 0)
+ if self.state is ConnectionStates.DISCONNECTED:
+ return max(self._reconnect_backoff - time_waited_ms, 0)
+ elif self.connecting():
+ return 0
+ else:
+ return 999999999
+
def connected(self):
"""Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED
@@ -495,6 +515,19 @@ class BrokerConnection(object):
"""Return True iff socket is closed"""
return self.state is ConnectionStates.DISCONNECTED
+ def _reset_reconnect_backoff(self):
+ self._failures = 0
+ self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0
+
+ def _update_reconnect_backoff(self):
+ if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']:
+ self._failures += 1
+ self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
+ self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
+ self._reconnect_backoff *= uniform(0.8, 1.2)
+ self._reconnect_backoff /= 1000.0
+ log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
+
def close(self, error=None):
"""Close socket and fail all in-flight-requests.
@@ -512,6 +545,7 @@ class BrokerConnection(object):
log.info('%s: Closing connection. %s', self, error or '')
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
+ self._update_reconnect_backoff()
if self._sock:
self._sock.close()
self._sock = None