summaryrefslogtreecommitdiff
path: root/kafka/producer/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--kafka/producer/kafka.py25
1 files changed, 23 insertions, 2 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f5c5d19..61cdc8b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -9,6 +9,7 @@ import weakref
from .. import errors as Errors
from ..client_async import KafkaClient
+from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version set to 'auto'
+ metric_reporters (list): A list of classes to use as metrics reporters.
+ Implementing the AbstractMetricsReporter interface allows plugging
+ in classes that will be notified of new metric creation. Default: []
+ metrics_num_samples (int): The number of samples maintained to compute
+ metrics. Default: 2
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
+ samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
'ssl_keyfile': None,
'ssl_crlfile': None,
'api_version': None,
- 'api_version_auto_timeout_ms': 2000
+ 'api_version_auto_timeout_ms': 2000,
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
}
def __init__(self, **configs):
@@ -285,6 +296,14 @@ class KafkaProducer(object):
log.warning('use api_version=%s (%s is deprecated)',
str(self.config['api_version']), deprecated)
+ # Configure metrics
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ self._metrics = Metrics(metric_config, reporters)
+
client = KafkaClient(**self.config)
# Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ class KafkaProducer(object):
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
- self._sender = Sender(client, self._metadata, self._accumulator,
+ self._sender = Sender(client, self._metadata,
+ self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -382,6 +402,7 @@ class KafkaProducer(object):
if not invoked_from_callback:
self._sender.join()
+ self._metrics.close()
try:
self.config['key_serializer'].close()
except AttributeError: