summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2017-10-08 10:00:39 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-08 10:00:39 -0700
commit87e5d1625968c214d2ad6c198ec526ef484f3688 (patch)
tree8a7ba1952b19a92c3abeb26cf65e7ebc6a4cd1e2 /kafka
parent8f211805c40a446c74c62a8b3558c75a33eaa161 (diff)
downloadkafka-python-87e5d1625968c214d2ad6c198ec526ef484f3688.tar.gz
Expand metrics docs (#1243)
* Expand metrics docstrings * Document metrics interface in readme * Use six.iteritems(d) rather than d.items() * Use Sphinx warning syntax
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/group.py15
-rw-r--r--kafka/producer/kafka.py21
2 files changed, 26 insertions, 10 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b7fbd83..a83d5da 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -846,13 +846,20 @@ class KafkaConsumer(six.Iterator):
log.debug("Unsubscribed all topics or patterns and assigned partitions")
def metrics(self, raw=False):
- """Warning: this is an unstable interface.
- It may change in future releases without warning"""
+ """Get metrics on consumer performance.
+
+ This is ported from the Java Consumer, for details see:
+ https://kafka.apache.org/documentation/#new_consumer_monitoring
+
+ Warning:
+ This is an unstable interface. It may change in future
+ releases without warning.
+ """
if raw:
return self._metrics.metrics
metrics = {}
- for k, v in self._metrics.metrics.items():
+ for k, v in six.iteritems(self._metrics.metrics):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]:
@@ -897,7 +904,7 @@ class KafkaConsumer(six.Iterator):
raise UnsupportedVersionError(
"offsets_for_times API not supported for cluster version {}"
.format(self.config['api_version']))
- for tp, ts in timestamps.items():
+ for tp, ts in six.iteritems(timestamps):
timestamps[tp] = int(ts)
if ts < 0:
raise ValueError(
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 09ca744..de9dcd2 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -8,6 +8,8 @@ import threading
import time
import weakref
+from ..vendor import six
+
from .. import errors as Errors
from ..client_async import KafkaClient, selectors
from ..metrics import MetricConfig, Metrics
@@ -566,10 +568,10 @@ class KafkaProducer(object):
Arguments:
timeout (float, optional): timeout in seconds to wait for completion.
-
+
Raises:
- KafkaTimeoutError: failure to flush buffered records within the
- provided timeout
+ KafkaTimeoutError: failure to flush buffered records within the
+ provided timeout
"""
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
@@ -655,13 +657,20 @@ class KafkaProducer(object):
available)
def metrics(self, raw=False):
- """Warning: this is an unstable interface.
- It may change in future releases without warning"""
+ """Get metrics on producer performance.
+
+ This is ported from the Java Producer, for details see:
+ https://kafka.apache.org/documentation/#producer_monitoring
+
+ Warning:
+ This is an unstable interface. It may change in future
+ releases without warning.
+ """
if raw:
return self._metrics.metrics
metrics = {}
- for k, v in self._metrics.metrics.items():
+ for k, v in six.iteritems(self._metrics.metrics):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]: