summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-08-04 14:22:40 -0700
committerGitHub <noreply@github.com>2016-08-04 14:22:40 -0700
commit68c8fa4ad01f8fef38708f257cb1c261cfac01ab (patch)
tree38d12fc11f82c492c68a4e04dbac26664862f541 /kafka/conn.py
parent3c9b1b6fc498f95806ee12f67f84ea548ac1378f (diff)
parent025b69ef4ae22d1677904e99f924b9ef5a096e75 (diff)
downloadkafka-python-68c8fa4ad01f8fef38708f257cb1c261cfac01ab.tar.gz
Merge pull request #794 from dpkp/conn_metrics
Complete metrics instrumentation
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py190
1 files changed, 189 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 05b0acb..6c4e476 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -14,8 +14,9 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
-from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
+from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -58,6 +59,7 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
class BrokerConnection(object):
DEFAULT_CONFIG = {
'client_id': 'kafka-python-' + __version__,
+ 'node_id': 0,
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
@@ -74,6 +76,8 @@ class BrokerConnection(object):
'ssl_password': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
+ 'metrics': None,
+ 'metric_group_prefix': '',
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': None,
'sasl_plain_password': None
@@ -81,6 +85,74 @@ class BrokerConnection(object):
SASL_MECHANISMS = ('PLAIN',)
def __init__(self, host, port, afi, **configs):
+ """Initialize a kafka broker connection
+
+ Keyword Arguments:
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: None (relies on
+ system defaults). Java client defaults to 32768.
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). Java client defaults to 131072.
+ socket_options (list): List of tuple-arguments to socket.setsockopt
+ to apply to broker connection sockets. Default:
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): flag to configure whether ssl handshake
+ should verify that the certificate matches the brokers hostname.
+ default: True.
+ ssl_cafile (str): optional filename of ca file to use in certificate
+ veriication. default: None.
+ ssl_certfile (str): optional filename of file in pem format containing
+ the client certificate, as well as any ca certificates needed to
+ establish the certificate's authenticity. default: None.
+ ssl_keyfile (str): optional filename containing the client private key.
+ default: None.
+ ssl_password (callable, str, bytes, bytearray): optional password or
+ callable function that returns a password, for decrypting the
+ client private key. Default: None.
+ ssl_crlfile (str): optional filename containing the CRL to check for
+ certificate expiration. By default, no CRL check is done. When
+ providing a file, only the leaf certificate will be checked against
+ this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
+ default: None.
+ api_version (tuple): specify which kafka API version to use. Accepted
+ values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10)
+ If None, KafkaClient will attempt to infer the broker
+ version by probing various APIs. Default: None
+ api_version_auto_timeout_ms (int): number of milliseconds to throw a
+ timeout exception from the constructor when checking the broker
+ api version. Only applies if api_version is None
+ state_chance_callback (callable): function to be called when the
+ connection state changes from CONNECTING to CONNECTED etc.
+ metrics (kafka.metrics.Metrics): Optionally provide a metrics
+ instance for capturing network IO stats. Default: None.
+ metric_group_prefix (str): Prefix for metric names. Default: ''
+ sasl_mechanism (str): string picking sasl mechanism when security_protocol
+ is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+ Default: None
+ sasl_plain_username (str): username for sasl PLAIN authentication.
+ Default: None
+ sasl_plain_password (str): passowrd for sasl PLAIN authentication.
+ Defualt: None
+ """
self.host = host
self.hostname = host
self.port = port
@@ -123,6 +195,11 @@ class BrokerConnection(object):
self._correlation_id = 0
self._gai = None
self._gai_index = 0
+ self._sensors = None
+ if self.config['metrics']:
+ self._sensors = BrokerConnectionMetrics(self.config['metrics'],
+ self.config['metric_group_prefix'],
+ self.config['node_id'])
def connect(self):
"""Attempt to connect and return ConnectionState"""
@@ -453,6 +530,8 @@ class BrokerConnection(object):
sent_bytes = self._sock.send(data[total_sent:])
total_sent += sent_bytes
assert total_sent == len(data)
+ if self._sensors:
+ self._sensors.bytes_sent.record(total_sent)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
@@ -583,6 +662,8 @@ class BrokerConnection(object):
self._receiving = False
self._next_payload_bytes = 0
+ if self._sensors:
+ self._sensors.bytes_received.record(4 + self._rbuffer.tell())
self._rbuffer.seek(0)
response = self._process_response(self._rbuffer)
self._rbuffer.seek(0)
@@ -593,6 +674,8 @@ class BrokerConnection(object):
assert not self._processing, 'Recursion not supported'
self._processing = True
ifr = self.in_flight_requests.popleft()
+ if self._sensors:
+ self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
# verify send/recv correlation ids match
recv_correlation_id = Int32.decode(read_buffer)
@@ -762,6 +845,111 @@ class BrokerConnection(object):
self.port)
+class BrokerConnectionMetrics(object):
+ def __init__(self, metrics, metric_group_prefix, node_id):
+ self.metrics = metrics
+
+ # Any broker may have registered summary metrics already
+ # but if not, we need to create them so we can set as parents below
+ all_conns_transferred = metrics.get_sensor('bytes-sent-received')
+ if not all_conns_transferred:
+ metric_group_name = metric_group_prefix + '-metrics'
+
+ bytes_transferred = metrics.sensor('bytes-sent-received')
+ bytes_transferred.add(metrics.metric_name(
+ 'network-io-rate', metric_group_name,
+ 'The average number of network operations (reads or writes) on all'
+ ' connections per second.'), Rate(sampled_stat=Count()))
+
+ bytes_sent = metrics.sensor('bytes-sent',
+ parents=[bytes_transferred])
+ bytes_sent.add(metrics.metric_name(
+ 'outgoing-byte-rate', metric_group_name,
+ 'The average number of outgoing bytes sent per second to all'
+ ' servers.'), Rate())
+ bytes_sent.add(metrics.metric_name(
+ 'request-rate', metric_group_name,
+ 'The average number of requests sent per second.'),
+ Rate(sampled_stat=Count()))
+ bytes_sent.add(metrics.metric_name(
+ 'request-size-avg', metric_group_name,
+ 'The average size of all requests in the window.'), Avg())
+ bytes_sent.add(metrics.metric_name(
+ 'request-size-max', metric_group_name,
+ 'The maximum size of any request sent in the window.'), Max())
+
+ bytes_received = metrics.sensor('bytes-received',
+ parents=[bytes_transferred])
+ bytes_received.add(metrics.metric_name(
+ 'incoming-byte-rate', metric_group_name,
+ 'Bytes/second read off all sockets'), Rate())
+ bytes_received.add(metrics.metric_name(
+ 'response-rate', metric_group_name,
+ 'Responses received sent per second.'),
+ Rate(sampled_stat=Count()))
+
+ request_latency = metrics.sensor('request-latency')
+ request_latency.add(metrics.metric_name(
+ 'request-latency-avg', metric_group_name,
+ 'The average request latency in ms.'),
+ Avg())
+ request_latency.add(metrics.metric_name(
+ 'request-latency-max', metric_group_name,
+ 'The maximum request latency in ms.'),
+ Max())
+
+ # if one sensor of the metrics has been registered for the connection,
+ # then all other sensors should have been registered; and vice versa
+ node_str = 'node-{0}'.format(node_id)
+ node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
+ if not node_sensor:
+ metric_group_name = metric_group_prefix + '-node-metrics.' + node_str
+
+ self.bytes_sent = metrics.sensor(
+ node_str + '.bytes-sent',
+ parents=[metrics.get_sensor('bytes-sent')])
+ self.bytes_sent.add(metrics.metric_name(
+ 'outgoing-byte-rate', metric_group_name,
+ 'The average number of outgoing bytes sent per second.'),
+ Rate())
+ self.bytes_sent.add(metrics.metric_name(
+ 'request-rate', metric_group_name,
+ 'The average number of requests sent per second.'),
+ Rate(sampled_stat=Count()))
+ self.bytes_sent.add(metrics.metric_name(
+ 'request-size-avg', metric_group_name,
+ 'The average size of all requests in the window.'),
+ Avg())
+ self.bytes_sent.add(metrics.metric_name(
+ 'request-size-max', metric_group_name,
+ 'The maximum size of any request sent in the window.'),
+ Max())
+
+ self.bytes_received = metrics.sensor(
+ node_str + '.bytes-received',
+ parents=[metrics.get_sensor('bytes-received')])
+ self.bytes_received.add(metrics.metric_name(
+ 'incoming-byte-rate', metric_group_name,
+ 'Bytes/second read off node-connection socket'),
+ Rate())
+ self.bytes_received.add(metrics.metric_name(
+ 'response-rate', metric_group_name,
+ 'The average number of responses received per second.'),
+ Rate(sampled_stat=Count()))
+
+ self.request_time = self.metrics.sensor(
+ node_str + '.latency',
+ parents=[metrics.get_sensor('request-latency')])
+ self.request_time.add(metrics.metric_name(
+ 'request-latency-avg', metric_group_name,
+ 'The average request latency in ms.'),
+ Avg())
+ self.request_time.add(metrics.metric_name(
+ 'request-latency-max', metric_group_name,
+ 'The maximum request latency in ms.'),
+ Max())
+
+
def _address_family(address):
"""
Attempt to determine the family of an address (or hostname)