diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 29 |
1 files changed, 23 insertions, 6 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 0a78e7f..afcc996 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -12,6 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.metrics import DictReporter, MetricConfig, Metrics from kafka.protocol.offset import OffsetResetStrategy from kafka.structs import TopicPartition from kafka.version import __version__ @@ -143,6 +144,13 @@ class KafkaConsumer(six.Iterator): offset commits; 0.8.0 is what is left. If set to 'auto', will attempt to infer the broker version by probing various APIs. Default: auto + metric_reporters (list): A list of classes to use as metrics reporters. + Implementing the AbstractMetricsReporter interface allows plugging + in classes that will be notified of new metric creation. Default: [] + metrics_num_samples (int): The number of samples maintained to compute + metrics. Default: 2 + metrics_sample_window_ms (int): The number of samples maintained to + compute metrics. Default: 30000 Note: Configuration parameters are described in more detail at @@ -181,9 +189,9 @@ class KafkaConsumer(six.Iterator): 'ssl_keyfile': None, 'api_version': 'auto', 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet - #'metric_reporters': None, - #'metrics_num_samples': 2, - #'metrics_sample_window_ms': 30000, + 'metric_reporters': [], + 'metrics_num_samples': 2, + 'metrics_sample_window_ms': 30000, } def __init__(self, *topics, **configs): @@ -202,6 +210,16 @@ class KafkaConsumer(six.Iterator): new_config, self.config['auto_offset_reset']) self.config['auto_offset_reset'] = new_config + metrics_tags = {'client-id': self.config['client_id']} + metric_config = MetricConfig(samples=self.config['metrics_num_samples'], + time_window_ms=self.config['metrics_sample_window_ms'], + tags=metrics_tags) + reporters = [reporter() for reporter in self.config['metric_reporters']] + reporters.append(DictReporter('kafka.consumer')) + self._metrics = Metrics(metric_config, reporters) + metric_group_prefix = 'consumer' + # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc. + self._client = KafkaClient(**self.config) # Check Broker Version if not set explicitly @@ -217,14 +235,13 @@ class KafkaConsumer(six.Iterator): self._fetcher = Fetcher( self._client, self._subscription, **self.config) self._coordinator = ConsumerCoordinator( - self._client, self._subscription, + self._client, self._subscription, self._metrics, metric_group_prefix, assignors=self.config['partition_assignment_strategy'], **self.config) self._closed = False self._iterator = None self._consumer_timeout = float('inf') - #self.metrics = None if topics: self._subscription.subscribe(topics=topics) self._client.set_topics(topics) @@ -277,7 +294,7 @@ class KafkaConsumer(six.Iterator): log.debug("Closing the KafkaConsumer.") self._closed = True self._coordinator.close() - #self.metrics.close() + self._metrics.close() self._client.close() try: self.config['key_deserializer'].close() |