diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-06-19 10:16:02 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-19 10:16:02 -0700 |
commit | b1cc966439a65f8be1b3973b16753dfba2b51c37 (patch) | |
tree | fb25aec1147ef9dfc6758c72fcc87130e557b53d /kafka/conn.py | |
parent | cceaf4ae0982a78bdaef39ce1c9635e260bff709 (diff) | |
download | kafka-python-b1cc966439a65f8be1b3973b16753dfba2b51c37.tar.gz |
KIP-144: Exponential backoff for broker reconnections (#1124)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 40 |
1 file changed, 37 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 12bd08d..f118345 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,7 +5,7 @@ import copy import errno import logging import io -from random import shuffle +from random import shuffle, uniform import socket import time import traceback @@ -78,6 +78,14 @@ class BrokerConnection(object): reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. + reconnect_backoff_max_ms (int): The maximum amount of time in + milliseconds to wait when reconnecting to a broker that has + repeatedly failed to connect. If provided, the backoff per host + will increase exponentially for each consecutive connection + failure, up to this maximum. To avoid connection storms, a + randomization factor of 0.2 will be applied to the backoff + resulting in a random range between 20% below and 20% above + the computed value. Default: 1000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 40000. 
max_in_flight_requests_per_connection (int): Requests are pipelined @@ -140,6 +148,7 @@ class BrokerConnection(object): 'node_id': 0, 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, + 'reconnect_backoff_max_ms': 1000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -199,6 +208,7 @@ class BrokerConnection(object): assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' self.state = ConnectionStates.DISCONNECTED + self._reset_reconnect_backoff() self._sock = None self._ssl_context = None if self.config['ssl_context'] is not None: @@ -305,6 +315,7 @@ class BrokerConnection(object): else: log.debug('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() self.config['state_change_callback'](self) # Connection failed @@ -340,6 +351,7 @@ class BrokerConnection(object): log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) log.debug('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() self.config['state_change_callback'](self) return self.state @@ -475,11 +487,19 @@ class BrokerConnection(object): re-establish a connection yet """ if self.state is ConnectionStates.DISCONNECTED: - backoff = self.config['reconnect_backoff_ms'] / 1000.0 - if time.time() < self.last_attempt + backoff: + if time.time() < self.last_attempt + self._reconnect_backoff: return True return False + def connection_delay(self): + time_waited_ms = time.time() - (self.last_attempt or 0) + if self.state is ConnectionStates.DISCONNECTED: + return max(self._reconnect_backoff - time_waited_ms, 0) + elif self.connecting(): + return 0 + else: + return 999999999 + def connected(self): """Return True iff socket is connected.""" return self.state is ConnectionStates.CONNECTED @@ -495,6 +515,19 @@ class BrokerConnection(object): """Return True iff socket is closed""" return 
self.state is ConnectionStates.DISCONNECTED + def _reset_reconnect_backoff(self): + self._failures = 0 + self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0 + + def _update_reconnect_backoff(self): + if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']: + self._failures += 1 + self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1) + self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms']) + self._reconnect_backoff *= uniform(0.8, 1.2) + self._reconnect_backoff /= 1000.0 + log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures) + def close(self, error=None): """Close socket and fail all in-flight-requests. @@ -512,6 +545,7 @@ class BrokerConnection(object): log.info('%s: Closing connection. %s', self, error or '') self.state = ConnectionStates.DISCONNECTING self.config['state_change_callback'](self) + self._update_reconnect_backoff() if self._sock: self._sock.close() self._sock = None |