summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py52
1 files changed, 51 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index c081f07..dee4a12 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -24,6 +24,8 @@ from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from . import errors as Errors
from .future import Future
+from .metrics.stats import Avg, Count, Rate
+from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
from . import socketpair
@@ -65,6 +67,8 @@ class KafkaClient(object):
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'selector': selectors.DefaultSelector,
+ 'metrics': None,
+ 'metric_group_prefix': '',
}
API_VERSIONS = [
(0, 10),
@@ -139,6 +143,9 @@ class KafkaClient(object):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
+ metrics (kafka.metrics.Metrics): Optionally provide a metrics
+ instance for capturing network IO stats. Default: None.
+ metric_group_prefix (str): Prefix for metric names. Default: ''
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -167,6 +174,9 @@ class KafkaClient(object):
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._closed = False
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+ self._sensors = None
+ if self.config['metrics']:
+ self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
@@ -487,7 +497,14 @@ class KafkaClient(object):
responses = []
processed = set()
- for key, events in self._selector.select(timeout):
+
+ start_select = time.time()
+ ready = self._selector.select(timeout)
+ end_select = time.time()
+ if self._sensors:
+ self._sensors.select_time.record((end_select - start_select) * 1000000000)
+
+ for key, events in ready:
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
@@ -531,6 +548,9 @@ class KafkaClient(object):
response = conn.recv()
if response:
responses.append(response)
+
+ if self._sensors:
+ self._sensors.io_time.record((time.time() - end_select) * 1000000000)
return responses
def in_flight_request_count(self, node_id=None):
@@ -848,3 +868,33 @@ class DelayedTaskQueue(object):
break
ready_tasks.append(task)
return ready_tasks
+
+
+class KafkaClientMetrics(object):
+ def __init__(self, metrics, metric_group_prefix):
+ self.metrics = metrics
+ self.metric_group_name = metric_group_prefix + '-metrics'
+
+ self.select_time = metrics.sensor('select-time')
+ self.select_time.add(metrics.metric_name(
+ 'select-rate', self.metric_group_name,
+ 'Number of times the I/O layer checked for new I/O to perform per'
+ ' second'), Rate(sampled_stat=Count()))
+ self.select_time.add(metrics.metric_name(
+ 'io-wait-time-ns-avg', self.metric_group_name,
+ 'The average length of time the I/O thread spent waiting for a'
+ ' socket ready for reads or writes in nanoseconds.'), Avg())
+ self.select_time.add(metrics.metric_name(
+ 'io-wait-ratio', self.metric_group_name,
+ 'The fraction of time the I/O thread spent waiting.'),
+ Rate(time_unit=TimeUnit.NANOSECONDS))
+
+ self.io_time = metrics.sensor('io-time')
+ self.io_time.add(metrics.metric_name(
+ 'io-time-ns-avg', self.metric_group_name,
+ 'The average length of time for I/O per select call in nanoseconds.'),
+ Avg())
+ self.io_time.add(metrics.metric_name(
+ 'io-ratio', self.metric_group_name,
+ 'The fraction of time the I/O thread spent doing I/O'),
+ Rate(time_unit=TimeUnit.NANOSECONDS))