path: root/kafka/producer/sender.py
author    Dana Powers <dana.powers@gmail.com>  2016-07-10 21:36:41 -0700
committer Dana Powers <dana.powers@gmail.com>  2016-07-16 14:05:49 -0700
commit    c34d13879641d27cceb9403a4e6617152dfda0f3 (patch)
tree      5e7208f13a59352097829fda76cf007e9aec2d53 /kafka/producer/sender.py
parent    20f4c95289c694f81a60228a9820601eb57402f4 (diff)
Add initial producer-sender metrics
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r--  kafka/producer/sender.py  212
1 file changed, 209 insertions, 3 deletions
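
For context before the diff: the change builds on kafka-python's metrics primitives (a Metrics registry, named Sensors, and the Avg/Max/Rate stats). A minimal sketch of that API as this commit uses it (the defaults shown are assumptions):

    from kafka.metrics import Metrics
    from kafka.metrics.stats import Avg, Max

    metrics = Metrics()  # registry with a default MetricConfig
    sensor = metrics.sensor('batch-size')  # get-or-create a named sensor
    sensor.add(metrics.metric_name('batch-size-avg', 'producer-metrics',
                                   'The average batch size'), Avg())
    sensor.record(512)  # one observation feeds every stat attached to the sensor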
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..c1d0905 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import copy
@@ -8,9 +8,11 @@ import threading
import six
from .. import errors as Errors
+from ..metrics.measurable import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.produce import ProduceRequest
from ..structs import TopicPartition
from ..version import __version__
-from ..protocol.produce import ProduceRequest
log = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class Sender(threading.Thread):
'api_version': (0, 8, 0),
}
- def __init__(self, client, metadata, accumulator, **configs):
+ def __init__(self, client, metadata, accumulator, metrics, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self._DEFAULT_CONFIG)
for key in self.config:
@@ -45,6 +47,7 @@ class Sender(threading.Thread):
self._running = True
self._force_close = False
self._topics_to_add = set()
+ self._sensors = SenderMetrics(metrics, self._client, self._metadata)
def run(self):
"""The main run loop for the sender thread."""
@@ -119,7 +122,10 @@ class Sender(threading.Thread):
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
+ for expired_batch in expired_batches:
+ self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
+ self._sensors.update_produce_request_metrics(batches_by_node)
requests = self._create_produce_requests(batches_by_node)
# If we have any nodes that are ready to send + have sendable data,
# poll with 0 timeout so this can immediately loop and try sending more
@@ -223,6 +229,7 @@ class Sender(threading.Thread):
self.config['retries'] - batch.attempts - 1,
error)
self._accumulator.reenqueue(batch)
+ self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)
@@ -230,6 +237,8 @@ class Sender(threading.Thread):
# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
+ if error is not None:
+ self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
@@ -296,3 +305,200 @@ class Sender(threading.Thread):
def wakeup(self):
"""Wake up the selector associated with this send thread."""
self._client.wakeup()
+
+
+class SenderMetrics(object):
+
+ def __init__(self, metrics, client, metadata):
+ self.metrics = metrics
+ self._client = client
+ self._metadata = metadata
+
+ sensor_name = 'batch-size'
+ self.batch_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('batch-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per partition per-request.')
+ self.add_metric('batch-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The max number of bytes sent per partition per-request.')
+
+ sensor_name = 'compression-rate'
+ self.compression_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('compression-rate-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average compression rate of record batches.')
+
+ sensor_name = 'queue-time'
+ self.queue_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-queue-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average time in ms record batches spent in the record accumulator.')
+ self.add_metric('record-queue-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum time in ms record batches spent in the record accumulator.')
+
+ sensor_name = 'request-time'
+ self.request_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('request-latency-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average request latency in ms')
+ self.add_metric('request-latency-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum request latency in ms')
+
+ sensor_name = 'produce-throttle-time'
+ self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('produce-throttle-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average throttle time in ms')
+ self.add_metric('produce-throttle-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum throttle time in ms')
+
+ sensor_name = 'records-per-request'
+ self.records_per_request_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of records sent per second.')
+ self.add_metric('records-per-request-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of records per request.')
+
+ sensor_name = 'bytes'
+ self.byte_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per second.')
+
+ sensor_name = 'record-retries'
+ self.retry_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of retried record sends')
+
+ sensor_name = 'errors'
+ self.error_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of record sends that resulted in errors')
+
+ sensor_name = 'record-size-max'
+ self.max_record_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum record size across all batches')
+ self.add_metric('record-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average maximum record size per batch')
+
+ self.add_metric('requests-in-flight',
+ AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
+ description='The current number of in-flight requests awaiting a response.')
+
+ self.add_metric('metadata-age',
+ AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
+ description='The age in seconds of the current producer metadata being used.')
+
+ def add_metric(self, metric_name, measurable, group_name='producer-metrics',
+ description=None, tags=None,
+ sensor_name=None):
+ m = self.metrics
+ metric = m.metric_name(metric_name, group_name, description, tags)
+ if sensor_name:
+ sensor = m.sensor(sensor_name)
+ sensor.add(metric, measurable)
+ else:
+ m.add_metric(metric, measurable)
+
+ def maybe_register_topic_metrics(self, topic):
+
+ def sensor_name(name):
+ return 'topic.{0}.{1}'.format(topic, name)
+
+ # If one of the per-topic sensors has already been registered for this
+ # topic, then all of the others have been too; and vice versa.
+ if not self.metrics.get_sensor(sensor_name('records-per-batch')):
+
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name('records-per-batch'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Records sent per second for topic ' + topic)
+
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name('bytes'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Bytes per second for topic ' + topic)
+
+ self.add_metric('compression-rate', Avg(),
+ sensor_name=sensor_name('compression-rate'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Average compression ratio for topic ' + topic)
+
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name('record-retries'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record retries per second for topic ' + topic)
+
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name('record-errors'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record errors per second for topic ' + topic)
+
+ def update_produce_request_metrics(self, batches_map):
+ for node_batch in batches_map.values():
+ records = 0
+ total_bytes = 0
+ for batch in node_batch:
+ # register all per-topic metrics at once
+ topic = batch.topic_partition.topic
+ self.maybe_register_topic_metrics(topic)
+
+ # per-topic record send rate
+ topic_records_count = self.metrics.get_sensor(
+ 'topic.' + topic + '.records-per-batch')
+ topic_records_count.record(batch.record_count)
+
+ # per-topic bytes send rate
+ topic_byte_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.bytes')
+ topic_byte_rate.record(batch.records.size_in_bytes())
+
+ # per-topic compression rate
+ topic_compression_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.compression-rate')
+ topic_compression_rate.record(batch.records.compression_rate())
+
+ # global metrics
+ self.batch_size_sensor.record(batch.records.size_in_bytes())
+ if batch.drained:
+ self.queue_time_sensor.record(batch.drained - batch.created)
+ self.compression_rate_sensor.record(batch.records.compression_rate())
+ self.max_record_size_sensor.record(batch.max_record_size)
+ records += batch.record_count
+ total_bytes += batch.records.size_in_bytes()
+
+ self.records_per_request_sensor.record(records)
+ self.byte_rate_sensor.record(total_bytes)
+
+ def record_retries(self, topic, count):
+ self.retry_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
+ if sensor:
+ sensor.record(count)
+
+ def record_errors(self, topic, count):
+ self.error_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
+ if sensor:
+ sensor.record(count)
+
+ def record_latency(self, latency, node=None):
+ self.request_time_sensor.record(latency)
+ if node is not None:
+ sensor = self.metrics.get_sensor('node-{0}.latency'.format(node))
+ if sensor:
+ sensor.record(latency)
+
+ def record_throttle_time(self, throttle_time_ms, node=None):
+ self.produce_throttle_time_sensor.record(throttle_time_ms)
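
A rough usage illustration (not part of the commit): the Sender now takes a Metrics registry as its fourth constructor argument, and recorded values can be read back from that registry. The client/metadata/accumulator objects are elided placeholders here, and the polling loop is an assumption rather than established kafka-python API:

    from kafka.metrics import Metrics

    metrics = Metrics()
    sender = Sender(client, metadata, accumulator, metrics)  # new metrics argument
    sender.start()

    # every metric registered above is readable by (group, name):
    for name, metric in metrics.metrics.items():
        print(name.group, name.name, metric.value())

Alternatively, a reporter such as kafka.metrics.DictReporter can be attached to the registry to observe values as they are recorded, without polling.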