summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py30
1 files changed, 27 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 8af4acc..ce1d13b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -24,6 +24,7 @@ from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from . import errors as Errors
from .future import Future
+from .metrics import AnonMeasurable
from .metrics.stats import Avg, Count, Rate
from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
@@ -187,10 +188,13 @@ class KafkaClient(object):
self._wake_lock = threading.Lock()
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._closed = False
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._sensors = None
if self.config['metrics']:
- self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
+ self._sensors = KafkaClientMetrics(self.config['metrics'],
+ self.config['metric_group_prefix'],
+ self._conns)
+
+ self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
@@ -218,6 +222,7 @@ class KafkaClient(object):
cb = functools.partial(self._conn_state_change, 'bootstrap')
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
+ node_id='bootstrap',
**self.config)
bootstrap.connect()
while bootstrap.connecting():
@@ -273,6 +278,8 @@ class KafkaClient(object):
except KeyError:
pass
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+ if self._sensors:
+ self._sensors.connection_created.record()
if 'bootstrap' in self._conns and node_id != 'bootstrap':
bootstrap = self._conns.pop('bootstrap')
@@ -289,6 +296,8 @@ class KafkaClient(object):
self._selector.unregister(conn._sock)
except KeyError:
pass
+ if self._sensors:
+ self._sensors.connection_closed.record()
if self._refresh_on_disconnects and not self._closed:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
@@ -305,6 +314,7 @@ class KafkaClient(object):
cb = functools.partial(self._conn_state_change, node_id)
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
+ node_id=node_id,
**self.config)
conn = self._conns[node_id]
if conn.connected():
@@ -888,10 +898,19 @@ class DelayedTaskQueue(object):
class KafkaClientMetrics(object):
- def __init__(self, metrics, metric_group_prefix):
+ def __init__(self, metrics, metric_group_prefix, conns):
self.metrics = metrics
self.metric_group_name = metric_group_prefix + '-metrics'
+ self.connection_closed = metrics.sensor('connections-closed')
+ self.connection_closed.add(metrics.metric_name(
+ 'connection-close-rate', self.metric_group_name,
+ 'Connections closed per second in the window.'), Rate())
+ self.connection_created = metrics.sensor('connections-created')
+ self.connection_created.add(metrics.metric_name(
+ 'connection-creation-rate', self.metric_group_name,
+ 'New connections established per second in the window.'), Rate())
+
self.select_time = metrics.sensor('select-time')
self.select_time.add(metrics.metric_name(
'select-rate', self.metric_group_name,
@@ -915,3 +934,8 @@ class KafkaClientMetrics(object):
'io-ratio', self.metric_group_name,
'The fraction of time the I/O thread spent doing I/O'),
Rate(time_unit=TimeUnit.NANOSECONDS))
+
+ metrics.add_metric(metrics.metric_name(
+ 'connection-count', self.metric_group_name,
+ 'The current number of active connections.'), AnonMeasurable(
+ lambda config, now: len(conns)))