-rw-r--r--  kafka/consumer/group.py        | 29
-rw-r--r--  kafka/coordinator/consumer.py  | 71
-rw-r--r--  test/test_coordinator.py       | 19
3 files changed, 66 insertions(+), 53 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 0a78e7f..afcc996 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,6 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.metrics import DictReporter, MetricConfig, Metrics
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
@@ -143,6 +144,13 @@ class KafkaConsumer(six.Iterator):
offset commits; 0.8.0 is what is left. If set to 'auto', will
attempt to infer the broker version by probing various APIs.
Default: auto
+ metric_reporters (list): A list of classes to use as metrics reporters.
+ Implementing the AbstractMetricsReporter interface allows plugging
+ in classes that will be notified of new metric creation. Default: []
+ metrics_num_samples (int): The number of samples maintained to compute
+ metrics. Default: 2
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
+ samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -181,9 +189,9 @@ class KafkaConsumer(six.Iterator):
'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
- #'metric_reporters': None,
- #'metrics_num_samples': 2,
- #'metrics_sample_window_ms': 30000,
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
}
def __init__(self, *topics, **configs):
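For reference, the three new settings are ordinary constructor kwargs, as the defaults above suggest. A minimal usage sketch (topic name and broker address are placeholders, not part of this diff):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                           # hypothetical topic
    bootstrap_servers='localhost:9092',   # placeholder broker
    metric_reporters=[],                  # reporter *classes*, instantiated internally
    metrics_num_samples=2,                # samples kept per stat
    metrics_sample_window_ms=30000)       # max age of each sample window, in ms
```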
@@ -202,6 +210,16 @@ class KafkaConsumer(six.Iterator):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ reporters.append(DictReporter('kafka.consumer'))
+ self._metrics = Metrics(metric_config, reporters)
+ metric_group_prefix = 'consumer'
+ # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc.
+
self._client = KafkaClient(**self.config)
# Check Broker Version if not set explicitly
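Note that `metric_reporters` entries are passed as classes and instantiated by the list comprehension above. A sketch of a custom reporter, assuming the `AbstractMetricsReporter` interface in `kafka.metrics.metrics_reporter` exposes the same `init`/`metric_change`/`metric_removal`/`configure`/`close` hooks that `DictReporter` implements:

```python
from kafka.metrics.metrics_reporter import AbstractMetricsReporter

class LoggingReporter(AbstractMetricsReporter):
    """Hypothetical reporter that prints metric lifecycle events."""

    def init(self, metrics):
        # called once with any metrics that already exist
        for metric in metrics:
            self.metric_change(metric)

    def metric_change(self, metric):
        print('metric added/updated: %s' % (metric.metric_name,))

    def metric_removal(self, metric):
        print('metric removed: %s' % (metric.metric_name,))

    def configure(self, configs):
        pass

    def close(self):
        pass

# passed as a class; the consumer instantiates it in the comprehension above:
# KafkaConsumer(..., metric_reporters=[LoggingReporter])
```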
@@ -217,14 +235,13 @@ class KafkaConsumer(six.Iterator):
self._fetcher = Fetcher(
self._client, self._subscription, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._subscription,
+ self._client, self._subscription, self._metrics, metric_group_prefix,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
self._iterator = None
self._consumer_timeout = float('inf')
- #self.metrics = None
if topics:
self._subscription.subscribe(topics=topics)
self._client.set_topics(topics)
@@ -277,7 +294,7 @@ class KafkaConsumer(six.Iterator):
log.debug("Closing the KafkaConsumer.")
self._closed = True
self._coordinator.close()
- #self.metrics.close()
+ self._metrics.close()
self._client.close()
try:
self.config['key_deserializer'].close()
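Because a `DictReporter('kafka.consumer')` is always appended in `__init__` above, metric values can be read back in-process. A standalone sketch of that round trip, assuming `DictReporter` exposes the `snapshot()` helper from kafka-python's metrics package (sensor and group names here are invented for the demo):

```python
from kafka.metrics import DictReporter, MetricConfig, Metrics
from kafka.metrics.stats import Avg

reporter = DictReporter('kafka.consumer')
metrics = Metrics(MetricConfig(samples=2, time_window_ms=30000), [reporter])

sensor = metrics.sensor('demo-sensor')
sensor.add(metrics.metric_name('demo-avg', 'demo-group', 'demo average'), Avg())
sensor.record(1.0)
sensor.record(3.0)

# snapshot() returns a nested {category: {metric-name: value}} dict
print(reporter.snapshot())
metrics.close()
```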
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cd3d48a..50d2806 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -14,6 +14,8 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from .. import errors as Errors
from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
@@ -36,7 +38,8 @@ class ConsumerCoordinator(BaseCoordinator):
'api_version': (0, 9),
}
- def __init__(self, client, subscription, **configs):
+ def __init__(self, client, subscription, metrics, metric_group_prefix,
+ **configs):
"""Initialize the coordination manager.
Keyword Arguments:
@@ -97,10 +100,8 @@ class ConsumerCoordinator(BaseCoordinator):
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
- # metrics=None,
- # metric_group_prefix=None,
- # metric_tags=None,
- # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
+ self._subscription)
def __del__(self):
if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
@@ -470,12 +471,13 @@ class ConsumerCoordinator(BaseCoordinator):
future = Future()
_f = self._client.send(node_id, request)
- _f.add_callback(self._handle_offset_commit_response, offsets, future)
+ _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
_f.add_errback(self._failed_request, node_id, request, future)
return future
- def _handle_offset_commit_response(self, offsets, future, response):
- #self.sensors.commit_latency.record(response.requestLatencyMs())
+ def _handle_offset_commit_response(self, offsets, future, send_time, response):
+ # TODO look at adding request_latency_ms to response (like java kafka)
+ self._sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
for topic, partitions in response.topics:
@@ -720,38 +722,25 @@ class AutoCommitTask(object):
self._reschedule(next_at)
-# TODO
-"""
class ConsumerCoordinatorMetrics(object):
- def __init__(self, metrics, prefix, tags):
+ def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
- self.group_name = prefix + "-coordinator-metrics"
-
- self.commit_latency = metrics.sensor("commit-latency")
- self.commit_latency.add(metrics.MetricName(
- "commit-latency-avg", self.group_name,
- "The average time taken for a commit request",
- tags), metrics.Avg())
- self.commit_latency.add(metrics.MetricName(
- "commit-latency-max", self.group_name,
- "The max time taken for a commit request",
- tags), metrics.Max())
- self.commit_latency.add(metrics.MetricName(
- "commit-rate", self.group_name,
- "The number of commit calls per second",
- tags), metrics.Rate(metrics.Count()))
-
- '''
- def _num_partitions(config, now):
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return subscriptions.assignedPartitions().size();
- }
- };
- metrics.addMetric(new MetricName("assigned-partitions",
- this.metricGrpName,
- "The number of partitions currently assigned to this consumer",
- tags),
- numParts);
- '''
-"""
+ self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
+
+ self.commit_latency = metrics.sensor('commit-latency')
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-latency-avg', self.metric_group_name,
+ 'The average time taken for a commit request'), Avg())
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-latency-max', self.metric_group_name,
+ 'The max time taken for a commit request'), Max())
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-rate', self.metric_group_name,
+ 'The number of commit calls per second'), Rate(sampled_stat=Count()))
+
+ num_parts = AnonMeasurable(lambda config, now:
+ len(subscription.assigned_partitions()))
+ metrics.add_metric(metrics.metric_name(
+ 'assigned-partitions', self.metric_group_name,
+ 'The number of partitions currently assigned to this consumer'),
+ num_parts)
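This mirrors the Java client's ConsumerCoordinatorMetrics: a sensor aggregates recorded observations, while `AnonMeasurable` registers a gauge evaluated at read time. A standalone sketch of the same pattern, using only calls that appear in this diff (group and metric names invented for the demo):

```python
from kafka.metrics import AnonMeasurable, Metrics
from kafka.metrics.stats import Avg, Count, Max, Rate

metrics = Metrics()  # default MetricConfig, no reporters

# A sensor fans each recorded value out to all of its registered stats.
latency = metrics.sensor('commit-latency')
latency.add(metrics.metric_name('commit-latency-avg', 'demo-metrics',
                                'Average commit latency (ms)'), Avg())
latency.add(metrics.metric_name('commit-latency-max', 'demo-metrics',
                                'Max commit latency (ms)'), Max())
latency.add(metrics.metric_name('commit-rate', 'demo-metrics',
                                'Commits per second'), Rate(sampled_stat=Count()))
latency.record(42.0)
latency.record(7.5)

# A gauge: AnonMeasurable wraps a callable evaluated whenever the metric is read.
assigned = [0, 1, 2]  # stand-in for subscription.assigned_partitions()
metrics.add_metric(metrics.metric_name('assigned-partitions', 'demo-metrics'),
                   AnonMeasurable(lambda config, now: len(assigned)))

metrics.close()
```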
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 399609d..4b90f30 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -1,5 +1,6 @@
# pylint: skip-file
from __future__ import absolute_import
+import time
import pytest
@@ -14,6 +15,7 @@ from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics import Metrics
from kafka.protocol.commit import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
@@ -23,12 +25,14 @@ from kafka.util import WeakMethod
@pytest.fixture
def coordinator(conn):
- return ConsumerCoordinator(KafkaClient(), SubscriptionState())
+ return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(),
+ 'consumer')
def test_init(conn):
cli = KafkaClient()
- coordinator = ConsumerCoordinator(cli, SubscriptionState())
+ coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(),
+ 'consumer')
# metadata update on init
assert cli.cluster._need_update is True
@@ -38,6 +42,7 @@ def test_init(conn):
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(conn, api_version):
coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+ Metrics(), 'consumer',
enable_auto_commit=True,
group_id='foobar',
api_version=api_version)
@@ -354,6 +359,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(),
+ Metrics(), 'consumer',
api_version=api_version,
enable_auto_commit=enable,
group_id=group_id)
@@ -441,7 +447,7 @@ def test_send_offset_commit_request_failure(patched_coord, offsets):
assert future.exception is error
-def test_send_offset_commit_request_success(patched_coord, offsets):
+def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
_f = Future()
patched_coord._client.send.return_value = _f
future = patched_coord._send_offset_commit_request(offsets)
@@ -449,7 +455,7 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])
_f.success(response)
patched_coord._handle_offset_commit_response.assert_called_with(
- offsets, future, response)
+ offsets, future, mocker.ANY, response)
@pytest.mark.parametrize('response,error,dead,reassign', [
@@ -478,10 +484,11 @@ def test_send_offset_commit_request_success(patched_coord, offsets):
(OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]),
Errors.TopicAuthorizationFailedError, False, False),
])
-def test_handle_offset_commit_response(patched_coord, offsets,
+def test_handle_offset_commit_response(mocker, patched_coord, offsets,
response, error, dead, reassign):
future = Future()
- patched_coord._handle_offset_commit_response(offsets, future, response)
+ patched_coord._handle_offset_commit_response(offsets, future, time.time(),
+ response)
assert isinstance(future.exception, error)
assert patched_coord.coordinator_id is (None if dead else 0)
assert patched_coord._subscription.needs_partition_assignment is reassign
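The tests pass `mocker.ANY` for the third argument because `_send_offset_commit_request` captures wall-clock `time.time()`, which a test cannot predict. The underlying pattern, sketched standalone (`sent_at` and `on_response` are hypothetical helpers, not part of the diff):

```python
import time

def sent_at(on_response):
    # Bind the send timestamp into the response callback, mirroring how
    # _send_offset_commit_request passes time.time() through to
    # _handle_offset_commit_response above.
    send_time = time.time()
    def callback(response):
        latency_ms = (time.time() - send_time) * 1000
        on_response(latency_ms, response)
    return callback

# e.g. _f.add_callback(sent_at(lambda ms, resp: sensors.commit_latency.record(ms)))
```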