summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-10 21:36:41 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 14:05:49 -0700
commitc34d13879641d27cceb9403a4e6617152dfda0f3 (patch)
tree5e7208f13a59352097829fda76cf007e9aec2d53
parent20f4c95289c694f81a60228a9820601eb57402f4 (diff)
downloadkafka-python-c34d13879641d27cceb9403a4e6617152dfda0f3.tar.gz
Add initial producer-sender metrics
-rw-r--r--kafka/producer/buffer.py20
-rw-r--r--kafka/producer/kafka.py25
-rw-r--r--kafka/producer/record_accumulator.py6
-rw-r--r--kafka/producer/sender.py212
-rw-r--r--test/test_sender.py16
5 files changed, 261 insertions, 18 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5fcb35f..de5f0e7 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import io
@@ -55,6 +55,8 @@ class MessageSetBuffer(object):
self._batch_size = batch_size
self._closed = False
self._messages = 0
+ self._bytes_written = 4 # Int32 header is 4 bytes
+ self._final_size = None
def append(self, offset, message):
"""Apend a Message to the MessageSet.
@@ -62,6 +64,8 @@ class MessageSetBuffer(object):
Arguments:
offset (int): offset of the message
message (Message or bytes): message struct or encoded bytes
+
+ Returns: bytes written
"""
if isinstance(message, Message):
encoded = message.encode()
@@ -70,6 +74,8 @@ class MessageSetBuffer(object):
msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
self._buffer.write(msg)
self._messages += 1
+ self._bytes_written += len(msg)
+ return len(msg)
def has_room_for(self, key, value):
if self._closed:
@@ -107,16 +113,20 @@ class MessageSetBuffer(object):
self._buffer.write(Int32.encode(len(encoded)))
self._buffer.write(encoded)
- # Update the message set size, and return ready for full read()
- size = self._buffer.tell() - 4
+ # Update the message set size (less the 4 byte header),
+ # and return with buffer ready for full read()
+ self._final_size = self._buffer.tell()
self._buffer.seek(0)
- self._buffer.write(Int32.encode(size))
+ self._buffer.write(Int32.encode(self._final_size - 4))
self._buffer.seek(0)
self._closed = True
def size_in_bytes(self):
- return self._buffer.tell()
+ return self._final_size or self._buffer.tell()
+
+ def compression_rate(self):
+ return self.size_in_bytes() / self._bytes_written
def buffer(self):
return self._buffer
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f5c5d19..61cdc8b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -9,6 +9,7 @@ import weakref
from .. import errors as Errors
from ..client_async import KafkaClient
+from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version set to 'auto'
+ metric_reporters (list): A list of classes to use as metrics reporters.
+ Implementing the AbstractMetricsReporter interface allows plugging
+ in classes that will be notified of new metric creation. Default: []
+ metrics_num_samples (int): The number of samples maintained to compute
+ metrics. Default: 2
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
+ samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
'ssl_keyfile': None,
'ssl_crlfile': None,
'api_version': None,
- 'api_version_auto_timeout_ms': 2000
+ 'api_version_auto_timeout_ms': 2000,
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
}
def __init__(self, **configs):
@@ -285,6 +296,14 @@ class KafkaProducer(object):
log.warning('use api_version=%s (%s is deprecated)',
str(self.config['api_version']), deprecated)
+ # Configure metrics
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ self._metrics = Metrics(metric_config, reporters)
+
client = KafkaClient(**self.config)
# Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ class KafkaProducer(object):
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
- self._sender = Sender(client, self._metadata, self._accumulator,
+ self._sender = Sender(client, self._metadata,
+ self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -382,6 +402,7 @@ class KafkaProducer(object):
if not invoked_from_callback:
self._sender.join()
+ self._metrics.close()
try:
self.config['key_serializer'].close()
except AttributeError:
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 566bf6f..7ea579a 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -38,7 +38,7 @@ class AtomicInteger(object):
class RecordBatch(object):
def __init__(self, tp, records, message_version=0):
self.record_count = 0
- #self.max_record_size = 0 # for metrics only
+ self.max_record_size = 0
now = time.time()
self.created = now
self.drained = None
@@ -56,8 +56,8 @@ class RecordBatch(object):
return None
msg = Message(value, key=key, magic=self.message_version)
- self.records.append(self.record_count, msg)
- # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
+ record_size = self.records.append(self.record_count, msg)
+ self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,
timestamp_ms)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..c1d0905 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import copy
@@ -8,9 +8,11 @@ import threading
import six
from .. import errors as Errors
+from ..metrics.measurable import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.produce import ProduceRequest
from ..structs import TopicPartition
from ..version import __version__
-from ..protocol.produce import ProduceRequest
log = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class Sender(threading.Thread):
'api_version': (0, 8, 0),
}
- def __init__(self, client, metadata, accumulator, **configs):
+ def __init__(self, client, metadata, accumulator, metrics, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self._DEFAULT_CONFIG)
for key in self.config:
@@ -45,6 +47,7 @@ class Sender(threading.Thread):
self._running = True
self._force_close = False
self._topics_to_add = set()
+ self._sensors = SenderMetrics(metrics, self._client, self._metadata)
def run(self):
"""The main run loop for the sender thread."""
@@ -119,7 +122,10 @@ class Sender(threading.Thread):
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
+ for expired_batch in expired_batches:
+ self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
+ self._sensors.update_produce_request_metrics(batches_by_node)
requests = self._create_produce_requests(batches_by_node)
# If we have any nodes that are ready to send + have sendable data,
# poll with 0 timeout so this can immediately loop and try sending more
@@ -223,6 +229,7 @@ class Sender(threading.Thread):
self.config['retries'] - batch.attempts - 1,
error)
self._accumulator.reenqueue(batch)
+ self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)
@@ -230,6 +237,8 @@ class Sender(threading.Thread):
# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
+ if error is not None:
+ self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
@@ -296,3 +305,200 @@ class Sender(threading.Thread):
def wakeup(self):
"""Wake up the selector associated with this send thread."""
self._client.wakeup()
+
+
+class SenderMetrics(object):
+
+ def __init__(self, metrics, client, metadata):
+ self.metrics = metrics
+ self._client = client
+ self._metadata = metadata
+
+ sensor_name = 'batch-size'
+ self.batch_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('batch-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per partition per-request.')
+ self.add_metric('batch-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The max number of bytes sent per partition per-request.')
+
+ sensor_name = 'compression-rate'
+ self.compression_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('compression-rate-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average compression rate of record batches.')
+
+ sensor_name = 'queue-time'
+ self.queue_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-queue-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average time in ms record batches spent in the record accumulator.')
+ self.add_metric('record-queue-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum time in ms record batches spent in the record accumulator.')
+
+ sensor_name = 'request-time'
+ self.request_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('request-latency-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average request latency in ms')
+ self.add_metric('request-latency-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum request latency in ms')
+
+ sensor_name = 'produce-throttle-time'
+ self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('produce-throttle-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average throttle time in ms')
+ self.add_metric('produce-throttle-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum throttle time in ms')
+
+ sensor_name = 'records-per-request'
+ self.records_per_request_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of records sent per second.')
+ self.add_metric('records-per-request-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of records per request.')
+
+ sensor_name = 'bytes'
+ self.byte_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per second.')
+
+ sensor_name = 'record-retries'
+ self.retry_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of retried record sends')
+
+ sensor_name = 'errors'
+ self.error_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of record sends that resulted in errors')
+
+ sensor_name = 'record-size-max'
+ self.max_record_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum record size across all batches')
+ self.add_metric('record-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average maximum record size per batch')
+
+ self.add_metric('requests-in-flight',
+ AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
+ description='The current number of in-flight requests awaiting a response.')
+
+ self.add_metric('metadata-age',
+ AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
+ description='The age in seconds of the current producer metadata being used.')
+
+ def add_metric(self, metric_name, measurable, group_name='producer-metrics',
+ description=None, tags=None,
+ sensor_name=None):
+ m = self.metrics
+ metric = m.metric_name(metric_name, group_name, description, tags)
+ if sensor_name:
+ sensor = m.sensor(sensor_name)
+ sensor.add(metric, measurable)
+ else:
+ m.add_metric(metric, measurable)
+
+ def maybe_register_topic_metrics(self, topic):
+
+ def sensor_name(name):
+ return 'topic.{0}.{1}'.format(topic, name)
+
+ # if one sensor of the metrics has been registered for the topic,
+ # then all other sensors should have been registered; and vice versa
+ if not self.metrics.get_sensor(sensor_name('records-per-batch')):
+
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name('records-per-batch'),
+ group_name='producer-topic-metrics.' + topic,
+ description= 'Records sent per second for topic ' + topic)
+
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name('bytes'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Bytes per second for topic ' + topic)
+
+ self.add_metric('compression-rate', Avg(),
+ sensor_name=sensor_name('compression-rate'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Average Compression ratio for topic ' + topic)
+
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name('record-retries'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record retries per second for topic ' + topic)
+
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name('record-errors'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record errors per second for topic ' + topic)
+
+ def update_produce_request_metrics(self, batches_map):
+ for node_batch in batches_map.values():
+ records = 0
+ total_bytes = 0
+ for batch in node_batch:
+ # register all per-topic metrics at once
+ topic = batch.topic_partition.topic
+ self.maybe_register_topic_metrics(topic)
+
+ # per-topic record send rate
+ topic_records_count = self.metrics.get_sensor(
+ 'topic.' + topic + '.records-per-batch')
+ topic_records_count.record(batch.record_count)
+
+ # per-topic bytes send rate
+ topic_byte_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.bytes')
+ topic_byte_rate.record(batch.records.size_in_bytes())
+
+ # per-topic compression rate
+ topic_compression_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.compression-rate')
+ topic_compression_rate.record(batch.records.compression_rate())
+
+ # global metrics
+ self.batch_size_sensor.record(batch.records.size_in_bytes())
+ if batch.drained:
+ self.queue_time_sensor.record(batch.drained - batch.created)
+ self.compression_rate_sensor.record(batch.records.compression_rate())
+ self.max_record_size_sensor.record(batch.max_record_size)
+ records += batch.record_count
+ total_bytes += batch.records.size_in_bytes()
+
+ self.records_per_request_sensor.record(records)
+ self.byte_rate_sensor.record(total_bytes)
+
+ def record_retries(self, topic, count):
+ self.retry_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
+ if sensor:
+ sensor.record(count)
+
+ def record_errors(self, topic, count):
+ self.error_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
+ if sensor:
+ sensor.record(count)
+
+ def record_latency(self, latency, node=None):
+ self.request_time_sensor.record(latency)
+ if node:
+ sensor = self.metrics.get_sensor('node-' + node + '.latency')
+ if sensor:
+ sensor.record(latency)
+
+ def record_throttle_time(self, throttle_time_ms, node=None):
+ self.produce_throttle_time_sensor.record(throttle_time_ms)
diff --git a/test/test_sender.py b/test/test_sender.py
index 44105e2..cf911e1 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -7,12 +7,13 @@ import pytest
from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.producer.sender import Sender
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics import Metrics
+from kafka.producer.buffer import MessageSetBuffer
from kafka.protocol.produce import ProduceRequest
+from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.sender import Sender
from kafka.structs import TopicPartition, OffsetAndMetadata
@@ -29,8 +30,13 @@ def accumulator():
@pytest.fixture
-def sender(client, accumulator):
- return Sender(client, client.cluster, accumulator)
+def metrics():
+ return Metrics()
+
+
+@pytest.fixture
+def sender(client, accumulator, metrics):
+ return Sender(client, client.cluster, accumulator, metrics)
@pytest.mark.parametrize(("api_version", "produce_version"), [