diff options
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 25 |
1 files changed, 23 insertions, 2 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f5c5d19..61cdc8b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -9,6 +9,7 @@ import weakref from .. import errors as Errors from ..client_async import KafkaClient +from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet from ..structs import TopicPartition @@ -220,6 +221,13 @@ class KafkaProducer(object): api_version_auto_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version set to 'auto' + metric_reporters (list): A list of classes to use as metrics reporters. + Implementing the AbstractMetricsReporter interface allows plugging + in classes that will be notified of new metric creation. Default: [] + metrics_num_samples (int): The number of samples maintained to compute + metrics. Default: 2 + metrics_sample_window_ms (int): The maximum age in milliseconds of + samples used to compute metrics. Default: 30000 Note: Configuration parameters are described in more detail at @@ -255,7 +263,10 @@ class KafkaProducer(object): 'ssl_keyfile': None, 'ssl_crlfile': None, 'api_version': None, - 'api_version_auto_timeout_ms': 2000 + 'api_version_auto_timeout_ms': 2000, + 'metric_reporters': [], + 'metrics_num_samples': 2, + 'metrics_sample_window_ms': 30000, } def __init__(self, **configs): @@ -285,6 +296,14 @@ class KafkaProducer(object): log.warning('use api_version=%s (%s is deprecated)', str(self.config['api_version']), deprecated) + # Configure metrics + metrics_tags = {'client-id': self.config['client_id']} + metric_config = MetricConfig(samples=self.config['metrics_num_samples'], + time_window_ms=self.config['metrics_sample_window_ms'], + tags=metrics_tags) + reporters = [reporter() for reporter in self.config['metric_reporters']] + self._metrics = Metrics(metric_config, reporters) + client = KafkaClient(**self.config) # Get auto-discovered version from client if necessary @@ -298,7 +317,8 @@ class KafkaProducer(object): self._accumulator = RecordAccumulator(message_version=message_version, **self.config) self._metadata = client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) - self._sender = Sender(client, self._metadata, self._accumulator, + self._sender = Sender(client, self._metadata, + self._accumulator, self._metrics, guarantee_message_order=guarantee_message_order, **self.config) self._sender.daemon = True @@ -382,6 +402,7 @@ class KafkaProducer(object): if not invoked_from_callback: self._sender.join() + self._metrics.close() try: self.config['key_serializer'].close() except AttributeError: |