diff options
-rw-r--r-- | kafka/client_async.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 123 | ||||
-rw-r--r-- | kafka/producer/sender.py | 17 | ||||
-rw-r--r-- | test/test_client_async.py | 2 |
4 files changed, 127 insertions, 17 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ff566ca..ce1d13b 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -222,6 +222,7 @@ class KafkaClient(object): cb = functools.partial(self._conn_state_change, 'bootstrap') bootstrap = BrokerConnection(host, port, afi, state_change_callback=cb, + node_id='bootstrap', **self.config) bootstrap.connect() while bootstrap.connecting(): @@ -313,6 +314,7 @@ class KafkaClient(object): cb = functools.partial(self._conn_state_change, node_id) self._conns[node_id] = BrokerConnection(host, broker.port, afi, state_change_callback=cb, + node_id=node_id, **self.config) conn = self._conns[node_id] if conn.connected(): diff --git a/kafka/conn.py b/kafka/conn.py index 0a5237d..6c4e476 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -14,6 +14,7 @@ from kafka.vendor import six import kafka.errors as Errors from kafka.future import Future +from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.api import RequestHeader from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import GroupCoordinatorResponse @@ -58,6 +59,7 @@ InFlightRequest = collections.namedtuple('InFlightRequest', class BrokerConnection(object): DEFAULT_CONFIG = { 'client_id': 'kafka-python-' + __version__, + 'node_id': 0, 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, @@ -74,6 +76,8 @@ class BrokerConnection(object): 'ssl_password': None, 'api_version': (0, 8, 2), # default to most restrictive 'state_change_callback': lambda conn: True, + 'metrics': None, + 'metric_group_prefix': '', 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': None, 'sasl_plain_password': None @@ -138,6 +142,9 @@ class BrokerConnection(object): api version. Only applies if api_version is None state_chance_callback (callable): function to be called when the connection state changes from CONNECTING to CONNECTED etc. 
class BrokerConnectionMetrics(object):
    """Register and expose the network IO sensors of one broker connection.

    On construction this makes sure the summary sensors shared by all
    connections ('bytes-sent-received', 'bytes-sent', 'bytes-received' and
    'request-latency') exist on the supplied metrics registry, and then
    makes sure the per-node sensors ('node-<id>.bytes-sent',
    'node-<id>.bytes-received' and 'node-<id>.latency') exist with the
    summary sensors as their parents.

    Arguments:
        metrics: metrics registry; must provide get_sensor(name),
            sensor(name, parents=...) and metric_name(name, group,
            description).
        metric_group_prefix (str): prefix of the metric group names
            ('<prefix>-metrics' and '<prefix>-node-metrics.node-<id>').
        node_id: identifier of the broker node; formatted into the
            per-node sensor names as 'node-<id>'.

    Attributes:
        metrics: the registry passed to the constructor.
        bytes_sent: per-node sensor recording outgoing bytes per request.
        bytes_received: per-node sensor recording incoming bytes per
            response.
        request_time: per-node sensor recording request latency in ms.
    """

    def __init__(self, metrics, metric_group_prefix, node_id):
        self.metrics = metrics

        # Any broker may have registered summary metrics already
        # but if not, we need to create them so we can set as parents below
        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
        if not all_conns_transferred:
            metric_group_name = metric_group_prefix + '-metrics'

            bytes_transferred = metrics.sensor('bytes-sent-received')
            bytes_transferred.add(metrics.metric_name(
                'network-io-rate', metric_group_name,
                'The average number of network operations (reads or writes) on all'
                ' connections per second.'), Rate(sampled_stat=Count()))

            bytes_sent = metrics.sensor('bytes-sent',
                                        parents=[bytes_transferred])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second to all'
                ' servers.'), Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'), Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'), Max())

            bytes_received = metrics.sensor('bytes-received',
                                            parents=[bytes_transferred])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off all sockets'), Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'Responses received sent per second.'),
                Rate(sampled_stat=Count()))

            request_latency = metrics.sensor('request-latency')
            request_latency.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            request_latency.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())

        # if one sensor of the metrics has been registered for the connection,
        # then all other sensors should have been registered; and vice versa
        node_str = 'node-{0}'.format(node_id)
        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
        if not node_sensor:
            metric_group_name = metric_group_prefix + '-node-metrics.' + node_str

            self.bytes_sent = metrics.sensor(
                node_str + '.bytes-sent',
                parents=[metrics.get_sensor('bytes-sent')])
            self.bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second.'),
                Rate())
            self.bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            self.bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'),
                Avg())
            self.bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'),
                Max())

            self.bytes_received = metrics.sensor(
                node_str + '.bytes-received',
                parents=[metrics.get_sensor('bytes-received')])
            self.bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off node-connection socket'),
                Rate())
            self.bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'The average number of responses received per second.'),
                Rate(sampled_stat=Count()))

            self.request_time = self.metrics.sensor(
                node_str + '.latency',
                parents=[metrics.get_sensor('request-latency')])
            self.request_time.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            self.request_time.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())
        else:
            # The per-node sensors already exist on this registry
            # (presumably registered by an earlier connection object for the
            # same node -- verify against callers). Reuse them: without these
            # assignments the instance would have no bytes_sent /
            # bytes_received / request_time attributes and the first
            # .record() call on them would raise AttributeError.
            self.bytes_sent = node_sensor
            self.bytes_received = metrics.get_sensor(
                node_str + '.bytes-received')
            self.request_time = metrics.get_sensor(node_str + '.latency')
b/kafka/producer/sender.py @@ -204,7 +204,6 @@ class Sender(threading.Thread): batch = batches_by_partition[tp] self._complete_batch(batch, error, offset, ts) - self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id) if response.API_VERSION > 0: self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id) @@ -343,15 +342,6 @@ class SenderMetrics(object): sensor_name=sensor_name, description='The maximum time in ms record batches spent in the record accumulator.') - sensor_name = 'request-time' - self.request_time_sensor = self.metrics.sensor(sensor_name) - self.add_metric('request-latency-avg', Avg(), - sensor_name=sensor_name, - description='The average request latency in ms') - self.add_metric('request-latency-max', Max(), - sensor_name=sensor_name, - description='The maximum request latency in ms') - sensor_name = 'produce-throttle-time' self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name) self.add_metric('produce-throttle-time-avg', Avg(), @@ -498,12 +488,5 @@ class SenderMetrics(object): if sensor: sensor.record(count) - def record_latency(self, latency, node=None): - self.request_time_sensor.record(latency) - if node is not None: - sensor = self.metrics.get_sensor('node-' + str(node) + '.latency') - if sensor: - sensor.record(latency) - def record_throttle_time(self, throttle_time_ms, node=None): self.produce_throttle_time_sensor.record(throttle_time_ms) diff --git a/test/test_client_async.py b/test/test_client_async.py index 8b3634a..b165f93 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -49,6 +49,7 @@ def test_bootstrap_success(conn): args, kwargs = conn.call_args assert args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') + kwargs.pop('node_id') assert kwargs == cli.config conn.connect.assert_called_with() conn.send.assert_called_once_with(MetadataRequest[0]([])) @@ -62,6 +63,7 @@ def test_bootstrap_failure(conn): args, kwargs = conn.call_args assert 
args == ('localhost', 9092, socket.AF_UNSPEC) kwargs.pop('state_change_callback') + kwargs.pop('node_id') assert kwargs == cli.config conn.connect.assert_called_with() conn.close.assert_called_with() |