Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--  kafka/client_async.py  136
1 file changed, 38 insertions, 98 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index cfc89fc..0c22f90 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import

 import copy
+import functools
 import heapq
 import itertools
 import logging
@@ -93,6 +94,7 @@ class KafkaClient(object):
         self._metadata_refresh_in_progress = False
         self._conns = {}
         self._connecting = set()
+        self._refresh_on_disconnects = True
         self._delayed_tasks = DelayedTaskQueue()
         self._last_bootstrap = 0
         self._bootstrap_fails = 0
@@ -117,7 +119,10 @@
         metadata_request = MetadataRequest[0]([])
         for host, port, afi in hosts:
             log.debug("Attempting to bootstrap via node at %s:%s", host, port)
-            bootstrap = BrokerConnection(host, port, afi, **self.config)
+            cb = functools.partial(self._conn_state_change, 'bootstrap')
+            bootstrap = BrokerConnection(host, port, afi,
+                                         state_change_callback=cb,
+                                         **self.config)
             bootstrap.connect()
             while bootstrap.connecting():
                 bootstrap.connect()
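
Review note: the bootstrap connection now registers a state-change callback.
functools.partial bakes the node id ('bootstrap' here) into the callback, so
BrokerConnection only ever has to call back with itself. A minimal sketch of
the pattern, using a hypothetical Conn class in place of the real
BrokerConnection:

    import functools

    class Conn(object):
        """Hypothetical stand-in for BrokerConnection."""
        def __init__(self, state_change_callback=None):
            self._cb = state_change_callback

        def connect(self):
            # ... do the I/O, then notify the owner of the new state.
            # The owner already knows which node this is via partial().
            if self._cb:
                self._cb(self)

    def _conn_state_change(node_id, conn):
        print('node %s changed state' % node_id)

    cb = functools.partial(_conn_state_change, 'bootstrap')
    Conn(state_change_callback=cb).connect()  # prints: node bootstrap changed state
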
@@ -152,6 +157,29 @@
         conn = self._conns[node_id]
         return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()

+    def _conn_state_change(self, node_id, conn):
+        if conn.connecting():
+            self._connecting.add(node_id)
+
+        elif conn.connected():
+            log.debug("Node %s connected", node_id)
+            if node_id in self._connecting:
+                self._connecting.remove(node_id)
+            if 'bootstrap' in self._conns and node_id != 'bootstrap':
+                bootstrap = self._conns.pop('bootstrap')
+                # XXX: make conn.close() require error to cause refresh
+                self._refresh_on_disconnects = False
+                bootstrap.close()
+                self._refresh_on_disconnects = True
+
+        # Connection failures imply that our metadata is stale, so let's refresh
+        elif conn.state is ConnectionStates.DISCONNECTING:
+            if node_id in self._connecting:
+                self._connecting.remove(node_id)
+            if self._refresh_on_disconnects:
+                log.warning("Node %s connect failed -- refreshing metadata", node_id)
+                self.cluster.request_update()
+
     def _maybe_connect(self, node_id):
         """Idempotent non-blocking connection attempt to the given node id."""
         if node_id not in self._conns:
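
Review note: the flag flip around bootstrap.close() (mirrored in check_version
below) is a save/flip/restore guard: the close is intentional and must not be
treated as a failure that forces a metadata refresh. If more call sites
appear, a context manager would make the intent explicit; a sketch of that
alternative (hypothetical helper, not part of this diff):

    from contextlib import contextmanager

    @contextmanager
    def no_refresh_on_disconnect(client):
        # Suppress the refresh-on-disconnect side effect while we
        # intentionally drop a connection, then restore the old value.
        saved = client._refresh_on_disconnects
        client._refresh_on_disconnects = False
        try:
            yield
        finally:
            client._refresh_on_disconnects = saved

    # Usage at the call site above:
    #     with no_refresh_on_disconnect(self):
    #         bootstrap.close()
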
@@ -160,32 +188,15 @@ class KafkaClient(object):
             log.debug("Initiating connection to node %s at %s:%s",
                       node_id, broker.host, broker.port)
-
             host, port, afi = get_ip_port_afi(broker.host)
+            cb = functools.partial(self._conn_state_change, node_id)
             self._conns[node_id] = BrokerConnection(host, broker.port, afi,
+                                                    state_change_callback=cb,
                                                     **self.config)
         conn = self._conns[node_id]
         if conn.connected():
             return True
-
         conn.connect()
-
-        if conn.connecting():
-            if node_id not in self._connecting:
-                self._connecting.add(node_id)
-
-        # Whether CONNECTED or DISCONNECTED, we need to remove from connecting
-        elif node_id in self._connecting:
-            self._connecting.remove(node_id)
-
-        if conn.connected():
-            log.debug("Node %s connected", node_id)
-
-        # Connection failures imply that our metadata is stale, so let's refresh
-        elif conn.disconnected():
-            log.warning("Node %s connect failed -- refreshing metadata", node_id)
-            self.cluster.request_update()
-
         return conn.connected()

     def ready(self, node_id):
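
Review note: with the inline bookkeeping gone, _maybe_connect() reduces to
create-if-needed, kick connect(), and report the result, so it stays safe to
call repeatedly -- the same way the removed connect() helper below polled
ready(). A sketch of that polling pattern (the helper name and loop are
illustrative, not part of the diff):

    import time

    def wait_until_connected(client, node_id, timeout=5.0):
        deadline = time.time() + timeout
        while not client._maybe_connect(node_id):  # idempotent, non-blocking
            if time.time() >= deadline:
                raise RuntimeError('node %s not ready' % node_id)
            time.sleep(0.025)
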
@@ -597,84 +608,13 @@ class KafkaClient(object):
         if node_id is None:
             raise Errors.NoBrokersAvailable()

-        def connect(node_id):
-            timeout_at = time.time() + timeout
-            # brokers < 0.9 do not return any broker metadata if there are no topics
-            # so we're left with a single bootstrap connection
-            while not self.ready(node_id):
-                if time.time() >= timeout_at:
-                    raise Errors.NodeNotReadyError(node_id)
-                time.sleep(0.025)
-
-        # Monkeypatch the connection request timeout
-        # Generally this timeout should not get triggered
-        # but in case it does, we want it to be reasonably short
-        self._conns[node_id].config['request_timeout_ms'] = timeout * 1000
-
-        # kafka kills the connection when it doesn't recognize an API request
-        # so we can send a test request and then follow immediately with a
-        # vanilla MetadataRequest. If the server did not recognize the first
-        # request, both will be failed with a ConnectionError that wraps
-        # socket.error (32, 54, or 104)
-        import socket
-        from .protocol.admin import ListGroupsRequest
-        from .protocol.commit import (
-            OffsetFetchRequest, GroupCoordinatorRequest)
-        from .protocol.metadata import MetadataRequest
-
-        # Socket errors are logged as exceptions and can alarm users. Mute them
-        from logging import Filter
-        class ConnFilter(Filter):
-            def filter(self, record):
-                if record.funcName in ('recv', 'send'):
-                    return False
-                return True
-        log_filter = ConnFilter()
-
-        test_cases = [
-            ('0.9', ListGroupsRequest[0]()),
-            ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
-            ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
-            ('0.8.0', MetadataRequest[0]([])),
-        ]
-
-        logging.getLogger('kafka.conn').addFilter(log_filter)
-        for version, request in test_cases:
-            connect(node_id)
-            f = self.send(node_id, request)
-            time.sleep(0.1)  # HACK: sleeping to wait for socket to send bytes
-            metadata = self.send(node_id, MetadataRequest[0]([]))
-            self.poll(future=f)
-            self.poll(future=metadata)
-
-            assert f.is_done, 'Future is not done? Please file bug report'
-
-            if f.succeeded():
-                log.info('Broker version identified as %s', version)
-                break
-
-            # Only enable strict checking to verify that we understand failure
-            # modes. For most users, the fact that the request failed should be
-            # enough to rule out a particular broker version.
-            if strict:
-                # If the socket flush hack did not work (which should force the
-                # connection to close and fail all pending requests), then we
-                # get a basic Request Timeout. This is not ideal, but we'll deal
-                if isinstance(f.exception, Errors.RequestTimedOutError):
-                    pass
-                elif six.PY2:
-                    assert isinstance(f.exception.args[0], socket.error)
-                    assert f.exception.args[0].errno in (32, 54, 104)
-                else:
-                    assert isinstance(f.exception.args[0], ConnectionError)
-            log.info("Broker is not v%s -- it did not recognize %s",
-                     version, request.__class__.__name__)
-        else:
-
-            raise Errors.UnrecognizedBrokerVersion()
-
-        logging.getLogger('kafka.conn').removeFilter(log_filter)
-        self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms']
+        # We will be intentionally causing socket failures
+        # and should not trigger metadata refresh
+        self._refresh_on_disconnects = False
+        self._maybe_connect(node_id)
+        conn = self._conns[node_id]
+        version = conn.check_version()
+        self._refresh_on_disconnects = True
         return version

     def wakeup(self):
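
Review note: the probe logic removed above now lives in
BrokerConnection.check_version(). The underlying technique is unchanged: send
version-specific requests newest-first, and because brokers kill the
connection on an unrecognized API request, the first probe whose future
succeeds identifies the broker version. A simplified sketch of that loop
(send_and_wait is an assumed helper, not part of the real API):

    from kafka.protocol.admin import ListGroupsRequest
    from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
    from kafka.protocol.metadata import MetadataRequest

    TEST_CASES = [
        ('0.9', ListGroupsRequest[0]()),
        ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
        ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
        ('0.8.0', MetadataRequest[0]([])),
    ]

    def guess_version(send_and_wait):
        # send_and_wait(request) must send the request and return the
        # resolved future; an unrecognized API key fails the future with
        # a ConnectionError, ruling that broker version out.
        for version, request in TEST_CASES:
            if send_and_wait(request).succeeded():
                return version
        raise RuntimeError('unrecognized broker version')
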