author    | Dana Powers <dana.powers@gmail.com> | 2016-07-10 21:36:41 -0700
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 14:05:49 -0700
commit    | c34d13879641d27cceb9403a4e6617152dfda0f3 (patch)
tree      | 5e7208f13a59352097829fda76cf007e9aec2d53
parent    | 20f4c95289c694f81a60228a9820601eb57402f4 (diff)
download  | kafka-python-c34d13879641d27cceb9403a4e6617152dfda0f3.tar.gz
Add initial producer-sender metrics
-rw-r--r-- | kafka/producer/buffer.py             |  20
-rw-r--r-- | kafka/producer/kafka.py              |  25
-rw-r--r-- | kafka/producer/record_accumulator.py |   6
-rw-r--r-- | kafka/producer/sender.py             | 212
-rw-r--r-- | test/test_sender.py                  |  16
5 files changed, 261 insertions, 18 deletions
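The change wires three new producer configuration options through to a `Metrics` registry. As a quick orientation before the diff, here is a minimal sketch of how those options might be passed; the broker address and topic are placeholders, and the commented values are the defaults introduced below.

```python
# Hypothetical usage of the metrics options introduced by this commit.
# 'localhost:9092' and 'test-topic' are placeholders, not part of the change.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    metric_reporters=[],             # reporter *classes*; each is instantiated with no args
    metrics_num_samples=2,           # samples kept per metric (default from the diff)
    metrics_sample_window_ms=30000,  # max sample age in ms (default from the diff)
)
producer.send('test-topic', b'hello')
producer.flush()
producer.close()  # close() now also closes the producer's Metrics registry
```

Note that `metric_reporters` expects reporter classes rather than instances: `KafkaProducer.__init__` instantiates each entry with no arguments before handing the list to `Metrics`.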
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5fcb35f..de5f0e7 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division

 import collections
 import io
@@ -55,6 +55,8 @@ class MessageSetBuffer(object):
         self._batch_size = batch_size
         self._closed = False
         self._messages = 0
+        self._bytes_written = 4  # Int32 header is 4 bytes
+        self._final_size = None

     def append(self, offset, message):
         """Apend a Message to the MessageSet.
@@ -62,6 +64,8 @@ class MessageSetBuffer(object):
         Arguments:
             offset (int): offset of the message
             message (Message or bytes): message struct or encoded bytes
+
+        Returns: bytes written
         """
         if isinstance(message, Message):
             encoded = message.encode()
@@ -70,6 +74,8 @@ class MessageSetBuffer(object):
         msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
         self._buffer.write(msg)
         self._messages += 1
+        self._bytes_written += len(msg)
+        return len(msg)

     def has_room_for(self, key, value):
         if self._closed:
@@ -107,16 +113,20 @@ class MessageSetBuffer(object):
                 self._buffer.write(Int32.encode(len(encoded)))
                 self._buffer.write(encoded)

-            # Update the message set size, and return ready for full read()
-            size = self._buffer.tell() - 4
+            # Update the message set size (less the 4 byte header),
+            # and return with buffer ready for full read()
+            self._final_size = self._buffer.tell()
             self._buffer.seek(0)
-            self._buffer.write(Int32.encode(size))
+            self._buffer.write(Int32.encode(self._final_size - 4))
             self._buffer.seek(0)
             self._closed = True

     def size_in_bytes(self):
-        return self._buffer.tell()
+        return self._final_size or self._buffer.tell()
+
+    def compression_rate(self):
+        return self.size_in_bytes() / self._bytes_written

     def buffer(self):
         return self._buffer
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f5c5d19..61cdc8b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -9,6 +9,7 @@ import weakref

 from .. import errors as Errors
 from ..client_async import KafkaClient
+from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
 from ..protocol.message import Message, MessageSet
 from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
         api_version_auto_timeout_ms (int): number of milliseconds to throw a
             timeout exception from the constructor when checking the broker
             api version. Only applies if api_version set to 'auto'
+        metric_reporters (list): A list of classes to use as metrics reporters.
+            Implementing the AbstractMetricsReporter interface allows plugging
+            in classes that will be notified of new metric creation. Default: []
+        metrics_num_samples (int): The number of samples maintained to compute
+            metrics. Default: 2
+        metrics_sample_window_ms (int): The maximum age in milliseconds of
+            samples used to compute metrics. Default: 30000

     Note:
         Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
         'ssl_keyfile': None,
         'ssl_crlfile': None,
         'api_version': None,
-        'api_version_auto_timeout_ms': 2000
+        'api_version_auto_timeout_ms': 2000,
+        'metric_reporters': [],
+        'metrics_num_samples': 2,
+        'metrics_sample_window_ms': 30000,
     }

     def __init__(self, **configs):
@@ -285,6 +296,14 @@ class KafkaProducer(object):
             log.warning('use api_version=%s (%s is deprecated)',
                         str(self.config['api_version']), deprecated)

+        # Configure metrics
+        metrics_tags = {'client-id': self.config['client_id']}
+        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+                                     time_window_ms=self.config['metrics_sample_window_ms'],
+                                     tags=metrics_tags)
+        reporters = [reporter() for reporter in self.config['metric_reporters']]
+        self._metrics = Metrics(metric_config, reporters)
+
         client = KafkaClient(**self.config)

         # Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ class KafkaProducer(object):
         self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
-        self._sender = Sender(client, self._metadata, self._accumulator,
+        self._sender = Sender(client, self._metadata,
+                              self._accumulator, self._metrics,
                               guarantee_message_order=guarantee_message_order,
                               **self.config)
         self._sender.daemon = True
@@ -382,6 +402,7 @@ class KafkaProducer(object):
                 if not invoked_from_callback:
                     self._sender.join()

+        self._metrics.close()
         try:
             self.config['key_serializer'].close()
         except AttributeError:
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 566bf6f..7ea579a 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -38,7 +38,7 @@ class AtomicInteger(object):
 class RecordBatch(object):
     def __init__(self, tp, records, message_version=0):
         self.record_count = 0
-        #self.max_record_size = 0 # for metrics only
+        self.max_record_size = 0
         now = time.time()
         self.created = now
         self.drained = None
@@ -56,8 +56,8 @@ class RecordBatch(object):
             return None

         msg = Message(value, key=key, magic=self.message_version)
-        self.records.append(self.record_count, msg)
-        # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
+        record_size = self.records.append(self.record_count, msg)
+        self.max_record_size = max(self.max_record_size, record_size)
         self.last_append = time.time()
         future = FutureRecordMetadata(self.produce_future, self.record_count,
                                       timestamp_ms)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..c1d0905 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division

 import collections
 import copy
@@ -8,9 +8,11 @@ import threading
 import six

 from .. import errors as Errors
+from ..metrics.measurable import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.produce import ProduceRequest
 from ..structs import TopicPartition
 from ..version import __version__
-from ..protocol.produce import ProduceRequest

 log = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class Sender(threading.Thread):
         'api_version': (0, 8, 0),
     }

-    def __init__(self, client, metadata, accumulator, **configs):
+    def __init__(self, client, metadata, accumulator, metrics, **configs):
         super(Sender, self).__init__()
         self.config = copy.copy(self._DEFAULT_CONFIG)
         for key in self.config:
@@ -45,6 +47,7 @@ class Sender(threading.Thread):
         self._running = True
         self._force_close = False
         self._topics_to_add = set()
+        self._sensors = SenderMetrics(metrics, self._client, self._metadata)

     def run(self):
         """The main run loop for the sender thread."""
@@ -119,7 +122,10 @@ class Sender(threading.Thread):
         expired_batches = self._accumulator.abort_expired_batches(
             self.config['request_timeout_ms'], self._metadata)
+        for expired_batch in expired_batches:
+            self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)

+        self._sensors.update_produce_request_metrics(batches_by_node)
         requests = self._create_produce_requests(batches_by_node)
         # If we have any nodes that are ready to send + have sendable data,
         # poll with 0 timeout so this can immediately loop and try sending more
@@ -223,6 +229,7 @@ class Sender(threading.Thread):
                      self.config['retries'] - batch.attempts - 1,
                      error)
             self._accumulator.reenqueue(batch)
+            self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
             if error is Errors.TopicAuthorizationFailedError:
                 error = error(batch.topic_partition.topic)
@@ -230,6 +237,8 @@ class Sender(threading.Thread):
             # tell the user the result of their request
             batch.done(base_offset, timestamp_ms, error)
             self._accumulator.deallocate(batch)
+            if error is not None:
+                self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)

         if getattr(error, 'invalid_metadata', False):
             self._metadata.request_update()
@@ -296,3 +305,200 @@ class Sender(threading.Thread):
     def wakeup(self):
         """Wake up the selector associated with this send thread."""
         self._client.wakeup()
+
+
+class SenderMetrics(object):
+
+    def __init__(self, metrics, client, metadata):
+        self.metrics = metrics
+        self._client = client
+        self._metadata = metadata
+
+        sensor_name = 'batch-size'
+        self.batch_size_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('batch-size-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average number of bytes sent per partition per-request.')
+        self.add_metric('batch-size-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The max number of bytes sent per partition per-request.')
+
+        sensor_name = 'compression-rate'
+        self.compression_rate_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('compression-rate-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average compression rate of record batches.')
+
+        sensor_name = 'queue-time'
+        self.queue_time_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-queue-time-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average time in ms record batches spent in the record accumulator.')
+        self.add_metric('record-queue-time-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum time in ms record batches spent in the record accumulator.')
+
+        sensor_name = 'request-time'
+        self.request_time_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('request-latency-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average request latency in ms')
+        self.add_metric('request-latency-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum request latency in ms')
+
+        sensor_name = 'produce-throttle-time'
+        self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('produce-throttle-time-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average throttle time in ms')
+        self.add_metric('produce-throttle-time-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum throttle time in ms')
+
+        sensor_name = 'records-per-request'
+        self.records_per_request_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-send-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average number of records sent per second.')
+        self.add_metric('records-per-request-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average number of records per request.')
+
+        sensor_name = 'bytes'
+        self.byte_rate_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('byte-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average number of bytes sent per second.')
+
+        sensor_name = 'record-retries'
+        self.retry_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-retry-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average per-second number of retried record sends')
+
+        sensor_name = 'errors'
+        self.error_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-error-rate', Rate(),
+                        sensor_name=sensor_name,
+                        description='The average per-second number of record sends that resulted in errors')
+
+        sensor_name = 'record-size-max'
+        self.max_record_size_sensor = self.metrics.sensor(sensor_name)
+        self.add_metric('record-size-max', Max(),
+                        sensor_name=sensor_name,
+                        description='The maximum record size across all batches')
+        self.add_metric('record-size-avg', Avg(),
+                        sensor_name=sensor_name,
+                        description='The average maximum record size per batch')
+
+        self.add_metric('requests-in-flight',
+                        AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
+                        description='The current number of in-flight requests awaiting a response.')
+
+        self.add_metric('metadata-age',
+                        AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
+                        description='The age in seconds of the current producer metadata being used.')
+
+    def add_metric(self, metric_name, measurable, group_name='producer-metrics',
+                   description=None, tags=None,
+                   sensor_name=None):
+        m = self.metrics
+        metric = m.metric_name(metric_name, group_name, description, tags)
+        if sensor_name:
+            sensor = m.sensor(sensor_name)
+            sensor.add(metric, measurable)
+        else:
+            m.add_metric(metric, measurable)
+
+    def maybe_register_topic_metrics(self, topic):
+
+        def sensor_name(name):
+            return 'topic.{0}.{1}'.format(topic, name)
+
+        # if one sensor of the metrics has been registered for the topic,
+        # then all other sensors should have been registered; and vice versa
+        if not self.metrics.get_sensor(sensor_name('records-per-batch')):
+
+            self.add_metric('record-send-rate', Rate(),
+                            sensor_name=sensor_name('records-per-batch'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Records sent per second for topic ' + topic)
+
+            self.add_metric('byte-rate', Rate(),
+                            sensor_name=sensor_name('bytes'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Bytes per second for topic ' + topic)
+
+            self.add_metric('compression-rate', Avg(),
+                            sensor_name=sensor_name('compression-rate'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Average Compression ratio for topic ' + topic)
+
+            self.add_metric('record-retry-rate', Rate(),
+                            sensor_name=sensor_name('record-retries'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Record retries per second for topic ' + topic)
+
+            self.add_metric('record-error-rate', Rate(),
+                            sensor_name=sensor_name('record-errors'),
+                            group_name='producer-topic-metrics.' + topic,
+                            description='Record errors per second for topic ' + topic)
+
+    def update_produce_request_metrics(self, batches_map):
+        for node_batch in batches_map.values():
+            records = 0
+            total_bytes = 0
+            for batch in node_batch:
+                # register all per-topic metrics at once
+                topic = batch.topic_partition.topic
+                self.maybe_register_topic_metrics(topic)
+
+                # per-topic record send rate
+                topic_records_count = self.metrics.get_sensor(
+                    'topic.' + topic + '.records-per-batch')
+                topic_records_count.record(batch.record_count)
+
+                # per-topic bytes send rate
+                topic_byte_rate = self.metrics.get_sensor(
+                    'topic.' + topic + '.bytes')
+                topic_byte_rate.record(batch.records.size_in_bytes())
+
+                # per-topic compression rate
+                topic_compression_rate = self.metrics.get_sensor(
+                    'topic.' + topic + '.compression-rate')
+                topic_compression_rate.record(batch.records.compression_rate())
+
+                # global metrics
+                self.batch_size_sensor.record(batch.records.size_in_bytes())
+                if batch.drained:
+                    self.queue_time_sensor.record(batch.drained - batch.created)
+                self.compression_rate_sensor.record(batch.records.compression_rate())
+                self.max_record_size_sensor.record(batch.max_record_size)
+                records += batch.record_count
+                total_bytes += batch.records.size_in_bytes()
+
+            self.records_per_request_sensor.record(records)
+            self.byte_rate_sensor.record(total_bytes)
+
+    def record_retries(self, topic, count):
+        self.retry_sensor.record(count)
+        sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
+        if sensor:
+            sensor.record(count)
+
+    def record_errors(self, topic, count):
+        self.error_sensor.record(count)
+        sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
+        if sensor:
+            sensor.record(count)
+
+    def record_latency(self, latency, node=None):
+        self.request_time_sensor.record(latency)
+        if node:
+            sensor = self.metrics.get_sensor('node-' + node + '.latency')
+            if sensor:
+                sensor.record(latency)
+
+    def record_throttle_time(self, throttle_time_ms, node=None):
+        self.produce_throttle_time_sensor.record(throttle_time_ms)
diff --git a/test/test_sender.py b/test/test_sender.py
index 44105e2..cf911e1 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -7,12 +7,13 @@ import pytest

 from kafka.client_async import KafkaClient
 from kafka.cluster import ClusterMetadata
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.producer.sender import Sender
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics import Metrics
+from kafka.producer.buffer import MessageSetBuffer
 from kafka.protocol.produce import ProduceRequest
+from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.sender import Sender
 from kafka.structs import TopicPartition, OffsetAndMetadata


@@ -29,8 +30,13 @@ def accumulator():


 @pytest.fixture
-def sender(client, accumulator):
-    return Sender(client, client.cluster, accumulator)
+def metrics():
+    return Metrics()
+
+
+@pytest.fixture
+def sender(client, accumulator, metrics):
+    return Sender(client, client.cluster, accumulator, metrics)


 @pytest.mark.parametrize(("api_version", "produce_version"), [
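For readers unfamiliar with the kafka.metrics API that `SenderMetrics` builds on, the following standalone sketch shows the same sensor/stat registration pattern used by `add_metric` above: create a sensor, attach named metrics backed by stat objects, and record observations against the sensor. The `demo-*` names and sample values are invented for illustration; only `Metrics`, `Avg`, `Max`, `Rate`, `sensor()`, `metric_name()`, and `record()` come from the diff.

```python
# Standalone sketch of the Metrics/Sensor pattern used by SenderMetrics above.
# The sensor and metric names here ('demo-latency', etc.) are made up for illustration.
from kafka.metrics import Metrics
from kafka.metrics.stats import Avg, Max, Rate

metrics = Metrics()  # same registry type KafkaProducer builds in __init__

sensor = metrics.sensor('demo-latency')
sensor.add(metrics.metric_name('demo-latency-avg', 'demo-metrics',
                               'Average recorded latency in ms'), Avg())
sensor.add(metrics.metric_name('demo-latency-max', 'demo-metrics',
                               'Maximum recorded latency in ms'), Max())
sensor.add(metrics.metric_name('demo-record-rate', 'demo-metrics',
                               'Recordings per second'), Rate())

# Sender code records against the sensor; every attached stat is updated.
for latency_ms in (12, 48, 7):
    sensor.record(latency_ms)

metrics.close()
```

The per-topic sensors registered in `maybe_register_topic_metrics` follow the same pattern, just with sensor names derived from the topic (`topic.<topic>.records-per-batch`, and so on) so that `update_produce_request_metrics`, `record_retries`, and `record_errors` can look them up later via `get_sensor()`. Likewise, the `compression_rate()` added to `MessageSetBuffer` is simply the final (possibly compressed) buffer size divided by the raw bytes appended, so values below 1.0 indicate the codec saved space.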