Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 190
1 files changed, 189 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 05b0acb..6c4e476 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -14,8 +14,9 @@ from kafka.vendor import six
 
 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.api import RequestHeader
-from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
+from kafka.protocol.admin import SaslHandShakeRequest
 from kafka.protocol.commit import GroupCoordinatorResponse
 from kafka.protocol.types import Int32
 from kafka.version import __version__
@@ -58,6 +59,7 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
 class BrokerConnection(object):
     DEFAULT_CONFIG = {
         'client_id': 'kafka-python-' + __version__,
+        'node_id': 0,
         'request_timeout_ms': 40000,
         'reconnect_backoff_ms': 50,
         'max_in_flight_requests_per_connection': 5,
@@ -74,6 +76,8 @@ class BrokerConnection(object):
         'ssl_password': None,
         'api_version': (0, 8, 2),  # default to most restrictive
         'state_change_callback': lambda conn: True,
+        'metrics': None,
+        'metric_group_prefix': '',
         'sasl_mechanism': 'PLAIN',
         'sasl_plain_username': None,
         'sasl_plain_password': None
@@ -81,6 +85,74 @@
     SASL_MECHANISMS = ('PLAIN',)
 
     def __init__(self, host, port, afi, **configs):
+        """Initialize a kafka broker connection
+
+        Keyword Arguments:
+            client_id (str): a name for this client. This string is passed in
+                each request to servers and can be used to identify specific
+                server-side log entries that correspond to this client. Also
+                submitted to GroupCoordinator for logging with respect to
+                consumer group administration. Default: 'kafka-python-{version}'
+            reconnect_backoff_ms (int): The amount of time in milliseconds to
+                wait before attempting to reconnect to a given host.
+                Default: 50.
+            request_timeout_ms (int): Client request timeout in milliseconds.
+                Default: 40000.
+            max_in_flight_requests_per_connection (int): Requests are pipelined
+                to kafka brokers up to this number of maximum requests per
+                broker connection. Default: 5.
+            receive_buffer_bytes (int): The size of the TCP receive buffer
+                (SO_RCVBUF) to use when reading data. Default: None (relies on
+                system defaults). Java client defaults to 32768.
+            send_buffer_bytes (int): The size of the TCP send buffer
+                (SO_SNDBUF) to use when sending data. Default: None (relies on
+                system defaults). Java client defaults to 131072.
+            socket_options (list): List of tuple-arguments to socket.setsockopt
+                to apply to broker connection sockets. Default:
+                [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
+            security_protocol (str): Protocol used to communicate with brokers.
+                Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+            ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+                socket connections. If provided, all other ssl_* configurations
+                will be ignored. Default: None.
+            ssl_check_hostname (bool): flag to configure whether ssl handshake
+                should verify that the certificate matches the broker's hostname.
+                Default: True.
+            ssl_cafile (str): optional filename of ca file to use in certificate
+                verification. Default: None.
+            ssl_certfile (str): optional filename of file in pem format containing
+                the client certificate, as well as any ca certificates needed to
+                establish the certificate's authenticity. Default: None.
+            ssl_keyfile (str): optional filename containing the client private key.
+                Default: None.
+            ssl_password (callable, str, bytes, bytearray): optional password or
+                callable function that returns a password, for decrypting the
+                client private key. Default: None.
+            ssl_crlfile (str): optional filename containing the CRL to check for
+                certificate revocation. By default, no CRL check is done. When
+                providing a file, only the leaf certificate will be checked against
+                this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
+                Default: None.
+            api_version (tuple): specify which kafka API version to use. Accepted
+                values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10).
+                If None, KafkaClient will attempt to infer the broker
+                version by probing various APIs. Default: None
+            api_version_auto_timeout_ms (int): number of milliseconds to throw a
+                timeout exception from the constructor when checking the broker
+                api version. Only applies if api_version is None.
+            state_change_callback (callable): function to be called when the
+                connection state changes from CONNECTING to CONNECTED etc.
+            metrics (kafka.metrics.Metrics): Optionally provide a metrics
+                instance for capturing network IO stats. Default: None.
+            metric_group_prefix (str): Prefix for metric names. Default: ''
+            sasl_mechanism (str): string picking sasl mechanism when security_protocol
+                is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+                Default: None
+            sasl_plain_username (str): username for sasl PLAIN authentication.
+                Default: None
+            sasl_plain_password (str): password for sasl PLAIN authentication.
+                Default: None
+        """
         self.host = host
         self.hostname = host
         self.port = port
@@ -123,6 +195,11 @@ class BrokerConnection(object):
         self._correlation_id = 0
         self._gai = None
         self._gai_index = 0
+        self._sensors = None
+        if self.config['metrics']:
+            self._sensors = BrokerConnectionMetrics(self.config['metrics'],
+                                                    self.config['metric_group_prefix'],
+                                                    self.config['node_id'])
 
     def connect(self):
         """Attempt to connect and return ConnectionState"""
@@ -453,6 +530,8 @@ class BrokerConnection(object):
                 sent_bytes = self._sock.send(data[total_sent:])
                 total_sent += sent_bytes
             assert total_sent == len(data)
+            if self._sensors:
+                self._sensors.bytes_sent.record(total_sent)
             self._sock.setblocking(False)
         except (AssertionError, ConnectionError) as e:
             log.exception("Error sending %s to %s", request, self)
@@ -583,6 +662,8 @@ class BrokerConnection(object):
 
             self._receiving = False
             self._next_payload_bytes = 0
+            if self._sensors:
+                self._sensors.bytes_received.record(4 + self._rbuffer.tell())
             self._rbuffer.seek(0)
             response = self._process_response(self._rbuffer)
             self._rbuffer.seek(0)
@@ -593,6 +674,8 @@ class BrokerConnection(object):
         assert not self._processing, 'Recursion not supported'
         self._processing = True
         ifr = self.in_flight_requests.popleft()
+        if self._sensors:
+            self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
 
         # verify send/recv correlation ids match
         recv_correlation_id = Int32.decode(read_buffer)
@@ -762,6 +845,111 @@ class BrokerConnection(object):
                                                           self.port)
 
 
+class BrokerConnectionMetrics(object):
+    def __init__(self, metrics, metric_group_prefix, node_id):
+        self.metrics = metrics
+
+        # Any broker may have registered summary metrics already
+        # but if not, we need to create them so we can set as parents below
+        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
+        if not all_conns_transferred:
+            metric_group_name = metric_group_prefix + '-metrics'
+
+            bytes_transferred = metrics.sensor('bytes-sent-received')
+            bytes_transferred.add(metrics.metric_name(
+                'network-io-rate', metric_group_name,
+                'The average number of network operations (reads or writes) on all'
+                ' connections per second.'), Rate(sampled_stat=Count()))
+
+            bytes_sent = metrics.sensor('bytes-sent',
+                                        parents=[bytes_transferred])
+            bytes_sent.add(metrics.metric_name(
+                'outgoing-byte-rate', metric_group_name,
+                'The average number of outgoing bytes sent per second to all'
+                ' servers.'), Rate())
+            bytes_sent.add(metrics.metric_name(
+                'request-rate', metric_group_name,
+                'The average number of requests sent per second.'),
+                Rate(sampled_stat=Count()))
+            bytes_sent.add(metrics.metric_name(
+                'request-size-avg', metric_group_name,
+                'The average size of all requests in the window.'), Avg())
+            bytes_sent.add(metrics.metric_name(
+                'request-size-max', metric_group_name,
+                'The maximum size of any request sent in the window.'), Max())
+
+            bytes_received = metrics.sensor('bytes-received',
+                                            parents=[bytes_transferred])
+            bytes_received.add(metrics.metric_name(
+                'incoming-byte-rate', metric_group_name,
+                'Bytes/second read off all sockets'), Rate())
+            bytes_received.add(metrics.metric_name(
+                'response-rate', metric_group_name,
+                'Responses received per second.'),
+                Rate(sampled_stat=Count()))
+
+            request_latency = metrics.sensor('request-latency')
+            request_latency.add(metrics.metric_name(
+                'request-latency-avg', metric_group_name,
+                'The average request latency in ms.'),
+                Avg())
+            request_latency.add(metrics.metric_name(
+                'request-latency-max', metric_group_name,
+                'The maximum request latency in ms.'),
+                Max())
+
+        # if one sensor of the metrics has been registered for the connection,
+        # then all other sensors should have been registered; and vice versa
+        node_str = 'node-{0}'.format(node_id)
+        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
+        if not node_sensor:
+            metric_group_name = metric_group_prefix + '-node-metrics.' + node_str
+
+            self.bytes_sent = metrics.sensor(
+                node_str + '.bytes-sent',
+                parents=[metrics.get_sensor('bytes-sent')])
+            self.bytes_sent.add(metrics.metric_name(
+                'outgoing-byte-rate', metric_group_name,
+                'The average number of outgoing bytes sent per second.'),
+                Rate())
+            self.bytes_sent.add(metrics.metric_name(
+                'request-rate', metric_group_name,
+                'The average number of requests sent per second.'),
+                Rate(sampled_stat=Count()))
+            self.bytes_sent.add(metrics.metric_name(
+                'request-size-avg', metric_group_name,
+                'The average size of all requests in the window.'),
+                Avg())
+            self.bytes_sent.add(metrics.metric_name(
+                'request-size-max', metric_group_name,
+                'The maximum size of any request sent in the window.'),
+                Max())
+
+            self.bytes_received = metrics.sensor(
+                node_str + '.bytes-received',
+                parents=[metrics.get_sensor('bytes-received')])
+            self.bytes_received.add(metrics.metric_name(
+                'incoming-byte-rate', metric_group_name,
+                'Bytes/second read off node-connection socket'),
+                Rate())
+            self.bytes_received.add(metrics.metric_name(
+                'response-rate', metric_group_name,
+                'The average number of responses received per second.'),
+                Rate(sampled_stat=Count()))
+
+            self.request_time = self.metrics.sensor(
+                node_str + '.latency',
+                parents=[metrics.get_sensor('request-latency')])
+            self.request_time.add(metrics.metric_name(
+                'request-latency-avg', metric_group_name,
+                'The average request latency in ms.'),
+                Avg())
+            self.request_time.add(metrics.metric_name(
+                'request-latency-max', metric_group_name,
+                'The maximum request latency in ms.'),
+                Max())
+
+
 def _address_family(address):
     """
         Attempt to determine the family of an address (or hostname)
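The metrics hooks added here are opt-in: BrokerConnection only builds a BrokerConnectionMetrics helper when a metrics instance is present in its config. A minimal sketch of that wiring, assuming kafka.metrics.Metrics is available and using placeholder host, port, prefix, and node_id values (the constructor itself does not open a socket):

# Sketch only, not part of this patch: enable connection metrics by passing a
# kafka.metrics.Metrics instance plus the new 'metric_group_prefix' / 'node_id'
# config keys added in DEFAULT_CONFIG above. Host, port, and node_id are placeholders.
import socket

from kafka.conn import BrokerConnection
from kafka.metrics import Metrics

metrics = Metrics()
conn = BrokerConnection('localhost', 9092, socket.AF_INET,
                        metrics=metrics,
                        metric_group_prefix='producer',
                        node_id=1)

# Sensors such as 'node-1.bytes-sent' are registered immediately; their values
# stay empty until connect()/send()/recv() actually move bytes.
for name, metric in metrics.metrics.items():
    print(name.group, name.name, metric.value())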
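Each per-node sensor lists the shared summary sensor as a parent, so a single record() call updates both the node-level group ('<prefix>-node-metrics.node-<id>') and the aggregate group ('<prefix>-metrics'). A small illustration under the same assumptions, exercising BrokerConnectionMetrics directly with a made-up latency value:

# Sketch only: demonstrate the parent/child sensor chain registered above.
# The 42.0 ms latency and the 'producer' prefix are arbitrary examples.
from kafka.conn import BrokerConnectionMetrics
from kafka.metrics import Metrics

metrics = Metrics()
sensors = BrokerConnectionMetrics(metrics, 'producer', 0)

# Same call BrokerConnection._process_response makes: latency in milliseconds.
sensors.request_time.record(42.0)

for name, metric in metrics.metrics.items():
    if name.name == 'request-latency-avg':
        # printed twice: once for 'producer-node-metrics.node-0',
        # once for the shared 'producer-metrics' group
        print(name.group, metric.value())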