summaryrefslogtreecommitdiff
path: root/kafka/producer/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--kafka/producer/kafka.py21
1 files changed, 15 insertions, 6 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 09ca744..de9dcd2 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -8,6 +8,8 @@ import threading
import time
import weakref
+from ..vendor import six
+
from .. import errors as Errors
from ..client_async import KafkaClient, selectors
from ..metrics import MetricConfig, Metrics
@@ -566,10 +568,10 @@ class KafkaProducer(object):
Arguments:
timeout (float, optional): timeout in seconds to wait for completion.
-
+
Raises:
- KafkaTimeoutError: failure to flush buffered records within the
- provided timeout
+ KafkaTimeoutError: failure to flush buffered records within the
+ provided timeout
"""
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
@@ -655,13 +657,20 @@ class KafkaProducer(object):
available)
def metrics(self, raw=False):
- """Warning: this is an unstable interface.
- It may change in future releases without warning"""
+ """Get metrics on producer performance.
+
+ This is ported from the Java Producer, for details see:
+ https://kafka.apache.org/documentation/#producer_monitoring
+
+ Warning:
+ This is an unstable interface. It may change in future
+ releases without warning.
+ """
if raw:
return self._metrics.metrics
metrics = {}
- for k, v in self._metrics.metrics.items():
+ for k, v in six.iteritems(self._metrics.metrics):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]: