diff options
-rw-r--r-- | kafka/client_async.py | 95 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 2 | ||||
-rw-r--r-- | test/test_client_async.py | 38 |
3 files changed, 131 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 16ebb99..e1b10b3 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -135,6 +135,7 @@ class KafkaClient(object): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 40000, + 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, @@ -194,6 +195,7 @@ class KafkaClient(object): self._wake_r.setblocking(False) self._wake_lock = threading.Lock() self._selector.register(self._wake_r, selectors.EVENT_READ) + self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) self._closed = False self._sensors = None if self.config['metrics']: @@ -291,6 +293,8 @@ class KafkaClient(object): if self._sensors: self._sensors.connection_created.record() + self._idle_expiry_manager.update(node_id) + if 'bootstrap' in self._conns and node_id != 'bootstrap': bootstrap = self._conns.pop('bootstrap') # XXX: make conn.close() require error to cause refresh @@ -308,7 +312,13 @@ class KafkaClient(object): pass if self._sensors: self._sensors.connection_closed.record() - if self._refresh_on_disconnects and not self._closed: + + idle_disconnect = False + if self._idle_expiry_manager.is_expired(node_id): + idle_disconnect = True + self._idle_expiry_manager.remove(node_id) + + if self._refresh_on_disconnects and not self._closed and not idle_disconnect: log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() @@ -514,10 +524,12 @@ class KafkaClient(object): if future and future.is_done: timeout = 0 else: + idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() timeout = min( timeout_ms, metadata_timeout_ms, self._delayed_tasks.next_at() * 1000, + idle_connection_timeout_ms, self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts @@ -572,6 +584,8 @@ class KafkaClient(object): conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests')) continue + self._idle_expiry_manager.update(conn.node_id) + # Accumulate as many responses as the connection has pending while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks @@ -601,6 +615,7 @@ class KafkaClient(object): if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._maybe_close_oldest_connection() return responses def in_flight_request_count(self, node_id=None): @@ -846,6 +861,14 @@ class KafkaClient(object): except socket.error: break + def _maybe_close_oldest_connection(self): + expired_connection = self._idle_expiry_manager.poll_expired_connection() + if expired_connection: + conn_id, ts = expired_connection + idle_ms = (time.time() - ts) * 1000 + log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) + self.close(node_id=conn_id) + class DelayedTaskQueue(object): # see https://docs.python.org/2/library/heapq.html @@ -920,6 +943,76 @@ class DelayedTaskQueue(object): return ready_tasks +# OrderedDict requires python2.7+ +try: + from collections import OrderedDict +except ImportError: + # If we dont have OrderedDict, we'll fallback to dict with O(n) priority reads + OrderedDict = dict + + +class IdleConnectionManager(object): + def __init__(self, connections_max_idle_ms): + if connections_max_idle_ms > 0: + self.connections_max_idle = connections_max_idle_ms / 1000 + else: + self.connections_max_idle = float('inf') + self.next_idle_close_check_time = None + self.update_next_idle_close_check_time(time.time()) + self.lru_connections = OrderedDict() + + def update(self, conn_id): + # order should reflect last-update + if conn_id in self.lru_connections: + del self.lru_connections[conn_id] + self.lru_connections[conn_id] = time.time() + + def remove(self, conn_id): + if conn_id in self.lru_connections: + del self.lru_connections[conn_id] + + def is_expired(self, conn_id): + if conn_id not in self.lru_connections: + return None + return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle + + def next_check_ms(self): + now = time.time() + if not self.lru_connections: + return float('inf') + elif self.next_idle_close_check_time <= now: + return 0 + else: + return int((self.next_idle_close_check_time - now) * 1000) + + def update_next_idle_close_check_time(self, ts): + self.next_idle_close_check_time = ts + self.connections_max_idle + + def poll_expired_connection(self): + if time.time() < self.next_idle_close_check_time: + return None + + if not len(self.lru_connections): + return None + + oldest_conn_id = None + oldest_ts = None + if OrderedDict is dict: + for conn_id, ts in self.lru_connections.items(): + if oldest_conn_id is None or ts < oldest_ts: + oldest_conn_id = conn_id + oldest_ts = ts + else: + (oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items())) + + self.update_next_idle_close_check_time(oldest_ts) + + if time.time() >= oldest_ts + self.connections_max_idle: + return (oldest_conn_id, oldest_ts) + else: + return None + + class KafkaClientMetrics(object): def __init__(self, metrics, metric_group_prefix, conns): self.metrics = metrics diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 91e253b..22f60bd 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -266,7 +266,7 @@ class KafkaProducer(object): 'linger_ms': 0, 'partitioner': DefaultPartitioner(), 'buffer_memory': 33554432, - 'connections_max_idle_ms': 600000, # not implemented yet + 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet 'max_block_ms': 60000, 'max_request_size': 1048576, 'metadata_max_age_ms': 300000, diff --git a/test/test_client_async.py b/test/test_client_async.py index 8f6ac3f..d4e6d37 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import, division + # selectors in stdlib as of py3.4 try: import selectors # pylint: disable=import-error @@ -10,7 +12,7 @@ import time import pytest -from kafka.client_async import KafkaClient +from kafka.client_async import KafkaClient, IdleConnectionManager from kafka.conn import ConnectionStates import kafka.errors as Errors from kafka.future import Future @@ -319,7 +321,10 @@ def client(mocker): mocker.patch.object(KafkaClient, '_bootstrap') _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9)) + cli = KafkaClient(request_timeout_ms=9999999, + reconnect_backoff_ms=2222, + connections_max_idle_ms=float('inf'), + api_version=(0, 9)) tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') tasks.return_value = 9999999 @@ -395,3 +400,32 @@ def test_schedule(): def test_unschedule(): pass + + +def test_idle_connection_manager(mocker): + t = mocker.patch.object(time, 'time') + t.return_value = 0 + + idle = IdleConnectionManager(100) + assert idle.next_check_ms() == float('inf') + + idle.update('foo') + assert not idle.is_expired('foo') + assert idle.poll_expired_connection() is None + assert idle.next_check_ms() == 100 + + t.return_value = 90 / 1000 + assert not idle.is_expired('foo') + assert idle.poll_expired_connection() is None + assert idle.next_check_ms() == 10 + + t.return_value = 100 / 1000 + assert idle.is_expired('foo') + assert idle.next_check_ms() == 0 + + conn_id, conn_ts = idle.poll_expired_connection() + assert conn_id == 'foo' + assert conn_ts == 0 + + idle.remove('foo') + assert idle.next_check_ms() == float('inf') |