diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 52 |
1 files changed, 51 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index c081f07..dee4a12 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -24,6 +24,8 @@ from .cluster import ClusterMetadata
 from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
 from . import errors as Errors
 from .future import Future
+from .metrics.stats import Avg, Count, Rate
+from .metrics.stats.rate import TimeUnit
 from .protocol.metadata import MetadataRequest
 from .protocol.produce import ProduceRequest
 from . import socketpair
@@ -65,6 +67,8 @@ class KafkaClient(object):
         'api_version': None,
         'api_version_auto_timeout_ms': 2000,
         'selector': selectors.DefaultSelector,
+        'metrics': None,
+        'metric_group_prefix': '',
     }
     API_VERSIONS = [
         (0, 10),
@@ -139,6 +143,9 @@ class KafkaClient(object):
             selector (selectors.BaseSelector): Provide a specific selector
                 implementation to use for I/O multiplexing.
                 Default: selectors.DefaultSelector
+            metrics (kafka.metrics.Metrics): Optionally provide a metrics
+                instance for capturing network IO stats. Default: None.
+            metric_group_prefix (str): Prefix for metric names. Default: ''
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -167,6 +174,9 @@ class KafkaClient(object):
         self._selector.register(self._wake_r, selectors.EVENT_READ)
         self._closed = False
         self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+        self._sensors = None
+        if self.config['metrics']:
+            self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
 
         # Check Broker Version if not set explicitly
         if self.config['api_version'] is None:
@@ -487,7 +497,14 @@ class KafkaClient(object):
 
         responses = []
         processed = set()
-        for key, events in self._selector.select(timeout):
+
+        start_select = time.time()
+        ready = self._selector.select(timeout)
+        end_select = time.time()
+        if self._sensors:
+            self._sensors.select_time.record((end_select - start_select) * 1000000000)
+
+        for key, events in ready:
             if key.fileobj is self._wake_r:
                 self._clear_wake_fd()
                 continue
@@ -531,6 +548,9 @@ class KafkaClient(object):
                     response = conn.recv()
                     if response:
                         responses.append(response)
+
+        if self._sensors:
+            self._sensors.io_time.record((time.time() - end_select) * 1000000000)
         return responses
 
     def in_flight_request_count(self, node_id=None):
@@ -848,3 +868,33 @@ class DelayedTaskQueue(object):
                 break
             ready_tasks.append(task)
         return ready_tasks
+
+
+class KafkaClientMetrics(object):
+    def __init__(self, metrics, metric_group_prefix):
+        self.metrics = metrics
+        self.metric_group_name = metric_group_prefix + '-metrics'
+
+        self.select_time = metrics.sensor('select-time')
+        self.select_time.add(metrics.metric_name(
+            'select-rate', self.metric_group_name,
+            'Number of times the I/O layer checked for new I/O to perform per'
+            ' second'), Rate(sampled_stat=Count()))
+        self.select_time.add(metrics.metric_name(
+            'io-wait-time-ns-avg', self.metric_group_name,
+            'The average length of time the I/O thread spent waiting for a'
+            ' socket ready for reads or writes in nanoseconds.'), Avg())
+        self.select_time.add(metrics.metric_name(
+            'io-wait-ratio', self.metric_group_name,
+            'The fraction of time the I/O thread spent waiting.'),
+            Rate(time_unit=TimeUnit.NANOSECONDS))
+
+        self.io_time = metrics.sensor('io-time')
+        self.io_time.add(metrics.metric_name(
+            'io-time-ns-avg', self.metric_group_name,
+            'The average length of time for I/O per select call in nanoseconds.'),
+            Avg())
+        self.io_time.add(metrics.metric_name(
+            'io-ratio', self.metric_group_name,
+            'The fraction of time the I/O thread spent doing I/O'),
+            Rate(time_unit=TimeUnit.NANOSECONDS))