From a6b9b135077f7b578f05470afb814d5df5b93ba7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 4 Aug 2016 12:01:48 -0700 Subject: Add connection-creation-rate / connection-close-rate to KafkaClientMetrics --- kafka/client_async.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 8af4acc..708d03d 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -273,6 +273,8 @@ class KafkaClient(object): except KeyError: pass self._selector.register(conn._sock, selectors.EVENT_READ, conn) + if self._sensors: + self._sensors.connection_created.record() if 'bootstrap' in self._conns and node_id != 'bootstrap': bootstrap = self._conns.pop('bootstrap') @@ -289,6 +291,8 @@ class KafkaClient(object): self._selector.unregister(conn._sock) except KeyError: pass + if self._sensors: + self._sensors.connection_closed.record() if self._refresh_on_disconnects and not self._closed: log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() @@ -892,6 +896,15 @@ class KafkaClientMetrics(object): self.metrics = metrics self.metric_group_name = metric_group_prefix + '-metrics' + self.connection_closed = metrics.sensor('connections-closed') + self.connection_closed.add(metrics.metric_name( + 'connection-close-rate', self.metric_group_name, + 'Connections closed per second in the window.'), Rate()) + self.connection_created = metrics.sensor('connections-created') + self.connection_created.add(metrics.metric_name( + 'connection-creation-rate', self.metric_group_name, + 'New connections established per second in the window.'), Rate()) + self.select_time = metrics.sensor('select-time') self.select_time.add(metrics.metric_name( 'select-rate', self.metric_group_name, -- cgit v1.2.1 From 3ed27c60eccee0a9a20307ee1c9fd58720cf9d4b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 4 Aug 2016 12:02:24 -0700 Subject: Add connection-count to KafkaClient metrics --- kafka/client_async.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 708d03d..ff566ca 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -24,6 +24,7 @@ from .cluster import ClusterMetadata from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi from . import errors as Errors from .future import Future +from .metrics import AnonMeasurable from .metrics.stats import Avg, Count, Rate from .metrics.stats.rate import TimeUnit from .protocol.metadata import MetadataRequest @@ -187,10 +188,13 @@ class KafkaClient(object): self._wake_lock = threading.Lock() self._selector.register(self._wake_r, selectors.EVENT_READ) self._closed = False - self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._sensors = None if self.config['metrics']: - self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix']) + self._sensors = KafkaClientMetrics(self.config['metrics'], + self.config['metric_group_prefix'], + self._conns) + + self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) # Check Broker Version if not set explicitly if self.config['api_version'] is None: @@ -892,7 +896,7 @@ class DelayedTaskQueue(object): class KafkaClientMetrics(object): - def __init__(self, metrics, metric_group_prefix): + def __init__(self, metrics, metric_group_prefix, conns): self.metrics = metrics self.metric_group_name = metric_group_prefix + '-metrics' @@ -928,3 +932,8 @@ class KafkaClientMetrics(object): 'io-ratio', self.metric_group_name, 'The fraction of time the I/O thread spent doing I/O'), Rate(time_unit=TimeUnit.NANOSECONDS)) + + metrics.add_metric(metrics.metric_name( + 'connection-count', self.metric_group_name, + 'The current number of active connections.'), AnonMeasurable( + lambda config, now: len(conns))) -- cgit v1.2.1 From 460f0784a30f303b4543763ca330cce52d6054eb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 4 Aug 2016 12:21:40 -0700 Subject: Instrument metrics in BrokerConnection --- kafka/client_async.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index ff566ca..ce1d13b 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -222,6 +222,7 @@ class KafkaClient(object): cb = functools.partial(self._conn_state_change, 'bootstrap') bootstrap = BrokerConnection(host, port, afi, state_change_callback=cb, + node_id='bootstrap', **self.config) bootstrap.connect() while bootstrap.connecting(): @@ -313,6 +314,7 @@ class KafkaClient(object): cb = functools.partial(self._conn_state_change, node_id) self._conns[node_id] = BrokerConnection(host, broker.port, afi, state_change_callback=cb, + node_id=node_id, **self.config) conn = self._conns[node_id] if conn.connected(): -- cgit v1.2.1