diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 95 |
1 files changed, 94 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 16ebb99..e1b10b3 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -135,6 +135,7 @@ class KafkaClient(object): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 40000, + 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, @@ -194,6 +195,7 @@ class KafkaClient(object): self._wake_r.setblocking(False) self._wake_lock = threading.Lock() self._selector.register(self._wake_r, selectors.EVENT_READ) + self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) self._closed = False self._sensors = None if self.config['metrics']: @@ -291,6 +293,8 @@ class KafkaClient(object): if self._sensors: self._sensors.connection_created.record() + self._idle_expiry_manager.update(node_id) + if 'bootstrap' in self._conns and node_id != 'bootstrap': bootstrap = self._conns.pop('bootstrap') # XXX: make conn.close() require error to cause refresh @@ -308,7 +312,13 @@ class KafkaClient(object): pass if self._sensors: self._sensors.connection_closed.record() - if self._refresh_on_disconnects and not self._closed: + + idle_disconnect = False + if self._idle_expiry_manager.is_expired(node_id): + idle_disconnect = True + self._idle_expiry_manager.remove(node_id) + + if self._refresh_on_disconnects and not self._closed and not idle_disconnect: log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() @@ -514,10 +524,12 @@ class KafkaClient(object): if future and future.is_done: timeout = 0 else: + idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() timeout = min( timeout_ms, metadata_timeout_ms, self._delayed_tasks.next_at() * 1000, + idle_connection_timeout_ms, self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts @@ -572,6 +584,8 @@ class KafkaClient(object): conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests')) continue + self._idle_expiry_manager.update(conn.node_id) + # Accumulate as many responses as the connection has pending while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks @@ -601,6 +615,7 @@ class KafkaClient(object): if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._maybe_close_oldest_connection() return responses def in_flight_request_count(self, node_id=None): @@ -846,6 +861,14 @@ class KafkaClient(object): except socket.error: break + def _maybe_close_oldest_connection(self): + expired_connection = self._idle_expiry_manager.poll_expired_connection() + if expired_connection: + conn_id, ts = expired_connection + idle_ms = (time.time() - ts) * 1000 + log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) + self.close(node_id=conn_id) + class DelayedTaskQueue(object): # see https://docs.python.org/2/library/heapq.html @@ -920,6 +943,76 @@ class DelayedTaskQueue(object): return ready_tasks +# OrderedDict requires python2.7+ +try: + from collections import OrderedDict +except ImportError: + # If we dont have OrderedDict, we'll fallback to dict with O(n) priority reads + OrderedDict = dict + + +class IdleConnectionManager(object): + def __init__(self, connections_max_idle_ms): + if connections_max_idle_ms > 0: + self.connections_max_idle = connections_max_idle_ms / 1000 + else: + self.connections_max_idle = float('inf') + self.next_idle_close_check_time = None + self.update_next_idle_close_check_time(time.time()) + self.lru_connections = OrderedDict() + + def update(self, conn_id): + # order should reflect last-update + if conn_id in self.lru_connections: + del self.lru_connections[conn_id] + self.lru_connections[conn_id] = time.time() + + def remove(self, conn_id): + if conn_id in self.lru_connections: + del self.lru_connections[conn_id] + + def is_expired(self, conn_id): + if conn_id not in self.lru_connections: + return None + return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle + + def next_check_ms(self): + now = time.time() + if not self.lru_connections: + return float('inf') + elif self.next_idle_close_check_time <= now: + return 0 + else: + return int((self.next_idle_close_check_time - now) * 1000) + + def update_next_idle_close_check_time(self, ts): + self.next_idle_close_check_time = ts + self.connections_max_idle + + def poll_expired_connection(self): + if time.time() < self.next_idle_close_check_time: + return None + + if not len(self.lru_connections): + return None + + oldest_conn_id = None + oldest_ts = None + if OrderedDict is dict: + for conn_id, ts in self.lru_connections.items(): + if oldest_conn_id is None or ts < oldest_ts: + oldest_conn_id = conn_id + oldest_ts = ts + else: + (oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items())) + + self.update_next_idle_close_check_time(oldest_ts) + + if time.time() >= oldest_ts + self.connections_max_idle: + return (oldest_conn_id, oldest_ts) + else: + return None + + class KafkaClientMetrics(object): def __init__(self, metrics, metric_group_prefix, conns): self.metrics = metrics |