author    | Zack Dever <zdever@pandora.com> | 2016-04-07 17:46:55 -0700
committer | Zack Dever <zdever@pandora.com> | 2016-04-13 17:26:39 -0700
commit    | e010669b602ffdfddde6fa2a381dad6c3be1f05d (patch)
tree      | 453f771664ac89d18a1962891580c1a1d7b19b3f
parent    | caf4cdefe4f41b444d44ddef8f40f5ddeccf65b9 (diff)
download  | kafka-python-e010669b602ffdfddde6fa2a381dad6c3be1f05d.tar.gz
Beginnings of metrics instrumentation in kafka consumer.
This adds the parent metrics instance to the kafka consumer, which will
eventually be used to instrument everything under the consumer. To start,
I ported the Java consumer coordinator metrics.
-rw-r--r-- | kafka/consumer/group.py       | 29
-rw-r--r-- | kafka/coordinator/consumer.py | 71
-rw-r--r-- | test/test_coordinator.py      | 19
3 files changed, 66 insertions, 53 deletions
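Aside (not part of the commit): before reading the diff, here is a minimal sketch of how the pieces wired into `KafkaConsumer.__init__` below fit together, using only the `kafka.metrics` names that appear in the diff (`MetricConfig`, `DictReporter`, `Metrics`); the `snapshot()` call on `DictReporter` is an assumed inspection helper, not something this commit shows.

```python
# Minimal sketch, assuming the kafka.metrics names visible in the diff below.
from kafka.metrics import DictReporter, MetricConfig, Metrics

# Same defaults the new consumer config uses: 2 samples per metric,
# each sample covering a 30-second window, tagged with the client id.
config = MetricConfig(samples=2, time_window_ms=30000,
                      tags={'client-id': 'my-client'})
reporter = DictReporter('kafka.consumer')
metrics = Metrics(config, [reporter])

# DictReporter keeps metrics in a plain dict, so a snapshot can be
# printed or asserted on directly (snapshot() is an assumption here).
print(reporter.snapshot())
metrics.close()
```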
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 0a78e7f..afcc996 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,6 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
 from kafka.coordinator.consumer import ConsumerCoordinator
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.metrics import DictReporter, MetricConfig, Metrics
 from kafka.protocol.offset import OffsetResetStrategy
 from kafka.structs import TopicPartition
 from kafka.version import __version__
@@ -143,6 +144,13 @@ class KafkaConsumer(six.Iterator):
             offset commits; 0.8.0 is what is left. If set to 'auto', will
             attempt to infer the broker version by probing various APIs.
             Default: auto
+        metric_reporters (list): A list of classes to use as metrics reporters.
+            Implementing the AbstractMetricsReporter interface allows plugging
+            in classes that will be notified of new metric creation. Default: []
+        metrics_num_samples (int): The number of samples maintained to compute
+            metrics. Default: 2
+        metrics_sample_window_ms (int): The maximum age in milliseconds of
+            samples used to compute metrics. Default: 30000
 
     Note:
         Configuration parameters are described in more detail at
@@ -181,9 +189,9 @@ class KafkaConsumer(six.Iterator):
         'ssl_keyfile': None,
         'api_version': 'auto',
         'connections_max_idle_ms': 9 * 60 * 1000,  # not implemented yet
-        #'metric_reporters': None,
-        #'metrics_num_samples': 2,
-        #'metrics_sample_window_ms': 30000,
+        'metric_reporters': [],
+        'metrics_num_samples': 2,
+        'metrics_sample_window_ms': 30000,
     }
 
     def __init__(self, *topics, **configs):
@@ -202,6 +210,16 @@
                 new_config, self.config['auto_offset_reset'])
             self.config['auto_offset_reset'] = new_config
 
+        metrics_tags = {'client-id': self.config['client_id']}
+        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+                                     time_window_ms=self.config['metrics_sample_window_ms'],
+                                     tags=metrics_tags)
+        reporters = [reporter() for reporter in self.config['metric_reporters']]
+        reporters.append(DictReporter('kafka.consumer'))
+        self._metrics = Metrics(metric_config, reporters)
+        metric_group_prefix = 'consumer'
+        # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc.
+
         self._client = KafkaClient(**self.config)
 
         # Check Broker Version if not set explicitly
@@ -217,14 +235,13 @@
         self._fetcher = Fetcher(
             self._client, self._subscription, **self.config)
         self._coordinator = ConsumerCoordinator(
-            self._client, self._subscription,
+            self._client, self._subscription, self._metrics, metric_group_prefix,
             assignors=self.config['partition_assignment_strategy'],
             **self.config)
         self._closed = False
         self._iterator = None
         self._consumer_timeout = float('inf')
 
-        #self.metrics = None
         if topics:
             self._subscription.subscribe(topics=topics)
             self._client.set_topics(topics)
@@ -277,7 +294,7 @@
         log.debug("Closing the KafkaConsumer.")
         self._closed = True
         self._coordinator.close()
-        #self.metrics.close()
+        self._metrics.close()
         self._client.close()
         try:
             self.config['key_deserializer'].close()
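Aside (not part of the commit): the new `metric_reporters` option takes classes, not instances; note the `reporter()` call in `__init__` above. A hedged sketch of a custom reporter follows. The `AbstractMetricsReporter` import path and method names are assumptions modeled on the Java MetricsReporter interface that this code ports, not something shown in the diff.

```python
# Hypothetical reporter, purely illustrative; interface names assumed.
import logging

from kafka.metrics.metrics_reporter import AbstractMetricsReporter

log = logging.getLogger(__name__)


class LoggingReporter(AbstractMetricsReporter):
    """Logs metric lifecycle events as they happen."""

    def init(self, metrics):
        # Called once with the metrics that already exist.
        for metric in metrics:
            log.info('existing metric: %s', metric.metric_name)

    def metric_change(self, metric):
        log.info('metric added or updated: %s', metric.metric_name)

    def metric_removal(self, metric):
        log.info('metric removed: %s', metric.metric_name)

    def configure(self, configs):
        pass

    def close(self):
        pass


# Passed as a class; KafkaConsumer instantiates it, per the diff above:
# consumer = KafkaConsumer('my-topic', metric_reporters=[LoggingReporter])
```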
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cd3d48a..50d2806 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -14,6 +14,8 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor
 from .protocol import ConsumerProtocol
 from .. import errors as Errors
 from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
 from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
 from ..structs import OffsetAndMetadata, TopicPartition
 from ..util import WeakMethod
@@ -36,7 +38,8 @@ class ConsumerCoordinator(BaseCoordinator):
         'api_version': (0, 9),
     }
 
-    def __init__(self, client, subscription, **configs):
+    def __init__(self, client, subscription, metrics, metric_group_prefix,
+                 **configs):
         """Initialize the coordination manager.
 
         Keyword Arguments:
@@ -97,10 +100,8 @@
             interval = self.config['auto_commit_interval_ms'] / 1000.0
             self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
 
-        # metrics=None,
-        # metric_group_prefix=None,
-        # metric_tags=None,
-        # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+        self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
+                                                   self._subscription)
 
     def __del__(self):
         if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
@@ -470,12 +471,13 @@
 
         future = Future()
         _f = self._client.send(node_id, request)
-        _f.add_callback(self._handle_offset_commit_response, offsets, future)
+        _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
        _f.add_errback(self._failed_request, node_id, request, future)
         return future
 
-    def _handle_offset_commit_response(self, offsets, future, response):
-        #self.sensors.commit_latency.record(response.requestLatencyMs())
+    def _handle_offset_commit_response(self, offsets, future, send_time, response):
+        # TODO look at adding request_latency_ms to response (like java kafka)
+        self._sensors.commit_latency.record((time.time() - send_time) * 1000)
         unauthorized_topics = set()
 
         for topic, partitions in response.topics:
@@ -720,38 +722,25 @@
         self._reschedule(next_at)
 
 
-# TODO
-"""
 class ConsumerCoordinatorMetrics(object):
-    def __init__(self, metrics, prefix, tags):
+    def __init__(self, metrics, metric_group_prefix, subscription):
         self.metrics = metrics
-        self.group_name = prefix + "-coordinator-metrics"
-
-        self.commit_latency = metrics.sensor("commit-latency")
-        self.commit_latency.add(metrics.MetricName(
-            "commit-latency-avg", self.group_name,
-            "The average time taken for a commit request",
-            tags), metrics.Avg())
-        self.commit_latency.add(metrics.MetricName(
-            "commit-latency-max", self.group_name,
-            "The max time taken for a commit request",
-            tags), metrics.Max())
-        self.commit_latency.add(metrics.MetricName(
-            "commit-rate", self.group_name,
-            "The number of commit calls per second",
-            tags), metrics.Rate(metrics.Count()))
-
-    '''
-    def _num_partitions(config, now):
-        new Measurable() {
-            public double measure(MetricConfig config, long now) {
-                return subscriptions.assignedPartitions().size();
-            }
-        };
-        metrics.addMetric(new MetricName("assigned-partitions",
-            this.metricGrpName,
-            "The number of partitions currently assigned to this consumer",
-            tags),
-            numParts);
-    '''
-"""
+        self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
+
+        self.commit_latency = metrics.sensor('commit-latency')
+        self.commit_latency.add(metrics.metric_name(
+            'commit-latency-avg', self.metric_group_name,
+            'The average time taken for a commit request'), Avg())
+        self.commit_latency.add(metrics.metric_name(
+            'commit-latency-max', self.metric_group_name,
+            'The max time taken for a commit request'), Max())
+        self.commit_latency.add(metrics.metric_name(
+            'commit-rate', self.metric_group_name,
+            'The number of commit calls per second'), Rate(sampled_stat=Count()))
+
+        num_parts = AnonMeasurable(lambda config, now:
+                                   len(subscription.assigned_partitions()))
+        metrics.add_metric(metrics.metric_name(
+            'assigned-partitions', self.metric_group_name,
+            'The number of partitions currently assigned to this consumer'),
+            num_parts)
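Aside (not part of the commit): the ported `ConsumerCoordinatorMetrics` above follows the standard sensor pattern, with one sensor carrying several statistics that are all updated by a single `record()` call. A standalone sketch, using only API calls that appear in this diff:

```python
# Sketch of the sensor pattern used above: record commit latency once,
# and Avg/Max/Rate are all updated from the same sample.
import time

from kafka.metrics import Metrics
from kafka.metrics.stats import Avg, Count, Max, Rate

metrics = Metrics()
group = 'consumer-coordinator-metrics'

commit_latency = metrics.sensor('commit-latency')
commit_latency.add(metrics.metric_name(
    'commit-latency-avg', group,
    'The average time taken for a commit request'), Avg())
commit_latency.add(metrics.metric_name(
    'commit-latency-max', group,
    'The max time taken for a commit request'), Max())
commit_latency.add(metrics.metric_name(
    'commit-rate', group,
    'The number of commit calls per second'), Rate(sampled_stat=Count()))

# The same measurement _handle_offset_commit_response now performs:
# wall-clock elapsed time since the request was sent, in milliseconds.
send_time = time.time()
# ... offset commit round trip would happen here ...
commit_latency.record((time.time() - send_time) * 1000)
metrics.close()
```

Measuring wall-clock time on the client is a stopgap; the TODO in the diff notes that a request_latency_ms on the response, as in Java Kafka, would be more accurate.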
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 399609d..4b90f30 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -1,5 +1,6 @@
 # pylint: skip-file
 from __future__ import absolute_import
+import time
 
 import pytest
 
@@ -14,6 +15,7 @@ from kafka.coordinator.protocol import (
     ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics import Metrics
 from kafka.protocol.commit import (
     OffsetCommitRequest, OffsetCommitResponse,
     OffsetFetchRequest, OffsetFetchResponse)
@@ -23,12 +25,14 @@ from kafka.util import WeakMethod
 
 @pytest.fixture
 def coordinator(conn):
-    return ConsumerCoordinator(KafkaClient(), SubscriptionState())
+    return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(),
+                               'consumer')
 
 
 def test_init(conn):
     cli = KafkaClient()
-    coordinator = ConsumerCoordinator(cli, SubscriptionState())
+    coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(),
+                                      'consumer')
 
     # metadata update on init
     assert cli.cluster._need_update is True
@@ -38,6 +42,7 @@
 @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
 def test_autocommit_enable_api_version(conn, api_version):
     coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+                                      Metrics(), 'consumer',
                                       enable_auto_commit=True,
                                       group_id='foobar',
                                       api_version=api_version)
@@ -354,6 +359,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
     mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
     mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
     coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+                                      Metrics(), 'consumer',
                                       api_version=api_version,
                                       enable_auto_commit=enable,
                                       group_id=group_id)
@@ -441,7 +447,7 @@ def test_send_offset_commit_request_failure(patched_coord, offsets):
     assert future.exception is error
 
 
-def test_send_offset_commit_request_success(patched_coord, offsets):
+def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
     _f = Future()
     patched_coord._client.send.return_value = _f
     future = patched_coord._send_offset_commit_request(offsets)
@@ -449,7 +455,7 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
     response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])
     _f.success(response)
     patched_coord._handle_offset_commit_response.assert_called_with(
-        offsets, future, response)
+        offsets, future, mocker.ANY, response)
 
 
 @pytest.mark.parametrize('response,error,dead,reassign', [
@@ -478,10 +484,11 @@
     (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
      Errors.TopicAuthorizationFailedError, False, False),
 ])
-def test_handle_offset_commit_response(patched_coord, offsets,
+def test_handle_offset_commit_response(mocker, patched_coord, offsets,
                                        response, error, dead, reassign):
     future = Future()
-    patched_coord._handle_offset_commit_response(offsets, future, response)
+    patched_coord._handle_offset_commit_response(offsets, future, time.time(),
+                                                 response)
 
     assert isinstance(future.exception, error)
     assert patched_coord.coordinator_id is (None if dead else 0)
     assert patched_coord._subscription.needs_partition_assignment is reassign
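Aside (not part of the commit): the assigned-partitions metric in the coordinator diff is a pull-based gauge rather than a recorded sensor. `AnonMeasurable` wraps a `(config, now)` callable that is evaluated whenever the metric is read, so nothing has to call `record()` when assignments change. A sketch using only calls from the diff, with a stand-in set in place of a real `SubscriptionState`:

```python
# Sketch of the AnonMeasurable gauge pattern from the coordinator diff:
# the metric's value is computed on read from a (config, now) callable.
from kafka.metrics import AnonMeasurable, Metrics

metrics = Metrics()
assigned = {0, 1, 2}  # stand-in for subscription.assigned_partitions()

metrics.add_metric(
    metrics.metric_name(
        'assigned-partitions', 'consumer-coordinator-metrics',
        'The number of partitions currently assigned to this consumer'),
    AnonMeasurable(lambda config, now: len(assigned)))
metrics.close()
```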