diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-08-04 12:54:53 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-08-04 13:11:20 -0700 |
commit | 025b69ef4ae22d1677904e99f924b9ef5a096e75 (patch) | |
tree | 38d12fc11f82c492c68a4e04dbac26664862f541 /kafka/producer/buffer.py | |
parent | 460f0784a30f303b4543763ca330cce52d6054eb (diff) | |
download | kafka-python-conn_metrics.tar.gz |
Instrument bufferpool-wait-ratio metric in KafkaProducerconn_metrics
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r-- | kafka/producer/buffer.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 5f41bac..422d47c 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -9,6 +9,7 @@ from ..codec import (has_gzip, has_snappy, has_lz4, gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka) from .. import errors as Errors +from ..metrics.stats import Rate from ..protocol.types import Int32, Int64 from ..protocol.message import MessageSet, Message @@ -135,7 +136,7 @@ class MessageSetBuffer(object): class SimpleBufferPool(object): """A simple pool of BytesIO objects with a weak memory ceiling.""" - def __init__(self, memory, poolable_size): + def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'): """Create a new buffer pool. Arguments: @@ -150,10 +151,13 @@ class SimpleBufferPool(object): self._free = collections.deque([io.BytesIO() for _ in range(buffers)]) self._waiters = collections.deque() - #self.metrics = metrics; - #self.waitTime = this.metrics.sensor("bufferpool-wait-time"); - #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation."); - #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); + self.wait_time = None + if metrics: + self.wait_time = metrics.sensor('bufferpool-wait-time') + self.wait_time.add(metrics.metric_name( + 'bufferpool-wait-ratio', metric_group_prefix, + 'The fraction of time an appender waits for space allocation.'), + Rate()) def allocate(self, size, max_time_to_block_ms): """ @@ -187,7 +191,8 @@ class SimpleBufferPool(object): start_wait = time.time() more_memory.wait(max_time_to_block_ms / 1000.0) end_wait = time.time() - #this.waitTime.record(endWait - startWait, time.milliseconds()); + if self.wait_time: + self.wait_time.record(end_wait - start_wait) if self._free: buf = self._free.popleft() |