summary | refs | log | tree | commit | diff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com> 2017-12-21 14:46:10 -0800
committer: GitHub <noreply@github.com> 2017-12-21 14:46:10 -0800
commit: ad024d1e897dbf16bd629fa63895bd7af4a8d959 (patch)
tree: f1993351b2c6487e8e623cefabf42ddf7477f666 /kafka/client_async.py
parent: 995664c7d407009a0a1030c7541848eb5ad51c97 (diff)
download: kafka-python-ad024d1e897dbf16bd629fa63895bd7af4a8d959.tar.gz
KAFKA-3888 Use background thread to process consumer heartbeats (#1266)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- kafka/client_async.py 465
1 files changed, 182 insertions, 283 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 1350503..24162ad 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -3,8 +3,6 @@ from __future__ import absolute_import, division
import collections
import copy
import functools
-import heapq
-import itertools
import logging
import random
import threading
@@ -202,15 +200,17 @@ class KafkaClient(object):
self._conns = {}
self._connecting = set()
self._refresh_on_disconnects = True
- self._delayed_tasks = DelayedTaskQueue()
self._last_bootstrap = 0
self._bootstrap_fails = 0
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._wake_lock = threading.Lock()
+ self._lock = threading.RLock()
+
# when requests complete, they are transferred to this queue prior to
- # invocation.
+ # invocation. The purpose is to avoid invoking them while holding the
+ # lock above.
self._pending_completion = collections.deque()
self._selector.register(self._wake_r, selectors.EVENT_READ)
@@ -296,90 +296,92 @@ class KafkaClient(object):
return conn.disconnected() and not conn.blacked_out()
def _conn_state_change(self, node_id, conn):
- if conn.connecting():
- # SSL connections can enter this state 2x (second during Handshake)
- if node_id not in self._connecting:
- self._connecting.add(node_id)
- self._selector.register(conn._sock, selectors.EVENT_WRITE)
-
- elif conn.connected():
- log.debug("Node %s connected", node_id)
- if node_id in self._connecting:
- self._connecting.remove(node_id)
-
- try:
- self._selector.unregister(conn._sock)
- except KeyError:
- pass
- self._selector.register(conn._sock, selectors.EVENT_READ, conn)
- if self._sensors:
- self._sensors.connection_created.record()
-
- self._idle_expiry_manager.update(node_id)
-
- if 'bootstrap' in self._conns and node_id != 'bootstrap':
- bootstrap = self._conns.pop('bootstrap')
- # XXX: make conn.close() require error to cause refresh
- self._refresh_on_disconnects = False
- bootstrap.close()
- self._refresh_on_disconnects = True
+ with self._lock:
+ if conn.connecting():
+ # SSL connections can enter this state 2x (second during Handshake)
+ if node_id not in self._connecting:
+ self._connecting.add(node_id)
+ self._selector.register(conn._sock, selectors.EVENT_WRITE)
+
+ elif conn.connected():
+ log.debug("Node %s connected", node_id)
+ if node_id in self._connecting:
+ self._connecting.remove(node_id)
- # Connection failures imply that our metadata is stale, so let's refresh
- elif conn.state is ConnectionStates.DISCONNECTING:
- if node_id in self._connecting:
- self._connecting.remove(node_id)
- try:
- self._selector.unregister(conn._sock)
- except KeyError:
- pass
- if self._sensors:
- self._sensors.connection_closed.record()
+ try:
+ self._selector.unregister(conn._sock)
+ except KeyError:
+ pass
+ self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+ if self._sensors:
+ self._sensors.connection_created.record()
+
+ self._idle_expiry_manager.update(node_id)
+
+ if 'bootstrap' in self._conns and node_id != 'bootstrap':
+ bootstrap = self._conns.pop('bootstrap')
+ # XXX: make conn.close() require error to cause refresh
+ self._refresh_on_disconnects = False
+ bootstrap.close()
+ self._refresh_on_disconnects = True
+
+ # Connection failures imply that our metadata is stale, so let's refresh
+ elif conn.state is ConnectionStates.DISCONNECTING:
+ if node_id in self._connecting:
+ self._connecting.remove(node_id)
+ try:
+ self._selector.unregister(conn._sock)
+ except KeyError:
+ pass
+ if self._sensors:
+ self._sensors.connection_closed.record()
- idle_disconnect = False
- if self._idle_expiry_manager.is_expired(node_id):
- idle_disconnect = True
- self._idle_expiry_manager.remove(node_id)
+ idle_disconnect = False
+ if self._idle_expiry_manager.is_expired(node_id):
+ idle_disconnect = True
+ self._idle_expiry_manager.remove(node_id)
- if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
- log.warning("Node %s connection failed -- refreshing metadata", node_id)
- self.cluster.request_update()
+ if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
+ log.warning("Node %s connection failed -- refreshing metadata", node_id)
+ self.cluster.request_update()
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
- broker = self.cluster.broker_metadata(node_id)
- conn = self._conns.get(node_id)
-
- if conn is None:
- assert broker, 'Broker id %s not in current metadata' % node_id
-
- log.debug("Initiating connection to node %s at %s:%s",
- node_id, broker.host, broker.port)
- host, port, afi = get_ip_port_afi(broker.host)
- cb = functools.partial(self._conn_state_change, node_id)
- conn = BrokerConnection(host, broker.port, afi,
- state_change_callback=cb,
- node_id=node_id,
- **self.config)
- self._conns[node_id] = conn
-
- # Check if existing connection should be recreated because host/port changed
- elif conn.disconnected() and broker is not None:
- host, _, __ = get_ip_port_afi(broker.host)
- if conn.host != host or conn.port != broker.port:
- log.info("Broker metadata change detected for node %s"
- " from %s:%s to %s:%s", node_id, conn.host, conn.port,
- broker.host, broker.port)
-
- # Drop old connection object.
- # It will be recreated on next _maybe_connect
- self._conns.pop(node_id)
- return False
+ with self._lock:
+ broker = self.cluster.broker_metadata(node_id)
+ conn = self._conns.get(node_id)
- elif conn.connected():
- return True
+ if conn is None:
+ assert broker, 'Broker id %s not in current metadata' % node_id
+
+ log.debug("Initiating connection to node %s at %s:%s",
+ node_id, broker.host, broker.port)
+ host, port, afi = get_ip_port_afi(broker.host)
+ cb = functools.partial(self._conn_state_change, node_id)
+ conn = BrokerConnection(host, broker.port, afi,
+ state_change_callback=cb,
+ node_id=node_id,
+ **self.config)
+ self._conns[node_id] = conn
+
+ # Check if existing connection should be recreated because host/port changed
+ elif conn.disconnected() and broker is not None:
+ host, _, __ = get_ip_port_afi(broker.host)
+ if conn.host != host or conn.port != broker.port:
+ log.info("Broker metadata change detected for node %s"
+ " from %s:%s to %s:%s", node_id, conn.host, conn.port,
+ broker.host, broker.port)
+
+ # Drop old connection object.
+ # It will be recreated on next _maybe_connect
+ self._conns.pop(node_id)
+ return False
+
+ elif conn.connected():
+ return True
- conn.connect()
- return conn.connected()
+ conn.connect()
+ return conn.connected()
def ready(self, node_id, metadata_priority=True):
"""Check whether a node is connected and ok to send more requests.
@@ -397,9 +399,10 @@ class KafkaClient(object):
def connected(self, node_id):
"""Return True iff the node_id is connected."""
- if node_id not in self._conns:
- return False
- return self._conns[node_id].connected()
+ with self._lock:
+ if node_id not in self._conns:
+ return False
+ return self._conns[node_id].connected()
def close(self, node_id=None):
"""Close one or all broker connections.
@@ -407,18 +410,19 @@ class KafkaClient(object):
Arguments:
node_id (int, optional): the id of the node to close
"""
- if node_id is None:
- self._closed = True
- for conn in self._conns.values():
- conn.close()
- self._wake_r.close()
- self._wake_w.close()
- self._selector.close()
- elif node_id in self._conns:
- self._conns[node_id].close()
- else:
- log.warning("Node %s not found in current connection list; skipping", node_id)
- return
+ with self._lock:
+ if node_id is None:
+ self._closed = True
+ for conn in self._conns.values():
+ conn.close()
+ self._wake_r.close()
+ self._wake_w.close()
+ self._selector.close()
+ elif node_id in self._conns:
+ self._conns[node_id].close()
+ else:
+ log.warning("Node %s not found in current connection list; skipping", node_id)
+ return
def is_disconnected(self, node_id):
"""Check whether the node connection has been disconnected or failed.
@@ -434,9 +438,10 @@ class KafkaClient(object):
Returns:
bool: True iff the node exists and is disconnected
"""
- if node_id not in self._conns:
- return False
- return self._conns[node_id].disconnected()
+ with self._lock:
+ if node_id not in self._conns:
+ return False
+ return self._conns[node_id].disconnected()
def connection_delay(self, node_id):
"""
@@ -452,9 +457,10 @@ class KafkaClient(object):
Returns:
int: The number of milliseconds to wait.
"""
- if node_id not in self._conns:
- return 0
- return self._conns[node_id].connection_delay()
+ with self._lock:
+ if node_id not in self._conns:
+ return 0
+ return self._conns[node_id].connection_delay()
def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
@@ -483,10 +489,11 @@ class KafkaClient(object):
return True
def _can_send_request(self, node_id):
- if node_id not in self._conns:
- return False
- conn = self._conns[node_id]
- return conn.connected() and conn.can_send_more()
+ with self._lock:
+ if node_id not in self._conns:
+ return False
+ conn = self._conns[node_id]
+ return conn.connected() and conn.can_send_more()
def send(self, node_id, request):
"""Send a request to a specific node.
@@ -501,12 +508,13 @@ class KafkaClient(object):
Returns:
Future: resolves to Response struct or Error
"""
- if not self._maybe_connect(node_id):
- return Future().failure(Errors.NodeNotReadyError(node_id))
+ with self._lock:
+ if not self._maybe_connect(node_id):
+ return Future().failure(Errors.NodeNotReadyError(node_id))
- return self._conns[node_id].send(request)
+ return self._conns[node_id].send(request)
- def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
+ def poll(self, timeout_ms=None, future=None):
"""Try to read and write to sockets.
This method will also attempt to complete node connections, refresh
@@ -527,44 +535,34 @@ class KafkaClient(object):
elif timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
- responses = []
-
# Loop for futures, break after first loop if None
+ responses = []
while True:
-
- # Attempt to complete pending connections
- for node_id in list(self._connecting):
- self._maybe_connect(node_id)
-
- # Send a metadata request if needed
- metadata_timeout_ms = self._maybe_refresh_metadata()
-
- # Send scheduled tasks
- if delayed_tasks:
- for task, task_future in self._delayed_tasks.pop_ready():
- try:
- result = task()
- except Exception as e:
- log.error("Task %s failed: %s", task, e)
- task_future.failure(e)
- else:
- task_future.success(result)
-
- # If we got a future that is already done, don't block in _poll
- if future is not None and future.is_done:
- timeout = 0
- else:
- idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
- timeout = min(
- timeout_ms,
- metadata_timeout_ms,
- self._delayed_tasks.next_at() * 1000,
- idle_connection_timeout_ms,
- self.config['request_timeout_ms'])
- timeout = max(0, timeout / 1000.0) # avoid negative timeouts
-
- self._poll(timeout)
-
+ with self._lock:
+
+ # Attempt to complete pending connections
+ for node_id in list(self._connecting):
+ self._maybe_connect(node_id)
+
+ # Send a metadata request if needed
+ metadata_timeout_ms = self._maybe_refresh_metadata()
+
+ # If we got a future that is already done, don't block in _poll
+ if future is not None and future.is_done:
+ timeout = 0
+ else:
+ idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
+ timeout = min(
+ timeout_ms,
+ metadata_timeout_ms,
+ idle_connection_timeout_ms,
+ self.config['request_timeout_ms'])
+ timeout = max(0, timeout / 1000) # avoid negative timeouts
+
+ self._poll(timeout)
+
+ # called without the lock to avoid deadlock potential
+ # if handlers need to acquire locks
responses.extend(self._fire_pending_completed_requests())
# If all we had was a timeout (future is None) - only do one poll
@@ -646,12 +644,13 @@ class KafkaClient(object):
Returns:
int: pending in-flight requests for the node, or all nodes if None
"""
- if node_id is not None:
- if node_id not in self._conns:
- return 0
- return len(self._conns[node_id].in_flight_requests)
- else:
- return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
+ with self._lock:
+ if node_id is not None:
+ if node_id not in self._conns:
+ return 0
+ return len(self._conns[node_id].in_flight_requests)
+ else:
+ return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
def _fire_pending_completed_requests(self):
responses = []
@@ -672,37 +671,38 @@ class KafkaClient(object):
Returns:
node_id or None if no suitable node was found
"""
- nodes = [broker.nodeId for broker in self.cluster.brokers()]
- random.shuffle(nodes)
+ with self._lock:
+ nodes = [broker.nodeId for broker in self.cluster.brokers()]
+ random.shuffle(nodes)
+
+ inflight = float('inf')
+ found = None
+ for node_id in nodes:
+ conn = self._conns.get(node_id)
+ connected = conn is not None and conn.connected()
+ blacked_out = conn is not None and conn.blacked_out()
+ curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
+ if connected and curr_inflight == 0:
+ # if we find an established connection
+ # with no in-flight requests, we can stop right away
+ return node_id
+ elif not blacked_out and curr_inflight < inflight:
+ # otherwise if this is the best we have found so far, record that
+ inflight = curr_inflight
+ found = node_id
+
+ if found is not None:
+ return found
+
+ # some broker versions return an empty list of broker metadata
+ # if there are no topics created yet. the bootstrap process
+ # should detect this and keep a 'bootstrap' node alive until
+ # a non-bootstrap node is connected and non-empty broker
+ # metadata is available
+ elif 'bootstrap' in self._conns:
+ return 'bootstrap'
- inflight = float('inf')
- found = None
- for node_id in nodes:
- conn = self._conns.get(node_id)
- connected = conn is not None and conn.connected()
- blacked_out = conn is not None and conn.blacked_out()
- curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
- if connected and curr_inflight == 0:
- # if we find an established connection
- # with no in-flight requests, we can stop right away
- return node_id
- elif not blacked_out and curr_inflight < inflight:
- # otherwise if this is the best we have found so far, record that
- inflight = curr_inflight
- found = node_id
-
- if found is not None:
- return found
-
- # some broker versions return an empty list of broker metadata
- # if there are no topics created yet. the bootstrap process
- # should detect this and keep a 'bootstrap' node alive until
- # a non-bootstrap node is connected and non-empty broker
- # metadata is available
- elif 'bootstrap' in self._conns:
- return 'bootstrap'
-
- return None
+ return None
def set_topics(self, topics):
"""Set specific topics to track for metadata.
@@ -735,7 +735,7 @@ class KafkaClient(object):
self._topics.add(topic)
return self.cluster.request_update()
- # request metadata update on disconnect and timedout
+ # This method should be locked when running multi-threaded
def _maybe_refresh_metadata(self):
"""Send a metadata request if needed.
@@ -793,34 +793,6 @@ class KafkaClient(object):
# to let us know the selected connection might be usable again.
return float('inf')
- def schedule(self, task, at):
- """Schedule a new task to be executed at the given time.
-
- This is "best-effort" scheduling and should only be used for coarse
- synchronization. A task cannot be scheduled for multiple times
- simultaneously; any previously scheduled instance of the same task
- will be cancelled.
-
- Arguments:
- task (callable): task to be scheduled
- at (float or int): epoch seconds when task should run
-
- Returns:
- Future: resolves to result of task call, or exception if raised
- """
- return self._delayed_tasks.add(task, at)
-
- def unschedule(self, task):
- """Unschedule a task.
-
- This will remove all instances of the task from the task queue.
- This is a no-op if the task is not scheduled.
-
- Arguments:
- task (callable): task to be unscheduled
- """
- self._delayed_tasks.remove(task)
-
def check_version(self, node_id=None, timeout=2, strict=False):
"""Attempt to guess the version of a Kafka broker.
@@ -890,79 +862,6 @@ class KafkaClient(object):
self.close(node_id=conn_id)
-class DelayedTaskQueue(object):
- # see https://docs.python.org/2/library/heapq.html
- def __init__(self):
- self._tasks = [] # list of entries arranged in a heap
- self._task_map = {} # mapping of tasks to entries
- self._counter = itertools.count() # unique sequence count
-
- def add(self, task, at):
- """Add a task to run at a later time.
-
- Arguments:
- task: can be anything, but generally a callable
- at (float or int): epoch seconds to schedule task
-
- Returns:
- Future: a future that will be returned with the task when ready
- """
- if task in self._task_map:
- self.remove(task)
- count = next(self._counter)
- future = Future()
- entry = [at, count, (task, future)]
- self._task_map[task] = entry
- heapq.heappush(self._tasks, entry)
- return future
-
- def remove(self, task):
- """Remove a previously scheduled task.
-
- Raises:
- KeyError: if task is not found
- """
- entry = self._task_map.pop(task)
- task, future = entry[-1]
- future.failure(Errors.Cancelled)
- entry[-1] = 'REMOVED'
-
- def _drop_removed(self):
- while self._tasks and self._tasks[0][-1] is 'REMOVED':
- at, count, task = heapq.heappop(self._tasks)
-
- def _pop_next(self):
- self._drop_removed()
- if not self._tasks:
- raise KeyError('pop from an empty DelayedTaskQueue')
- _, _, maybe_task = heapq.heappop(self._tasks)
- if maybe_task is 'REMOVED':
- raise ValueError('popped a removed tasks from queue - bug')
- else:
- task, future = maybe_task
- del self._task_map[task]
- return (task, future)
-
- def next_at(self):
- """Number of seconds until next task is ready."""
- self._drop_removed()
- if not self._tasks:
- return float('inf')
- else:
- return max(self._tasks[0][0] - time.time(), 0)
-
- def pop_ready(self):
- """Pop and return a list of all ready (task, future) tuples"""
- ready_tasks = []
- while self._tasks and self._tasks[0][0] < time.time():
- try:
- task = self._pop_next()
- except KeyError:
- break
- ready_tasks.append(task)
- return ready_tasks
-
-
# OrderedDict requires python2.7+
try:
from collections import OrderedDict