diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client_async.py | 136 | ||||
-rw-r--r-- | kafka/conn.py | 104 |
2 files changed, 142 insertions, 98 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cfc89fc..0c22f90 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import copy +import functools import heapq import itertools import logging @@ -93,6 +94,7 @@ class KafkaClient(object): self._metadata_refresh_in_progress = False self._conns = {} self._connecting = set() + self._refresh_on_disconnects = True self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 @@ -117,7 +119,10 @@ class KafkaClient(object): metadata_request = MetadataRequest[0]([]) for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - bootstrap = BrokerConnection(host, port, afi, **self.config) + cb = functools.partial(self._conn_state_change, 'bootstrap') + bootstrap = BrokerConnection(host, port, afi, + state_change_callback=cb, + **self.config) bootstrap.connect() while bootstrap.connecting(): bootstrap.connect() @@ -152,6 +157,29 @@ class KafkaClient(object): conn = self._conns[node_id] return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out() + def _conn_state_change(self, node_id, conn): + if conn.connecting(): + self._connecting.add(node_id) + + elif conn.connected(): + log.debug("Node %s connected", node_id) + if node_id in self._connecting: + self._connecting.remove(node_id) + if 'bootstrap' in self._conns and node_id != 'bootstrap': + bootstrap = self._conns.pop('bootstrap') + # XXX: make conn.close() require error to cause refresh + self._refresh_on_disconnects = False + bootstrap.close() + self._refresh_on_disconnects = True + + # Connection failures imply that our metadata is stale, so let's refresh + elif conn.state is ConnectionStates.DISCONNECTING: + if node_id in self._connecting: + self._connecting.remove(node_id) + if self._refresh_on_disconnects: + log.warning("Node %s connect failed -- refreshing metadata", node_id) + self.cluster.request_update() + def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" if node_id not in self._conns: @@ -160,32 +188,15 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) - host, port, afi = get_ip_port_afi(broker.host) + cb = functools.partial(self._conn_state_change, node_id) self._conns[node_id] = BrokerConnection(host, broker.port, afi, + state_change_callback=cb, **self.config) conn = self._conns[node_id] if conn.connected(): return True - conn.connect() - - if conn.connecting(): - if node_id not in self._connecting: - self._connecting.add(node_id) - - # Whether CONNECTED or DISCONNECTED, we need to remove from connecting - elif node_id in self._connecting: - self._connecting.remove(node_id) - - if conn.connected(): - log.debug("Node %s connected", node_id) - - # Connection failures imply that our metadata is stale, so let's refresh - elif conn.disconnected(): - log.warning("Node %s connect failed -- refreshing metadata", node_id) - self.cluster.request_update() - return conn.connected() def ready(self, node_id): @@ -597,84 +608,13 @@ class KafkaClient(object): if node_id is None: raise Errors.NoBrokersAvailable() - def connect(node_id): - timeout_at = time.time() + timeout - # brokers < 0.9 do not return any broker metadata if there are no topics - # so we're left with a single bootstrap connection - while not self.ready(node_id): - if time.time() >= timeout_at: - raise Errors.NodeNotReadyError(node_id) - time.sleep(0.025) - - # Monkeypatch the connection request timeout - # Generally this timeout should not get triggered - # but in case it does, we want it to be reasonably short - self._conns[node_id].config['request_timeout_ms'] = timeout * 1000 - - # kafka kills the connection when it doesnt recognize an API request - # so we can send a test request and then follow immediately with a - # vanilla MetadataRequest. If the server did not recognize the first - # request, both will be failed with a ConnectionError that wraps - # socket.error (32, 54, or 104) - import socket - from .protocol.admin import ListGroupsRequest - from .protocol.commit import ( - OffsetFetchRequest, GroupCoordinatorRequest) - from .protocol.metadata import MetadataRequest - - # Socket errors are logged as exceptions and can alarm users. Mute them - from logging import Filter - class ConnFilter(Filter): - def filter(self, record): - if record.funcName in ('recv', 'send'): - return False - return True - log_filter = ConnFilter() - - test_cases = [ - ('0.9', ListGroupsRequest[0]()), - ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), - ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), - ('0.8.0', MetadataRequest[0]([])), - ] - - logging.getLogger('kafka.conn').addFilter(log_filter) - for version, request in test_cases: - connect(node_id) - f = self.send(node_id, request) - time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes - metadata = self.send(node_id, MetadataRequest[0]([])) - self.poll(future=f) - self.poll(future=metadata) - - assert f.is_done, 'Future is not done? Please file bug report' - - if f.succeeded(): - log.info('Broker version identifed as %s', version) - break - - # Only enable strict checking to verify that we understand failure - # modes. For most users, the fact that the request failed should be - # enough to rule out a particular broker version. - if strict: - # If the socket flush hack did not work (which should force the - # connection to close and fail all pending requests), then we - # get a basic Request Timeout. Thisisn - if isinstance(f.exception, Errors.RequestTimedOutError): - pass - elif six.PY2: - assert isinstance(f.exception.args[0], socket.error) - assert f.exception.args[0].errno in (32, 54, 104) - else: - assert isinstance(f.exception.args[0], ConnectionError) - log.info("Broker is not v%s -- it did not recognize %s", - version, request.__class__.__name__) - else: - - raise Errors.UnrecognizedBrokerVersion() - - logging.getLogger('kafka.conn').removeFilter(log_filter) - self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms'] + # We will be intentionally causing socket failures + # and should not trigger metadata refresh + self._refresh_on_disconnects = False + self._maybe_connect(node_id) + conn = self._conns[node_id] + version = conn.check_version() + self._refresh_on_disconnects = True return version def wakeup(self): diff --git a/kafka/conn.py b/kafka/conn.py index 92b2fd3..28c09d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092 class ConnectionStates(object): + DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' CONNECTED = '<connected>' @@ -49,6 +50,7 @@ class BrokerConnection(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'api_version': (0, 8, 2), # default to most restrictive + 'state_change_callback': lambda conn: True, } def __init__(self, host, port, afi, **configs): @@ -87,6 +89,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() + self.config['state_change_callback'](self) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -101,6 +104,7 @@ class BrokerConnection(object): if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -151,6 +155,9 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) if self._sock: self._sock.close() self._sock = None @@ -165,6 +172,7 @@ class BrokerConnection(object): while self.in_flight_requests: ifr = self.in_flight_requests.popleft() ifr.future.failure(error) + self.config['state_change_callback'](self) def send(self, request, expect_response=True): """send request, return Future() @@ -352,6 +360,102 @@ class BrokerConnection(object): self._correlation_id = (self._correlation_id + 1) % 2**31 return self._correlation_id + def check_version(self, timeout=2, strict=False): + """Attempt to guess the broker version. This is a blocking call.""" + + # Monkeypatch the connection request timeout + # Generally this timeout should not get triggered + # but in case it does, we want it to be reasonably short + stashed_request_timeout_ms = self.config['request_timeout_ms'] + self.config['request_timeout_ms'] = timeout * 1000 + + # kafka kills the connection when it doesnt recognize an API request + # so we can send a test request and then follow immediately with a + # vanilla MetadataRequest. If the server did not recognize the first + # request, both will be failed with a ConnectionError that wraps + # socket.error (32, 54, or 104) + from .protocol.admin import ListGroupsRequest + from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest + from .protocol.metadata import MetadataRequest + + # Socket errors are logged as exceptions and can alarm users. Mute them + from logging import Filter + class ConnFilter(Filter): + def filter(self, record): + if record.funcName in ('recv', 'send'): + return False + return True + log_filter = ConnFilter() + log.addFilter(log_filter) + + test_cases = [ + ('0.9', ListGroupsRequest[0]()), + ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest[0]([])), + ] + + def connect(): + self.connect() + if self.connected(): + return + timeout_at = time.time() + timeout + while time.time() < timeout_at and self.connecting(): + if self.connect() is ConnectionStates.CONNECTED: + return + time.sleep(0.05) + raise Errors.NodeNotReadyError() + + for version, request in test_cases: + connect() + f = self.send(request) + # HACK: sleeping to wait for socket to send bytes + time.sleep(0.1) + # when broker receives an unrecognized request API + # it abruptly closes our socket. + # so we attempt to send a second request immediately + # that we believe it will definitely recognize (metadata) + # the attempt to write to a disconnected socket should + # immediately fail and allow us to infer that the prior + # request was unrecognized + metadata = self.send(MetadataRequest[0]([])) + + if self._sock: + self._sock.setblocking(True) + resp_1 = self.recv() + resp_2 = self.recv() + if self._sock: + self._sock.setblocking(False) + + assert f.is_done, 'Future is not done? Please file bug report' + + if f.succeeded(): + log.info('Broker version identifed as %s', version) + break + + # Only enable strict checking to verify that we understand failure + # modes. For most users, the fact that the request failed should be + # enough to rule out a particular broker version. + if strict: + # If the socket flush hack did not work (which should force the + # connection to close and fail all pending requests), then we + # get a basic Request Timeout. This is not ideal, but we'll deal + if isinstance(f.exception, Errors.RequestTimedOutError): + pass + elif six.PY2: + assert isinstance(f.exception.args[0], socket.error) + assert f.exception.args[0].errno in (32, 54, 104) + else: + assert isinstance(f.exception.args[0], ConnectionError) + log.info("Broker is not v%s -- it did not recognize %s", + version, request.__class__.__name__) + else: + raise Errors.UnrecognizedBrokerVersion() + + log.removeFilter(log_filter) + self.config['request_timeout_ms'] = stashed_request_timeout_ms + return version + def __repr__(self): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) |