diff options
-rw-r--r-- | kafka/coordinator/base.py | 141 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 11 | ||||
-rw-r--r-- | kafka/coordinator/heartbeat.py | 4 |
3 files changed, 80 insertions, 76 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 25dd000..bbdc8ad 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import, division + import abc import copy import logging @@ -6,12 +8,14 @@ import weakref import six -import kafka.errors as Errors -from kafka.future import Future -from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest -from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, - LeaveGroupRequest, SyncGroupRequest) from .heartbeat import Heartbeat +from .. import errors as Errors +from ..future import Future +from ..metrics import AnonMeasurable +from ..metrics.stats import Avg, Count, Max, Rate +from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest +from ..protocol.group import (HeartbeatRequest, JoinGroupRequest, + LeaveGroupRequest, SyncGroupRequest) log = logging.getLogger('kafka.coordinator') @@ -53,7 +57,7 @@ class BaseCoordinator(object): 'api_version': (0, 9), } - def __init__(self, client, **configs): + def __init__(self, client, metrics, metric_group_prefix, **configs): """ Keyword Arguments: group_id (str): name of the consumer group to join for dynamic @@ -87,7 +91,8 @@ class BaseCoordinator(object): self.needs_join_prepare = True self.heartbeat = Heartbeat(**self.config) self.heartbeat_task = HeartbeatTask(weakref.proxy(self)) - #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics, + metric_group_prefix) def __del__(self): if hasattr(self, 'heartbeat_task') and self.heartbeat_task: @@ -254,7 +259,7 @@ class BaseCoordinator(object): continue elif not future.retriable(): raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000) def _send_join_group_request(self): """Join the group and return the assignment for the next generation. @@ -285,7 +290,7 @@ class BaseCoordinator(object): log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_join_group_response, future) + _f.add_callback(self._handle_join_group_response, future, time.time()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future @@ -300,7 +305,7 @@ class BaseCoordinator(object): self.coordinator_dead() future.failure(error) - def _handle_join_group_response(self, future, response): + def _handle_join_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.debug("Received successful JoinGroup response for group %s: %s", @@ -311,7 +316,7 @@ class BaseCoordinator(object): self.protocol = response.group_protocol log.info("Joined group '%s' (generation %s) with member_id %s", self.group_id, self.generation, self.member_id) - #self.sensors.join_latency.record(response.requestLatencyMs()) + self.sensors.join_latency.record((time.time() - send_time) * 1000) if response.leader_id == response.member_id: log.info("Elected group leader -- performing partition" " assignments using %s", self.protocol) @@ -402,17 +407,17 @@ class BaseCoordinator(object): return Future().failure(e) future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_sync_group_response, future) + _f.add_callback(self._handle_sync_group_response, future, time.time()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future - def _handle_sync_group_response(self, future, response): + def _handle_sync_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.info("Successfully joined group %s with generation %s", self.group_id, self.generation) - #self.sensors.syncLatency.record(response.requestLatencyMs()) + self.sensors.sync_latency.record((time.time() - send_time) * 1000) future.success(response.member_assignment) return @@ -540,13 +545,13 @@ class BaseCoordinator(object): log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_heartbeat_response, future) + _f.add_callback(self._handle_heartbeat_response, future, time.time()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future - def _handle_heartbeat_response(self, future, response): - #self.sensors.heartbeat_latency.record(response.requestLatencyMs()) + def _handle_heartbeat_response(self, future, send_time, response): + self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.debug("Received successful heartbeat response for group %s", @@ -651,60 +656,56 @@ class HeartbeatTask(object): def _handle_heartbeat_failure(self, e): log.warning("Heartbeat failed (%s); retrying", e) self._request_in_flight = False - etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0 + etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000 self._client.schedule(self, etd) -''' + class GroupCoordinatorMetrics(object): - def __init__(self, metrics, prefix, tags=None): + def __init__(self, heartbeat, metrics, prefix, tags=None): + self.heartbeat = heartbeat self.metrics = metrics - self.group_name = prefix + "-coordinator-metrics" - - self.heartbeat_latency = metrics.sensor("heartbeat-latency") - self.heartbeat_latency.add(metrics.metricName( - "heartbeat-response-time-max", self.group_name, - "The max time taken to receive a response to a heartbeat request", - tags), metrics.Max()) - self.heartbeat_latency.add(metrics.metricName( - "heartbeat-rate", self.group_name, - "The average number of heartbeats per second", - tags), metrics.Rate(sampled_stat=metrics.Count())) - - self.join_latency = metrics.sensor("join-latency") - self.join_latency.add(metrics.metricName( - "join-time-avg", self.group_name, - "The average time taken for a group rejoin", - tags), metrics.Avg()) - self.join_latency.add(metrics.metricName( - "join-time-max", self.group_name, - "The max time taken for a group rejoin", - tags), metrics.Avg()) - self.join_latency.add(metrics.metricName( - "join-rate", self.group_name, - "The number of group joins per second", - tags), metrics.Rate(sampled_stat=metrics.Count())) - - self.sync_latency = metrics.sensor("sync-latency") - self.sync_latency.add(metrics.metricName( - "sync-time-avg", self.group_name, - "The average time taken for a group sync", - tags), metrics.Avg()) - self.sync_latency.add(metrics.MetricName( - "sync-time-max", self.group_name, - "The max time taken for a group sync", - tags), metrics.Avg()) - self.sync_latency.add(metrics.metricName( - "sync-rate", self.group_name, - "The number of group syncs per second", - tags), metrics.Rate(sampled_stat=metrics.Count())) - - """ - lastHeartbeat = Measurable( - measure=lambda _, value: value - heartbeat.last_heartbeat_send() - ) - metrics.addMetric(metrics.metricName( - "last-heartbeat-seconds-ago", self.group_name, - "The number of seconds since the last controller heartbeat", - tags), lastHeartbeat) - """ -''' + self.metric_group_name = prefix + "-coordinator-metrics" + + self.heartbeat_latency = metrics.sensor('heartbeat-latency') + self.heartbeat_latency.add(metrics.metric_name( + 'heartbeat-response-time-max', self.metric_group_name, + 'The max time taken to receive a response to a heartbeat request', + tags), Max()) + self.heartbeat_latency.add(metrics.metric_name( + 'heartbeat-rate', self.metric_group_name, + 'The average number of heartbeats per second', + tags), Rate(sampled_stat=Count())) + + self.join_latency = metrics.sensor('join-latency') + self.join_latency.add(metrics.metric_name( + 'join-time-avg', self.metric_group_name, + 'The average time taken for a group rejoin', + tags), Avg()) + self.join_latency.add(metrics.metric_name( + 'join-time-max', self.metric_group_name, + 'The max time taken for a group rejoin', + tags), Avg()) + self.join_latency.add(metrics.metric_name( + 'join-rate', self.metric_group_name, + 'The number of group joins per second', + tags), Rate(sampled_stat=Count())) + + self.sync_latency = metrics.sensor('sync-latency') + self.sync_latency.add(metrics.metric_name( + 'sync-time-avg', self.metric_group_name, + 'The average time taken for a group sync', + tags), Avg()) + self.sync_latency.add(metrics.metric_name( + 'sync-time-max', self.metric_group_name, + 'The max time taken for a group sync', + tags), Avg()) + self.sync_latency.add(metrics.metric_name( + 'sync-rate', self.metric_group_name, + 'The number of group syncs per second', + tags), Rate(sampled_stat=Count())) + + metrics.add_metric(metrics.metric_name( + 'last-heartbeat-seconds-ago', self.metric_group_name, + 'The number of seconds since the last controller heartbeat', + tags), AnonMeasurable( + lambda _, now: (now / 1000) - self.heartbeat.last_send)) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 517f66a..d6ad9e6 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -76,7 +76,10 @@ class ConsumerCoordinator(BaseCoordinator): True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+. Default: True """ - super(ConsumerCoordinator, self).__init__(client, **configs) + super(ConsumerCoordinator, self).__init__(client, + metrics, metric_group_prefix, + **configs) + self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: @@ -107,8 +110,8 @@ class ConsumerCoordinator(BaseCoordinator): self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) self._auto_commit_task.reschedule() - self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, - self._subscription) + self.consumer_sensors = ConsumerCoordinatorMetrics( + metrics, metric_group_prefix, self._subscription) def __del__(self): if hasattr(self, '_cluster') and self._cluster: @@ -485,7 +488,7 @@ class ConsumerCoordinator(BaseCoordinator): def _handle_offset_commit_response(self, offsets, future, send_time, response): # TODO look at adding request_latency_ms to response (like java kafka) - self._sensors.commit_latency.record((time.time() - send_time) * 1000) + self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) unauthorized_topics = set() for topic, partitions in response.topics: diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py index 1cd9863..648cb1f 100644 --- a/kafka/coordinator/heartbeat.py +++ b/kafka/coordinator/heartbeat.py @@ -20,8 +20,8 @@ class Heartbeat(object): self.interval = self.config['heartbeat_interval_ms'] / 1000.0 self.timeout = self.config['session_timeout_ms'] / 1000.0 - self.last_send = 0 - self.last_receive = 0 + self.last_send = -1 * float('inf') + self.last_receive = -1 * float('inf') self.last_reset = time.time() def sent_heartbeat(self): |