diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer/buffer.py | 17 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 2 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 6 |
3 files changed, 17 insertions, 8 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 5f41bac..422d47c 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -9,6 +9,7 @@ from ..codec import (has_gzip, has_snappy, has_lz4, gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka) from .. import errors as Errors +from ..metrics.stats import Rate from ..protocol.types import Int32, Int64 from ..protocol.message import MessageSet, Message @@ -135,7 +136,7 @@ class MessageSetBuffer(object): class SimpleBufferPool(object): """A simple pool of BytesIO objects with a weak memory ceiling.""" - def __init__(self, memory, poolable_size): + def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'): """Create a new buffer pool. Arguments: @@ -150,10 +151,13 @@ class SimpleBufferPool(object): self._free = collections.deque([io.BytesIO() for _ in range(buffers)]) self._waiters = collections.deque() - #self.metrics = metrics; - #self.waitTime = this.metrics.sensor("bufferpool-wait-time"); - #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation."); - #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); + self.wait_time = None + if metrics: + self.wait_time = metrics.sensor('bufferpool-wait-time') + self.wait_time.add(metrics.metric_name( + 'bufferpool-wait-ratio', metric_group_prefix, + 'The fraction of time an appender waits for space allocation.'), + Rate()) def allocate(self, size, max_time_to_block_ms): """ @@ -187,7 +191,8 @@ class SimpleBufferPool(object): start_wait = time.time() more_memory.wait(max_time_to_block_ms / 1000.0) end_wait = time.time() - #this.waitTime.record(endWait - startWait, time.milliseconds()); + if self.wait_time: + self.wait_time.record(end_wait - start_wait) if self._free: buf = self._free.popleft() diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index e3b0d69..84039f6 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -335,7 +335,7 @@ class KafkaProducer(object): assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' message_version = 1 if self.config['api_version'] >= (0, 10) else 0 - self._accumulator = RecordAccumulator(message_version=message_version, **self.config) + self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config) self._metadata = client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) self._sender = Sender(client, self._metadata, diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 3e2d903..8fe6abb 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -162,6 +162,8 @@ class RecordAccumulator(object): 'linger_ms': 0, 'retry_backoff_ms': 100, 'message_version': 0, + 'metrics': None, + 'metric_group_prefix': 'producer-metrics', } def __init__(self, **configs): @@ -176,7 +178,9 @@ class RecordAccumulator(object): self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch] self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries self._free = SimpleBufferPool(self.config['buffer_memory'], - self.config['batch_size']) + self.config['batch_size'], + metrics=self.config['metrics'], + metric_group_prefix=self.config['metric_group_prefix']) self._incomplete = IncompleteRecordBatches() # The following variables should only be accessed by the sender thread, # so we don't need to protect them w/ locking. |