diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 708d03d..ff566ca 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -24,6 +24,7 @@ from .cluster import ClusterMetadata from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi from . import errors as Errors from .future import Future +from .metrics import AnonMeasurable from .metrics.stats import Avg, Count, Rate from .metrics.stats.rate import TimeUnit from .protocol.metadata import MetadataRequest @@ -187,10 +188,13 @@ class KafkaClient(object): self._wake_lock = threading.Lock() self._selector.register(self._wake_r, selectors.EVENT_READ) self._closed = False - self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._sensors = None if self.config['metrics']: - self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix']) + self._sensors = KafkaClientMetrics(self.config['metrics'], + self.config['metric_group_prefix'], + self._conns) + + self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) # Check Broker Version if not set explicitly if self.config['api_version'] is None: @@ -892,7 +896,7 @@ class DelayedTaskQueue(object): class KafkaClientMetrics(object): - def __init__(self, metrics, metric_group_prefix): + def __init__(self, metrics, metric_group_prefix, conns): self.metrics = metrics self.metric_group_name = metric_group_prefix + '-metrics' @@ -928,3 +932,8 @@ class KafkaClientMetrics(object): 'io-ratio', self.metric_group_name, 'The fraction of time the I/O thread spent doing I/O'), Rate(time_unit=TimeUnit.NANOSECONDS)) + + metrics.add_metric(metrics.metric_name( + 'connection-count', self.metric_group_name, + 'The current number of active connections.'), AnonMeasurable( + lambda config, now: len(conns))) |