summary refs log tree commit diff
diff options
context:
space:
mode:
author Dana Powers <dana.powers@gmail.com> 2016-04-08 17:00:47 -0700
committer Dana Powers <dana.powers@gmail.com> 2016-04-08 17:00:47 -0700
commit 810f08b7996a15e65cdd8af6c1a7167c28f94646 (patch)
tree 88d8165f61e23344728aa5490395ad2e42076583
parent 4323e5c21cb151728b7985e24a1ad44fd36fd9fb (diff)
parent 897ca399917baa178390af78870fe4be90c051d5 (diff)
download kafka-python-810f08b7996a15e65cdd8af6c1a7167c28f94646.tar.gz
Merge pull request #639 from dpkp/conn_state_callback
Use KafkaClient callback to manage BrokerConnection state changes
-rw-r--r-- kafka/client_async.py 136
-rw-r--r-- kafka/conn.py 104
-rw-r--r-- test/test_client_async.py 44
3 files changed, 174 insertions, 110 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index cfc89fc..0c22f90 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
import copy
+import functools
import heapq
import itertools
import logging
@@ -93,6 +94,7 @@ class KafkaClient(object):
self._metadata_refresh_in_progress = False
self._conns = {}
self._connecting = set()
+ self._refresh_on_disconnects = True
self._delayed_tasks = DelayedTaskQueue()
self._last_bootstrap = 0
self._bootstrap_fails = 0
@@ -117,7 +119,10 @@ class KafkaClient(object):
metadata_request = MetadataRequest[0]([])
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- bootstrap = BrokerConnection(host, port, afi, **self.config)
+ cb = functools.partial(self._conn_state_change, 'bootstrap')
+ bootstrap = BrokerConnection(host, port, afi,
+ state_change_callback=cb,
+ **self.config)
bootstrap.connect()
while bootstrap.connecting():
bootstrap.connect()
@@ -152,6 +157,29 @@ class KafkaClient(object):
conn = self._conns[node_id]
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
+ def _conn_state_change(self, node_id, conn):
+ if conn.connecting():
+ self._connecting.add(node_id)
+
+ elif conn.connected():
+ log.debug("Node %s connected", node_id)
+ if node_id in self._connecting:
+ self._connecting.remove(node_id)
+ if 'bootstrap' in self._conns and node_id != 'bootstrap':
+ bootstrap = self._conns.pop('bootstrap')
+ # XXX: make conn.close() require error to cause refresh
+ self._refresh_on_disconnects = False
+ bootstrap.close()
+ self._refresh_on_disconnects = True
+
+ # Connection failures imply that our metadata is stale, so let's refresh
+ elif conn.state is ConnectionStates.DISCONNECTING:
+ if node_id in self._connecting:
+ self._connecting.remove(node_id)
+ if self._refresh_on_disconnects:
+ log.warning("Node %s connect failed -- refreshing metadata", node_id)
+ self.cluster.request_update()
+
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
if node_id not in self._conns:
@@ -160,32 +188,15 @@ class KafkaClient(object):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
-
host, port, afi = get_ip_port_afi(broker.host)
+ cb = functools.partial(self._conn_state_change, node_id)
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
+ state_change_callback=cb,
**self.config)
conn = self._conns[node_id]
if conn.connected():
return True
-
conn.connect()
-
- if conn.connecting():
- if node_id not in self._connecting:
- self._connecting.add(node_id)
-
- # Whether CONNECTED or DISCONNECTED, we need to remove from connecting
- elif node_id in self._connecting:
- self._connecting.remove(node_id)
-
- if conn.connected():
- log.debug("Node %s connected", node_id)
-
- # Connection failures imply that our metadata is stale, so let's refresh
- elif conn.disconnected():
- log.warning("Node %s connect failed -- refreshing metadata", node_id)
- self.cluster.request_update()
-
return conn.connected()
def ready(self, node_id):
@@ -597,84 +608,13 @@ class KafkaClient(object):
if node_id is None:
raise Errors.NoBrokersAvailable()
- def connect(node_id):
- timeout_at = time.time() + timeout
- # brokers < 0.9 do not return any broker metadata if there are no topics
- # so we're left with a single bootstrap connection
- while not self.ready(node_id):
- if time.time() >= timeout_at:
- raise Errors.NodeNotReadyError(node_id)
- time.sleep(0.025)
-
- # Monkeypatch the connection request timeout
- # Generally this timeout should not get triggered
- # but in case it does, we want it to be reasonably short
- self._conns[node_id].config['request_timeout_ms'] = timeout * 1000
-
- # kafka kills the connection when it doesnt recognize an API request
- # so we can send a test request and then follow immediately with a
- # vanilla MetadataRequest. If the server did not recognize the first
- # request, both will be failed with a ConnectionError that wraps
- # socket.error (32, 54, or 104)
- import socket
- from .protocol.admin import ListGroupsRequest
- from .protocol.commit import (
- OffsetFetchRequest, GroupCoordinatorRequest)
- from .protocol.metadata import MetadataRequest
-
- # Socket errors are logged as exceptions and can alarm users. Mute them
- from logging import Filter
- class ConnFilter(Filter):
- def filter(self, record):
- if record.funcName in ('recv', 'send'):
- return False
- return True
- log_filter = ConnFilter()
-
- test_cases = [
- ('0.9', ListGroupsRequest[0]()),
- ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
- ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
- ('0.8.0', MetadataRequest[0]([])),
- ]
-
- logging.getLogger('kafka.conn').addFilter(log_filter)
- for version, request in test_cases:
- connect(node_id)
- f = self.send(node_id, request)
- time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
- metadata = self.send(node_id, MetadataRequest[0]([]))
- self.poll(future=f)
- self.poll(future=metadata)
-
- assert f.is_done, 'Future is not done? Please file bug report'
-
- if f.succeeded():
- log.info('Broker version identifed as %s', version)
- break
-
- # Only enable strict checking to verify that we understand failure
- # modes. For most users, the fact that the request failed should be
- # enough to rule out a particular broker version.
- if strict:
- # If the socket flush hack did not work (which should force the
- # connection to close and fail all pending requests), then we
- # get a basic Request Timeout. Thisisn
- if isinstance(f.exception, Errors.RequestTimedOutError):
- pass
- elif six.PY2:
- assert isinstance(f.exception.args[0], socket.error)
- assert f.exception.args[0].errno in (32, 54, 104)
- else:
- assert isinstance(f.exception.args[0], ConnectionError)
- log.info("Broker is not v%s -- it did not recognize %s",
- version, request.__class__.__name__)
- else:
-
- raise Errors.UnrecognizedBrokerVersion()
-
- logging.getLogger('kafka.conn').removeFilter(log_filter)
- self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms']
+ # We will be intentionally causing socket failures
+ # and should not trigger metadata refresh
+ self._refresh_on_disconnects = False
+ self._maybe_connect(node_id)
+ conn = self._conns[node_id]
+ version = conn.check_version()
+ self._refresh_on_disconnects = True
return version
def wakeup(self):
diff --git a/kafka/conn.py b/kafka/conn.py
index 92b2fd3..28c09d9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092
class ConnectionStates(object):
+ DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
CONNECTED = '<connected>'
@@ -49,6 +50,7 @@ class BrokerConnection(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'api_version': (0, 8, 2), # default to most restrictive
+ 'state_change_callback': lambda conn: True,
}
def __init__(self, host, port, afi, **configs):
@@ -87,6 +89,7 @@ class BrokerConnection(object):
self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
+ self.config['state_change_callback'](self)
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -101,6 +104,7 @@ class BrokerConnection(object):
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', str(self))
self.state = ConnectionStates.CONNECTED
+ self.config['state_change_callback'](self)
# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -151,6 +155,9 @@ class BrokerConnection(object):
will be failed with this exception.
Default: kafka.errors.ConnectionError.
"""
+ if self.state is not ConnectionStates.DISCONNECTED:
+ self.state = ConnectionStates.DISCONNECTING
+ self.config['state_change_callback'](self)
if self._sock:
self._sock.close()
self._sock = None
@@ -165,6 +172,7 @@ class BrokerConnection(object):
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
+ self.config['state_change_callback'](self)
def send(self, request, expect_response=True):
"""send request, return Future()
@@ -352,6 +360,102 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
+ def check_version(self, timeout=2, strict=False):
+ """Attempt to guess the broker version. This is a blocking call."""
+
+ # Monkeypatch the connection request timeout
+ # Generally this timeout should not get triggered
+ # but in case it does, we want it to be reasonably short
+ stashed_request_timeout_ms = self.config['request_timeout_ms']
+ self.config['request_timeout_ms'] = timeout * 1000
+
+ # kafka kills the connection when it doesnt recognize an API request
+ # so we can send a test request and then follow immediately with a
+ # vanilla MetadataRequest. If the server did not recognize the first
+ # request, both will be failed with a ConnectionError that wraps
+ # socket.error (32, 54, or 104)
+ from .protocol.admin import ListGroupsRequest
+ from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
+ from .protocol.metadata import MetadataRequest
+
+ # Socket errors are logged as exceptions and can alarm users. Mute them
+ from logging import Filter
+ class ConnFilter(Filter):
+ def filter(self, record):
+ if record.funcName in ('recv', 'send'):
+ return False
+ return True
+ log_filter = ConnFilter()
+ log.addFilter(log_filter)
+
+ test_cases = [
+ ('0.9', ListGroupsRequest[0]()),
+ ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
+ ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
+ ('0.8.0', MetadataRequest[0]([])),
+ ]
+
+ def connect():
+ self.connect()
+ if self.connected():
+ return
+ timeout_at = time.time() + timeout
+ while time.time() < timeout_at and self.connecting():
+ if self.connect() is ConnectionStates.CONNECTED:
+ return
+ time.sleep(0.05)
+ raise Errors.NodeNotReadyError()
+
+ for version, request in test_cases:
+ connect()
+ f = self.send(request)
+ # HACK: sleeping to wait for socket to send bytes
+ time.sleep(0.1)
+ # when broker receives an unrecognized request API
+ # it abruptly closes our socket.
+ # so we attempt to send a second request immediately
+ # that we believe it will definitely recognize (metadata)
+ # the attempt to write to a disconnected socket should
+ # immediately fail and allow us to infer that the prior
+ # request was unrecognized
+ metadata = self.send(MetadataRequest[0]([]))
+
+ if self._sock:
+ self._sock.setblocking(True)
+ resp_1 = self.recv()
+ resp_2 = self.recv()
+ if self._sock:
+ self._sock.setblocking(False)
+
+ assert f.is_done, 'Future is not done? Please file bug report'
+
+ if f.succeeded():
+ log.info('Broker version identifed as %s', version)
+ break
+
+ # Only enable strict checking to verify that we understand failure
+ # modes. For most users, the fact that the request failed should be
+ # enough to rule out a particular broker version.
+ if strict:
+ # If the socket flush hack did not work (which should force the
+ # connection to close and fail all pending requests), then we
+ # get a basic Request Timeout. This is not ideal, but we'll deal
+ if isinstance(f.exception, Errors.RequestTimedOutError):
+ pass
+ elif six.PY2:
+ assert isinstance(f.exception.args[0], socket.error)
+ assert f.exception.args[0].errno in (32, 54, 104)
+ else:
+ assert isinstance(f.exception.args[0], ConnectionError)
+ log.info("Broker is not v%s -- it did not recognize %s",
+ version, request.__class__.__name__)
+ else:
+ raise Errors.UnrecognizedBrokerVersion()
+
+ log.removeFilter(log_filter)
+ self.config['request_timeout_ms'] = stashed_request_timeout_ms
+ return version
+
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 6da5394..ad76aad 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,5 +1,5 @@
-import time
import socket
+import time
import pytest
@@ -34,7 +34,10 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
def test_bootstrap_success(conn):
conn.state = ConnectionStates.CONNECTED
cli = KafkaClient()
- conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config)
+ args, kwargs = conn.call_args
+ assert args == ('localhost', 9092, socket.AF_INET)
+ kwargs.pop('state_change_callback')
+ assert kwargs == cli.config
conn.connect.assert_called_with()
conn.send.assert_called_once_with(MetadataRequest[0]([]))
assert cli._bootstrap_fails == 0
@@ -44,7 +47,10 @@ def test_bootstrap_success(conn):
def test_bootstrap_failure(conn):
conn.state = ConnectionStates.DISCONNECTED
cli = KafkaClient()
- conn.assert_called_once_with('localhost', 9092, socket.AF_INET, **cli.config)
+ args, kwargs = conn.call_args
+ assert args == ('localhost', 9092, socket.AF_INET)
+ kwargs.pop('state_change_callback')
+ assert kwargs == cli.config
conn.connect.assert_called_with()
conn.close.assert_called_with()
assert cli._bootstrap_fails == 1
@@ -83,26 +89,40 @@ def test_maybe_connect(conn):
else:
assert False, 'Exception not raised'
+ # New node_id creates a conn object
assert 0 not in cli._conns
conn.state = ConnectionStates.DISCONNECTED
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
assert cli._maybe_connect(0) is False
assert cli._conns[0] is conn
- assert 0 in cli._connecting
- conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED)
- assert cli._maybe_connect(0) is True
- assert 0 not in cli._connecting
+
+def test_conn_state_change(mocker, conn):
+ cli = KafkaClient()
+
+ node_id = 0
+ conn.state = ConnectionStates.CONNECTING
+ cli._conn_state_change(node_id, conn)
+ assert node_id in cli._connecting
+
+ conn.state = ConnectionStates.CONNECTED
+ cli._conn_state_change(node_id, conn)
+ assert node_id not in cli._connecting
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
- conn.state = ConnectionStates.CONNECTING
- cli._connecting.add(0)
- conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.DISCONNECTED)
- assert cli._maybe_connect(0) is False
- assert 0 not in cli._connecting
+ conn.state = ConnectionStates.DISCONNECTING
+ cli._conn_state_change(node_id, conn)
+ assert node_id not in cli._connecting
assert cli.cluster._need_update is True
+ conn.state = ConnectionStates.CONNECTING
+ cli._conn_state_change(node_id, conn)
+ assert node_id in cli._connecting
+ conn.state = ConnectionStates.DISCONNECTING
+ cli._conn_state_change(node_id, conn)
+ assert node_id not in cli._connecting
+
def test_ready(mocker, conn):
cli = KafkaClient()