diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-08 17:00:47 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-08 17:00:47 -0700 |
commit | 810f08b7996a15e65cdd8af6c1a7167c28f94646 (patch) | |
tree | 88d8165f61e23344728aa5490395ad2e42076583 | |
parent | 4323e5c21cb151728b7985e24a1ad44fd36fd9fb (diff) | |
parent | 897ca399917baa178390af78870fe4be90c051d5 (diff) | |
download | kafka-python-810f08b7996a15e65cdd8af6c1a7167c28f94646.tar.gz |
Merge pull request #639 from dpkp/conn_state_callback
Use KafkaClient callback to manage BrokerConnection state changes
-rw-r--r-- | kafka/client_async.py | 136 | ||||
-rw-r--r-- | kafka/conn.py | 104 | ||||
-rw-r--r-- | test/test_client_async.py | 44 |
3 files changed, 174 insertions, 110 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cfc89fc..0c22f90 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import copy +import functools import heapq import itertools import logging @@ -93,6 +94,7 @@ class KafkaClient(object): self._metadata_refresh_in_progress = False self._conns = {} self._connecting = set() + self._refresh_on_disconnects = True self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 @@ -117,7 +119,10 @@ class KafkaClient(object): metadata_request = MetadataRequest[0]([]) for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - bootstrap = BrokerConnection(host, port, afi, **self.config) + cb = functools.partial(self._conn_state_change, 'bootstrap') + bootstrap = BrokerConnection(host, port, afi, + state_change_callback=cb, + **self.config) bootstrap.connect() while bootstrap.connecting(): bootstrap.connect() @@ -152,6 +157,29 @@ class KafkaClient(object): conn = self._conns[node_id] return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out() + def _conn_state_change(self, node_id, conn): + if conn.connecting(): + self._connecting.add(node_id) + + elif conn.connected(): + log.debug("Node %s connected", node_id) + if node_id in self._connecting: + self._connecting.remove(node_id) + if 'bootstrap' in self._conns and node_id != 'bootstrap': + bootstrap = self._conns.pop('bootstrap') + # XXX: make conn.close() require error to cause refresh + self._refresh_on_disconnects = False + bootstrap.close() + self._refresh_on_disconnects = True + + # Connection failures imply that our metadata is stale, so let's refresh + elif conn.state is ConnectionStates.DISCONNECTING: + if node_id in self._connecting: + self._connecting.remove(node_id) + if self._refresh_on_disconnects: + log.warning("Node %s connect failed -- refreshing metadata", node_id) + self.cluster.request_update() + def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" if node_id not in self._conns: @@ -160,32 +188,15 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) - host, port, afi = get_ip_port_afi(broker.host) + cb = functools.partial(self._conn_state_change, node_id) self._conns[node_id] = BrokerConnection(host, broker.port, afi, + state_change_callback=cb, **self.config) conn = self._conns[node_id] if conn.connected(): return True - conn.connect() - - if conn.connecting(): - if node_id not in self._connecting: - self._connecting.add(node_id) - - # Whether CONNECTED or DISCONNECTED, we need to remove from connecting - elif node_id in self._connecting: - self._connecting.remove(node_id) - - if conn.connected(): - log.debug("Node %s connected", node_id) - - # Connection failures imply that our metadata is stale, so let's refresh - elif conn.disconnected(): - log.warning("Node %s connect failed -- refreshing metadata", node_id) - self.cluster.request_update() - return conn.connected() def ready(self, node_id): @@ -597,84 +608,13 @@ class KafkaClient(object): if node_id is None: raise Errors.NoBrokersAvailable() - def connect(node_id): - timeout_at = time.time() + timeout - # brokers < 0.9 do not return any broker metadata if there are no topics - # so we're left with a single bootstrap connection - while not self.ready(node_id): - if time.time() >= timeout_at: - raise Errors.NodeNotReadyError(node_id) - time.sleep(0.025) - - # Monkeypatch the connection request timeout - # Generally this timeout should not get triggered - # but in case it does, we want it to be reasonably short - self._conns[node_id].config['request_timeout_ms'] = timeout * 1000 - - # kafka kills the connection when it doesnt recognize an API request - # so we can send a test request and then follow immediately with a - # vanilla MetadataRequest. If the server did not recognize the first - # request, both will be failed with a ConnectionError that wraps - # socket.error (32, 54, or 104) - import socket - from .protocol.admin import ListGroupsRequest - from .protocol.commit import ( - OffsetFetchRequest, GroupCoordinatorRequest) - from .protocol.metadata import MetadataRequest - - # Socket errors are logged as exceptions and can alarm users. Mute them - from logging import Filter - class ConnFilter(Filter): - def filter(self, record): - if record.funcName in ('recv', 'send'): - return False - return True - log_filter = ConnFilter() - - test_cases = [ - ('0.9', ListGroupsRequest[0]()), - ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), - ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), - ('0.8.0', MetadataRequest[0]([])), - ] - - logging.getLogger('kafka.conn').addFilter(log_filter) - for version, request in test_cases: - connect(node_id) - f = self.send(node_id, request) - time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes - metadata = self.send(node_id, MetadataRequest[0]([])) - self.poll(future=f) - self.poll(future=metadata) - - assert f.is_done, 'Future is not done? Please file bug report' - - if f.succeeded(): - log.info('Broker version identifed as %s', version) - break - - # Only enable strict checking to verify that we understand failure - # modes. For most users, the fact that the request failed should be - # enough to rule out a particular broker version. - if strict: - # If the socket flush hack did not work (which should force the - # connection to close and fail all pending requests), then we - # get a basic Request Timeout. Thisisn - if isinstance(f.exception, Errors.RequestTimedOutError): - pass - elif six.PY2: - assert isinstance(f.exception.args[0], socket.error) - assert f.exception.args[0].errno in (32, 54, 104) - else: - assert isinstance(f.exception.args[0], ConnectionError) - log.info("Broker is not v%s -- it did not recognize %s", - version, request.__class__.__name__) - else: - - raise Errors.UnrecognizedBrokerVersion() - - logging.getLogger('kafka.conn').removeFilter(log_filter) - self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms'] + # We will be intentionally causing socket failures + # and should not trigger metadata refresh + self._refresh_on_disconnects = False + self._maybe_connect(node_id) + conn = self._conns[node_id] + version = conn.check_version() + self._refresh_on_disconnects = True return version def wakeup(self): diff --git a/kafka/conn.py b/kafka/conn.py index 92b2fd3..28c09d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092 class ConnectionStates(object): + DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' CONNECTED = '<connected>' @@ -49,6 +50,7 @@ class BrokerConnection(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'api_version': (0, 8, 2), # default to most restrictive + 'state_change_callback': lambda conn: True, } def __init__(self, host, port, afi, **configs): @@ -87,6 +89,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() + self.config['state_change_callback'](self) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -101,6 +104,7 @@ class BrokerConnection(object): if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -151,6 +155,9 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) if self._sock: self._sock.close() self._sock = None @@ -165,6 +172,7 @@ class BrokerConnection(object): while self.in_flight_requests: ifr = self.in_flight_requests.popleft() ifr.future.failure(error) + self.config['state_change_callback'](self) def send(self, request, expect_response=True): """send request, return Future() @@ -352,6 +360,102 @@ class BrokerConnection(object): self._correlation_id = (self._correlation_id + 1) % 2**31 return self._correlation_id + def check_version(self, timeout=2, strict=False): + """Attempt to guess the broker version. This is a blocking call.""" + + # Monkeypatch the connection request timeout + # Generally this timeout should not get triggered + # but in case it does, we want it to be reasonably short + stashed_request_timeout_ms = self.config['request_timeout_ms'] + self.config['request_timeout_ms'] = timeout * 1000 + + # kafka kills the connection when it doesnt recognize an API request + # so we can send a test request and then follow immediately with a + # vanilla MetadataRequest. If the server did not recognize the first + # request, both will be failed with a ConnectionError that wraps + # socket.error (32, 54, or 104) + from .protocol.admin import ListGroupsRequest + from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest + from .protocol.metadata import MetadataRequest + + # Socket errors are logged as exceptions and can alarm users. Mute them + from logging import Filter + class ConnFilter(Filter): + def filter(self, record): + if record.funcName in ('recv', 'send'): + return False + return True + log_filter = ConnFilter() + log.addFilter(log_filter) + + test_cases = [ + ('0.9', ListGroupsRequest[0]()), + ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest[0]([])), + ] + + def connect(): + self.connect() + if self.connected(): + return + timeout_at = time.time() + timeout + while time.time() < timeout_at and self.connecting(): + if self.connect() is ConnectionStates.CONNECTED: + return + time.sleep(0.05) + raise Errors.NodeNotReadyError() + + for version, request in test_cases: + connect() + f = self.send(request) + # HACK: sleeping to wait for socket to send bytes + time.sleep(0.1) + # when broker receives an unrecognized request API + # it abruptly closes our socket. + # so we attempt to send a second request immediately + # that we believe it will definitely recognize (metadata) + # the attempt to write to a disconnected socket should + # immediately fail and allow us to infer that the prior + # request was unrecognized + metadata = self.send(MetadataRequest[0]([])) + + if self._sock: + self._sock.setblocking(True) + resp_1 = self.recv() + resp_2 = self.recv() + if self._sock: + self._sock.setblocking(False) + + assert f.is_done, 'Future is not done? Please file bug report' + + if f.succeeded(): + log.info('Broker version identifed as %s', version) + break + + # Only enable strict checking to verify that we understand failure + # modes. For most users, the fact that the request failed should be + # enough to rule out a particular broker version. + if strict: + # If the socket flush hack did not work (which should force the + # connection to close and fail all pending requests), then we + # get a basic Request Timeout. This is not ideal, but we'll deal + if isinstance(f.exception, Errors.RequestTimedOutError): + pass + elif six.PY2: + assert isinstance(f.exception.args[0], socket.error) + assert f.exception.args[0].errno in (32, 54, 104) + else: + assert isinstance(f.exception.args[0], ConnectionError) + log.info("Broker is not v%s -- it did not recognize %s", + version, request.__class__.__name__) + else: + raise Errors.UnrecognizedBrokerVersion() + + log.removeFilter(log_filter) + self.config['request_timeout_ms'] = stashed_request_timeout_ms + return version + def __repr__(self): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) diff --git a/test/test_client_async.py b/test/test_client_async.py index 6da5394..ad76aad 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -1,5 +1,5 @@ -import time import socket +import time import pytest @@ -34,7 +34,10 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts): def test_bootstrap_success(conn): conn.state = ConnectionStates.CONNECTED cli = KafkaClient() - conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config) + args, kwargs = conn.call_args + assert args == ('localhost', 9092, socket.AF_INET) + kwargs.pop('state_change_callback') + assert kwargs == cli.config conn.connect.assert_called_with() conn.send.assert_called_once_with(MetadataRequest[0]([])) assert cli._bootstrap_fails == 0 @@ -44,7 +47,10 @@ def test_bootstrap_success(conn): def test_bootstrap_failure(conn): conn.state = ConnectionStates.DISCONNECTED cli = KafkaClient() - conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config) + args, kwargs = conn.call_args + assert args == ('localhost', 9092, socket.AF_INET) + kwargs.pop('state_change_callback') + assert kwargs == cli.config conn.connect.assert_called_with() conn.close.assert_called_with() assert cli._bootstrap_fails == 1 @@ -83,26 +89,40 @@ def test_maybe_connect(conn): else: assert False, 'Exception not raised' + # New node_id creates a conn object assert 0 not in cli._conns conn.state = ConnectionStates.DISCONNECTED conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING) assert cli._maybe_connect(0) is False assert cli._conns[0] is conn - assert 0 in cli._connecting - conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED) - assert cli._maybe_connect(0) is True - assert 0 not in cli._connecting + +def test_conn_state_change(mocker, conn): + cli = KafkaClient() + + node_id = 0 + conn.state = ConnectionStates.CONNECTING + cli._conn_state_change(node_id, conn) + assert node_id in cli._connecting + + conn.state = ConnectionStates.CONNECTED + cli._conn_state_change(node_id, conn) + assert node_id not in cli._connecting # Failure to connect should trigger metadata update assert cli.cluster._need_update is False - conn.state = ConnectionStates.CONNECTING - cli._connecting.add(0) - conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.DISCONNECTED) - assert cli._maybe_connect(0) is False - assert 0 not in cli._connecting + conn.state = ConnectionStates.DISCONNECTING + cli._conn_state_change(node_id, conn) + assert node_id not in cli._connecting assert cli.cluster._need_update is True + conn.state = ConnectionStates.CONNECTING + cli._conn_state_change(node_id, conn) + assert node_id in cli._connecting + conn.state = ConnectionStates.DISCONNECTING + cli._conn_state_change(node_id, conn) + assert node_id not in cli._connecting + def test_ready(mocker, conn): cli = KafkaClient() |