author    | Dana Powers <dana.powers@gmail.com> | 2016-07-10 21:36:41 -0700
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 14:05:49 -0700
commit    | c34d13879641d27cceb9403a4e6617152dfda0f3 (patch)
tree      | 5e7208f13a59352097829fda76cf007e9aec2d53 /kafka/producer/sender.py
parent    | 20f4c95289c694f81a60228a9820601eb57402f4 (diff)
download  | kafka-python-c34d13879641d27cceb9403a4e6617152dfda0f3.tar.gz
Add initial producer-sender metrics
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r-- | kafka/producer/sender.py | 212
1 file changed, 209 insertions(+), 3 deletions(-)
```diff
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..c1d0905 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 import collections
 import copy
@@ -8,9 +8,11 @@ import threading
 import six
 
 from .. import errors as Errors
+from ..metrics.measurable import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.produce import ProduceRequest
 from ..structs import TopicPartition
 from ..version import __version__
-from ..protocol.produce import ProduceRequest
 
 log = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class Sender(threading.Thread):
         'api_version': (0, 8, 0),
     }
 
-    def __init__(self, client, metadata, accumulator, **configs):
+    def __init__(self, client, metadata, accumulator, metrics, **configs):
         super(Sender, self).__init__()
         self.config = copy.copy(self._DEFAULT_CONFIG)
         for key in self.config:
@@ -45,6 +47,7 @@ class Sender(threading.Thread):
         self._running = True
         self._force_close = False
         self._topics_to_add = set()
+        self._sensors = SenderMetrics(metrics, self._client, self._metadata)
 
     def run(self):
         """The main run loop for the sender thread."""
@@ -119,7 +122,10 @@
         expired_batches = self._accumulator.abort_expired_batches(
             self.config['request_timeout_ms'], self._metadata)
+        for expired_batch in expired_batches:
+            self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
 
+        self._sensors.update_produce_request_metrics(batches_by_node)
         requests = self._create_produce_requests(batches_by_node)
         # If we have any nodes that are ready to send + have sendable data,
         # poll with 0 timeout so this can immediately loop and try sending more
@@ -223,6 +229,7 @@
                          self.config['retries'] - batch.attempts - 1,
                          error)
             self._accumulator.reenqueue(batch)
+            self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
             if error is Errors.TopicAuthorizationFailedError:
                 error = error(batch.topic_partition.topic)
@@ -230,6 +237,8 @@
             # tell the user the result of their request
             batch.done(base_offset, timestamp_ms, error)
             self._accumulator.deallocate(batch)
+            if error is not None:
+                self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
 
         if getattr(error, 'invalid_metadata', False):
             self._metadata.request_update()
@@ -296,3 +305,200 @@ class Sender(threading.Thread):
     def wakeup(self):
         """Wake up the selector associated with this send thread."""
         self._client.wakeup()
+
+
+class SenderMetrics(object):
+
+    def __init__(self, metrics, client, metadata):
+        self.metrics = metrics
+        self._client = client
+        self._metadata = metadata
+
+        sensor_name = 'batch-size'
+        self.batch_size_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('batch-size-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average number of bytes sent per partition per-request.')
+        self.add_metric('batch-size-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The max number of bytes sent per partition per-request.')
+
+        sensor_name = 'compression-rate'
+        self.compression_rate_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('compression-rate-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average compression rate of record batches.')
+
+        sensor_name = 'queue-time'
+        self.queue_time_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-queue-time-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average time in ms record batches spent in the record accumulator.')
+        self.add_metric('record-queue-time-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum time in ms record batches spent in the record accumulator.')
+
+        sensor_name = 'request-time'
+        self.request_time_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('request-latency-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average request latency in ms')
+        self.add_metric('request-latency-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum request latency in ms')
+
+        sensor_name = 'produce-throttle-time'
+        self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('produce-throttle-time-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average throttle time in ms')
+        self.add_metric('produce-throttle-time-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum throttle time in ms')
+
+        sensor_name = 'records-per-request'
+        self.records_per_request_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-send-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average number of records sent per second.')
+        self.add_metric('records-per-request-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average number of records per request.')
+
+        sensor_name = 'bytes'
+        self.byte_rate_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('byte-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average number of bytes sent per second.')
+
+        sensor_name = 'record-retries'
+        self.retry_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-retry-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average per-second number of retried record sends')
+
+        sensor_name = 'errors'
+        self.error_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-error-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average per-second number of record sends that resulted in errors')
+
+        sensor_name = 'record-size-max'
+        self.max_record_size_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-size-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum record size across all batches')
+        self.add_metric('record-size-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average maximum record size per batch')
+
+        self.add_metric('requests-in-flight',
+                        AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
+                        description='The current number of in-flight requests awaiting a response.')
+
+        self.add_metric('metadata-age',
+                        AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
+                        description='The age in seconds of the current producer metadata being used.')
+
+    def add_metric(self, metric_name, measurable, group_name='producer-metrics',
+                   description=None, tags=None,
+                   sensor_name=None):
+        m = self.metrics
+        metric = m.metric_name(metric_name, group_name, description, tags)
+        if sensor_name:
+            sensor = m.sensor(sensor_name)
+            sensor.add(metric, measurable)
+        else:
+            m.add_metric(metric, measurable)
+
+    def maybe_register_topic_metrics(self, topic):
+
+        def sensor_name(name):
+            return 'topic.{0}.{1}'.format(topic, name)
+
+        # if one sensor of the metrics has been registered for the topic,
+        # then all other sensors should have been registered; and vice versa
+        if not self.metrics.get_sensor(sensor_name('records-per-batch')):
+
+            self.add_metric('record-send-rate', Rate(),
+                            sensor_name=sensor_name('records-per-batch'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Records sent per second for topic ' + topic)
+
+            self.add_metric('byte-rate', Rate(),
+                            sensor_name=sensor_name('bytes'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Bytes per second for topic ' + topic)
+
+            self.add_metric('compression-rate', Avg(),
+                            sensor_name=sensor_name('compression-rate'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Average Compression ratio for topic ' + topic)
+
+            self.add_metric('record-retry-rate', Rate(),
+                            sensor_name=sensor_name('record-retries'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Record retries per second for topic ' + topic)
+
+            self.add_metric('record-error-rate', Rate(),
+                            sensor_name=sensor_name('record-errors'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Record errors per second for topic ' + topic)
+
+    def update_produce_request_metrics(self, batches_map):
+        for node_batch in batches_map.values():
+            records = 0
+            total_bytes = 0
+            for batch in node_batch:
+                # register all per-topic metrics at once
+                topic = batch.topic_partition.topic
+                self.maybe_register_topic_metrics(topic)
+
+                # per-topic record send rate
+                topic_records_count = self.metrics.get_sensor(
+                    'topic.' + topic + '.records-per-batch')
+                topic_records_count.record(batch.record_count)
+
+                # per-topic bytes send rate
+                topic_byte_rate = self.metrics.get_sensor(
+                    'topic.' + topic + '.bytes')
+                topic_byte_rate.record(batch.records.size_in_bytes())
+
+                # per-topic compression rate
+                topic_compression_rate = self.metrics.get_sensor(
+                    'topic.' + topic + '.compression-rate')
+                topic_compression_rate.record(batch.records.compression_rate())
+
+                # global metrics
+                self.batch_size_sensor.record(batch.records.size_in_bytes())
+                if batch.drained:
+                    self.queue_time_sensor.record(batch.drained - batch.created)
+                self.compression_rate_sensor.record(batch.records.compression_rate())
+                self.max_record_size_sensor.record(batch.max_record_size)
+                records += batch.record_count
+                total_bytes += batch.records.size_in_bytes()
+
+            self.records_per_request_sensor.record(records)
+            self.byte_rate_sensor.record(total_bytes)
+
+    def record_retries(self, topic, count):
+        self.retry_sensor.record(count)
+        sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
+        if sensor:
+            sensor.record(count)
+
+    def record_errors(self, topic, count):
+        self.error_sensor.record(count)
+        sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
+        if sensor:
+            sensor.record(count)
+
+    def record_latency(self, latency, node=None):
+        self.request_time_sensor.record(latency)
+        if node:
+            sensor = self.metrics.get_sensor('node-' + node + '.latency')
+            if sensor:
+                sensor.record(latency)
+
+    def record_throttle_time(self, throttle_time_ms, node=None):
+        self.produce_throttle_time_sensor.record(throttle_time_ms)
```
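For context, a minimal sketch of how the sensor/stat API used by SenderMetrics fits together, mirroring the 'request-time' sensor registered in the diff. This is illustrative only, not part of the commit, and assumes kafka-python's `kafka.metrics.Metrics` registry class (which does not appear in this diff):

```python
# Minimal sketch: record request latencies through a sensor and read the
# aggregated metrics back out of the registry. Assumes kafka-python's
# kafka.metrics.Metrics registry; the metric/sensor names below mirror the
# 'request-time' sensor that SenderMetrics registers in this commit.
from kafka.metrics import Metrics
from kafka.metrics.stats import Avg, Max

metrics = Metrics()

# One sensor can feed several stats, as SenderMetrics.add_metric() does
sensor = metrics.sensor('request-time')
sensor.add(metrics.metric_name('request-latency-avg', 'producer-metrics',
                               'The average request latency in ms'), Avg())
sensor.add(metrics.metric_name('request-latency-max', 'producer-metrics',
                               'The maximum request latency in ms'), Max())

# Sender would call record_latency() once per produce response; each
# recording updates every stat attached to the sensor
for latency_ms in (12.0, 30.0, 18.0):
    sensor.record(latency_ms)

# Each registered MetricName maps to a KafkaMetric whose value() reflects
# all recordings so far (avg -> 20.0, max -> 30.0 here)
for name, metric in metrics.metrics.items():
    if name.group == 'producer-metrics':
        print('{0}: {1}'.format(name.name, metric.value()))
```

The per-topic sensors in the diff follow the same pattern but are keyed as 'topic.<topic>.<name>', which is why record_retries() and record_errors() can look them up lazily with metrics.get_sensor() and skip recording when a topic has not been registered yet.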