author     Dana Powers <dana.powers@gmail.com>    2016-07-16 20:47:15 -0700
committer  GitHub <noreply@github.com>            2016-07-16 20:47:15 -0700
commit     947625bfa4b6462e3f7c0fdad0a0cd69708beb2c (patch)
tree       aeae9decba9e1eba0827bcc5dc97c3b85d6f358b /kafka
parent     3666b66a21776d620f68d2f7ff2fed1bc18b94e5 (diff)
parent     7a2ec3332b0a83dcaaab4a402db13ed9d56d89e8 (diff)
Merge pull request #754 from dpkp/benchmarks
Producer metrics + consumer/producer benchmark scripts
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/consumer/fetcher.py               8
-rw-r--r--  kafka/consumer/group.py                22
-rw-r--r--  kafka/metrics/stats/sensor.py           6
-rw-r--r--  kafka/producer/buffer.py               20
-rw-r--r--  kafka/producer/kafka.py                40
-rw-r--r--  kafka/producer/record_accumulator.py    6
-rw-r--r--  kafka/producer/sender.py              212
7 files changed, 292 insertions, 22 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 34ff4cb..d615848 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -729,6 +729,8 @@ class Fetcher(six.Iterator):
else:
raise error_type('Unexpected error while fetching data')
+ # Because we are currently decompressing messages lazily, the sensors here
+ # will get compressed bytes / message set stats when compression is enabled
self._sensors.bytes_fetched.record(total_bytes)
self._sensors.records_fetched.record(total_count)
if response.API_VERSION >= 1:
@@ -774,12 +776,12 @@ class FetchManagerMetrics(object):
'The maximum throttle time in ms'), Max())
def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
- metric_tags = {'topic': topic.replace('.', '_')}
-
# record bytes fetched
name = '.'.join(['topic', topic, 'bytes-fetched'])
bytes_fetched = self.metrics.get_sensor(name)
if not bytes_fetched:
+ metric_tags = {'topic': topic.replace('.', '_')}
+
bytes_fetched = self.metrics.sensor(name)
bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
self.group_name,
@@ -799,6 +801,8 @@ class FetchManagerMetrics(object):
name = '.'.join(['topic', topic, 'records-fetched'])
records_fetched = self.metrics.get_sensor(name)
if not records_fetched:
+ metric_tags = {'topic': topic.replace('.', '_')}
+
records_fetched = self.metrics.sensor(name)
records_fetched.add(self.metrics.metric_name('records-per-request-avg',
self.group_name,
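
Editor's note (a sketch, not part of the diff): the per-topic fetch sensors above key the sensor name on the raw topic but sanitize only the tag value, since topic names may contain '.', which is also the sensor-name separator. With a hypothetical topic name:

    # Hypothetical topic name, for illustration only
    topic = 'orders.prod'
    name = '.'.join(['topic', topic, 'bytes-fetched'])   # 'topic.orders.prod.bytes-fetched'
    metric_tags = {'topic': topic.replace('.', '_')}     # {'topic': 'orders_prod'}
    print(name, metric_tags)
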
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 8fa43bc..982cd7b 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.metrics import DictReporter, MetricConfig, Metrics
+from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
@@ -171,8 +171,8 @@ class KafkaConsumer(six.Iterator):
in classes that will be notified of new metric creation. Default: []
metrics_num_samples (int): The number of samples maintained to compute
metrics. Default: 2
- metrics_sample_window_ms (int): The number of samples maintained to
- compute metrics. Default: 30000
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
+ samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator):
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
- reporters.append(DictReporter('kafka.consumer'))
self._metrics = Metrics(metric_config, reporters)
metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
@@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator):
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
+ def metrics(self, raw=False):
+ """Warning: this is an unstable interface.
+ It may change in future releases without warning"""
+ if raw:
+ return self._metrics.metrics
+
+ metrics = {}
+ for k, v in self._metrics.metrics.items():
+ if k.group not in metrics:
+ metrics[k.group] = {}
+ if k.name not in metrics[k.group]:
+ metrics[k.group][k.name] = {}
+ metrics[k.group][k.name] = v.value()
+ return metrics
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):
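
Usage sketch (not part of the diff; the broker address and topic name are placeholders): the new KafkaConsumer.metrics() accessor returns a nested dict of {group: {metric-name: value}}, or the underlying {MetricName: KafkaMetric} mapping when raw=True.

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('example-topic', bootstrap_servers='localhost:9092')
    next(iter(consumer))  # consume one message so the fetch sensors have recorded something

    for group, group_metrics in consumer.metrics().items():
        for name, value in group_metrics.items():
            print(group, name, value)

    raw = consumer.metrics(raw=True)  # {MetricName: KafkaMetric}
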
diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py
index b0bf4db..72bacfc 100644
--- a/kafka/metrics/stats/sensor.py
+++ b/kafka/metrics/stats/sensor.py
@@ -55,15 +55,15 @@ class Sensor(object):
Record a value at a known time.
Arguments:
value (double): The value we are recording
- time_ms (int): The current POSIX time in milliseconds
+ time_ms (int): A POSIX timestamp in milliseconds.
+ Default: The time when record() is evaluated (now)
Raises:
QuotaViolationException: if recording this value moves a
metric beyond its configured maximum or minimum bound
"""
- now = time.time() * 1000
if time_ms is None:
- time_ms = now
+ time_ms = time.time() * 1000
self._last_record_time = time_ms
with self._lock: # XXX high volume, might be performance issue
# increment all the stats
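
A small sketch of the behavior this change preserves (the sensor and metric names here are invented): record() stamps "now" only when no explicit timestamp is supplied.

    import time
    from kafka.metrics import Metrics
    from kafka.metrics.stats import Avg

    metrics = Metrics()
    sensor = metrics.sensor('example-sensor')
    sensor.add(metrics.metric_name('example-avg', 'example-group'), Avg())

    sensor.record(42.0)                              # time_ms defaults to the current time
    sensor.record(42.0, time_ms=time.time() * 1000)  # or pass an explicit POSIX timestamp in ms
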
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5fcb35f..de5f0e7 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import io
@@ -55,6 +55,8 @@ class MessageSetBuffer(object):
self._batch_size = batch_size
self._closed = False
self._messages = 0
+ self._bytes_written = 4 # Int32 header is 4 bytes
+ self._final_size = None
def append(self, offset, message):
"""Apend a Message to the MessageSet.
@@ -62,6 +64,8 @@ class MessageSetBuffer(object):
Arguments:
offset (int): offset of the message
message (Message or bytes): message struct or encoded bytes
+
+ Returns: bytes written
"""
if isinstance(message, Message):
encoded = message.encode()
@@ -70,6 +74,8 @@ class MessageSetBuffer(object):
msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
self._buffer.write(msg)
self._messages += 1
+ self._bytes_written += len(msg)
+ return len(msg)
def has_room_for(self, key, value):
if self._closed:
@@ -107,16 +113,20 @@ class MessageSetBuffer(object):
self._buffer.write(Int32.encode(len(encoded)))
self._buffer.write(encoded)
- # Update the message set size, and return ready for full read()
- size = self._buffer.tell() - 4
+ # Update the message set size (less the 4 byte header),
+ # and return with buffer ready for full read()
+ self._final_size = self._buffer.tell()
self._buffer.seek(0)
- self._buffer.write(Int32.encode(size))
+ self._buffer.write(Int32.encode(self._final_size - 4))
self._buffer.seek(0)
self._closed = True
def size_in_bytes(self):
- return self._buffer.tell()
+ return self._final_size or self._buffer.tell()
+
+ def compression_rate(self):
+ return self.size_in_bytes() / self._bytes_written
def buffer(self):
return self._buffer
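
Worked example (made-up numbers) of the new bookkeeping: _bytes_written tracks the uncompressed bytes appended plus the 4-byte Int32 size header, _final_size is the buffer size after close(), and compression_rate() is their ratio.

    # Mirrors MessageSetBuffer's accounting with hypothetical values
    bytes_written = 4 + 10000      # 4-byte header + uncompressed message bytes appended
    final_size = 2504              # buffer size after close() writes the compressed payload
    compression_rate = final_size / bytes_written
    print(round(compression_rate, 3))  # ~0.25 -- values below 1.0 mean the codec saved space
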
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f5c5d19..70c0cd0 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -9,6 +9,7 @@ import weakref
from .. import errors as Errors
from ..client_async import KafkaClient
+from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version set to 'auto'
+ metric_reporters (list): A list of classes to use as metrics reporters.
+ Implementing the AbstractMetricsReporter interface allows plugging
+ in classes that will be notified of new metric creation. Default: []
+ metrics_num_samples (int): The number of samples maintained to compute
+ metrics. Default: 2
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
+ samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
'ssl_keyfile': None,
'ssl_crlfile': None,
'api_version': None,
- 'api_version_auto_timeout_ms': 2000
+ 'api_version_auto_timeout_ms': 2000,
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
}
def __init__(self, **configs):
@@ -285,6 +296,14 @@ class KafkaProducer(object):
log.warning('use api_version=%s (%s is deprecated)',
str(self.config['api_version']), deprecated)
+ # Configure metrics
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ self._metrics = Metrics(metric_config, reporters)
+
client = KafkaClient(**self.config)
# Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ class KafkaProducer(object):
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
- self._sender = Sender(client, self._metadata, self._accumulator,
+ self._sender = Sender(client, self._metadata,
+ self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -382,6 +402,7 @@ class KafkaProducer(object):
if not invoked_from_callback:
self._sender.join()
+ self._metrics.close()
try:
self.config['key_serializer'].close()
except AttributeError:
@@ -581,3 +602,18 @@ class KafkaProducer(object):
return self.config['partitioner'](serialized_key,
all_partitions,
available)
+
+ def metrics(self, raw=False):
+ """Warning: this is an unstable interface.
+ It may change in future releases without warning"""
+ if raw:
+ return self._metrics.metrics
+
+ metrics = {}
+ for k, v in self._metrics.metrics.items():
+ if k.group not in metrics:
+ metrics[k.group] = {}
+ if k.name not in metrics[k.group]:
+ metrics[k.group][k.name] = {}
+ metrics[k.group][k.name] = v.value()
+ return metrics
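
Usage sketch (not part of the diff; broker and topic are placeholders): the producer gains the same metrics() accessor as the consumer, and the sender metrics added below register under the 'producer-metrics' group.

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('example-topic', b'payload')
    producer.flush()

    snapshot = producer.metrics()  # {group: {metric-name: value}}
    for name, value in sorted(snapshot.get('producer-metrics', {}).items()):
        print(name, value)
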
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 566bf6f..7ea579a 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -38,7 +38,7 @@ class AtomicInteger(object):
class RecordBatch(object):
def __init__(self, tp, records, message_version=0):
self.record_count = 0
- #self.max_record_size = 0 # for metrics only
+ self.max_record_size = 0
now = time.time()
self.created = now
self.drained = None
@@ -56,8 +56,8 @@ class RecordBatch(object):
return None
msg = Message(value, key=key, magic=self.message_version)
- self.records.append(self.record_count, msg)
- # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
+ record_size = self.records.append(self.record_count, msg)
+ self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,
timestamp_ms)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..c1d0905 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import copy
@@ -8,9 +8,11 @@ import threading
import six
from .. import errors as Errors
+from ..metrics.measurable import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.produce import ProduceRequest
from ..structs import TopicPartition
from ..version import __version__
-from ..protocol.produce import ProduceRequest
log = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class Sender(threading.Thread):
'api_version': (0, 8, 0),
}
- def __init__(self, client, metadata, accumulator, **configs):
+ def __init__(self, client, metadata, accumulator, metrics, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self._DEFAULT_CONFIG)
for key in self.config:
@@ -45,6 +47,7 @@ class Sender(threading.Thread):
self._running = True
self._force_close = False
self._topics_to_add = set()
+ self._sensors = SenderMetrics(metrics, self._client, self._metadata)
def run(self):
"""The main run loop for the sender thread."""
@@ -119,7 +122,10 @@ class Sender(threading.Thread):
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
+ for expired_batch in expired_batches:
+ self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
+ self._sensors.update_produce_request_metrics(batches_by_node)
requests = self._create_produce_requests(batches_by_node)
# If we have any nodes that are ready to send + have sendable data,
# poll with 0 timeout so this can immediately loop and try sending more
@@ -223,6 +229,7 @@ class Sender(threading.Thread):
self.config['retries'] - batch.attempts - 1,
error)
self._accumulator.reenqueue(batch)
+ self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)
@@ -230,6 +237,8 @@ class Sender(threading.Thread):
# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
+ if error is not None:
+ self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
@@ -296,3 +305,200 @@ class Sender(threading.Thread):
def wakeup(self):
"""Wake up the selector associated with this send thread."""
self._client.wakeup()
+
+
+class SenderMetrics(object):
+
+ def __init__(self, metrics, client, metadata):
+ self.metrics = metrics
+ self._client = client
+ self._metadata = metadata
+
+ sensor_name = 'batch-size'
+ self.batch_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('batch-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per partition per-request.')
+ self.add_metric('batch-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The max number of bytes sent per partition per-request.')
+
+ sensor_name = 'compression-rate'
+ self.compression_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('compression-rate-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average compression rate of record batches.')
+
+ sensor_name = 'queue-time'
+ self.queue_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-queue-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average time in ms record batches spent in the record accumulator.')
+ self.add_metric('record-queue-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum time in ms record batches spent in the record accumulator.')
+
+ sensor_name = 'request-time'
+ self.request_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('request-latency-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average request latency in ms')
+ self.add_metric('request-latency-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum request latency in ms')
+
+ sensor_name = 'produce-throttle-time'
+ self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('produce-throttle-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average throttle time in ms')
+ self.add_metric('produce-throttle-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum throttle time in ms')
+
+ sensor_name = 'records-per-request'
+ self.records_per_request_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of records sent per second.')
+ self.add_metric('records-per-request-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of records per request.')
+
+ sensor_name = 'bytes'
+ self.byte_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per second.')
+
+ sensor_name = 'record-retries'
+ self.retry_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of retried record sends')
+
+ sensor_name = 'errors'
+ self.error_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of record sends that resulted in errors')
+
+ sensor_name = 'record-size-max'
+ self.max_record_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum record size across all batches')
+ self.add_metric('record-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average maximum record size per batch')
+
+ self.add_metric('requests-in-flight',
+ AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
+ description='The current number of in-flight requests awaiting a response.')
+
+ self.add_metric('metadata-age',
+ AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
+ description='The age in seconds of the current producer metadata being used.')
+
+ def add_metric(self, metric_name, measurable, group_name='producer-metrics',
+ description=None, tags=None,
+ sensor_name=None):
+ m = self.metrics
+ metric = m.metric_name(metric_name, group_name, description, tags)
+ if sensor_name:
+ sensor = m.sensor(sensor_name)
+ sensor.add(metric, measurable)
+ else:
+ m.add_metric(metric, measurable)
+
+ def maybe_register_topic_metrics(self, topic):
+
+ def sensor_name(name):
+ return 'topic.{0}.{1}'.format(topic, name)
+
+ # if one sensor of the metrics has been registered for the topic,
+ # then all other sensors should have been registered; and vice versa
+ if not self.metrics.get_sensor(sensor_name('records-per-batch')):
+
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name('records-per-batch'),
+ group_name='producer-topic-metrics.' + topic,
+ description= 'Records sent per second for topic ' + topic)
+
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name('bytes'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Bytes per second for topic ' + topic)
+
+ self.add_metric('compression-rate', Avg(),
+ sensor_name=sensor_name('compression-rate'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Average Compression ratio for topic ' + topic)
+
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name('record-retries'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record retries per second for topic ' + topic)
+
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name('record-errors'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record errors per second for topic ' + topic)
+
+ def update_produce_request_metrics(self, batches_map):
+ for node_batch in batches_map.values():
+ records = 0
+ total_bytes = 0
+ for batch in node_batch:
+ # register all per-topic metrics at once
+ topic = batch.topic_partition.topic
+ self.maybe_register_topic_metrics(topic)
+
+ # per-topic record send rate
+ topic_records_count = self.metrics.get_sensor(
+ 'topic.' + topic + '.records-per-batch')
+ topic_records_count.record(batch.record_count)
+
+ # per-topic bytes send rate
+ topic_byte_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.bytes')
+ topic_byte_rate.record(batch.records.size_in_bytes())
+
+ # per-topic compression rate
+ topic_compression_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.compression-rate')
+ topic_compression_rate.record(batch.records.compression_rate())
+
+ # global metrics
+ self.batch_size_sensor.record(batch.records.size_in_bytes())
+ if batch.drained:
+ self.queue_time_sensor.record(batch.drained - batch.created)
+ self.compression_rate_sensor.record(batch.records.compression_rate())
+ self.max_record_size_sensor.record(batch.max_record_size)
+ records += batch.record_count
+ total_bytes += batch.records.size_in_bytes()
+
+ self.records_per_request_sensor.record(records)
+ self.byte_rate_sensor.record(total_bytes)
+
+ def record_retries(self, topic, count):
+ self.retry_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
+ if sensor:
+ sensor.record(count)
+
+ def record_errors(self, topic, count):
+ self.error_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
+ if sensor:
+ sensor.record(count)
+
+ def record_latency(self, latency, node=None):
+ self.request_time_sensor.record(latency)
+ if node:
+ sensor = self.metrics.get_sensor('node-' + node + '.latency')
+ if sensor:
+ sensor.record(latency)
+
+ def record_throttle_time(self, throttle_time_ms, node=None):
+ self.produce_throttle_time_sensor.record(throttle_time_ms)
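
For reference (an illustrative sketch; the metric and group names are invented): 'requests-in-flight' and 'metadata-age' above use AnonMeasurable to register gauge-style metrics that are computed on read rather than recorded through a sensor.

    import time
    from kafka.metrics import Metrics
    from kafka.metrics.measurable import AnonMeasurable

    metrics = Metrics()
    start = time.time()

    # Computed each time the metric is read; 'now' is supplied as a POSIX timestamp in ms
    metrics.add_metric(
        metrics.metric_name('uptime-seconds', 'example-group',
                            'Seconds since this metric was registered'),
        AnonMeasurable(lambda config, now: now / 1000 - start))
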