diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 465 |
1 files changed, 182 insertions, 283 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 1350503..24162ad 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -3,8 +3,6 @@ from __future__ import absolute_import, division import collections import copy import functools -import heapq -import itertools import logging import random import threading @@ -202,15 +200,17 @@ class KafkaClient(object): self._conns = {} self._connecting = set() self._refresh_on_disconnects = True - self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._wake_lock = threading.Lock() + self._lock = threading.RLock() + # when requests complete, they are transferred to this queue prior to - # invocation. + # invocation. The purpose is to avoid invoking them while holding the + # lock above. self._pending_completion = collections.deque() self._selector.register(self._wake_r, selectors.EVENT_READ) @@ -296,90 +296,92 @@ class KafkaClient(object): return conn.disconnected() and not conn.blacked_out() def _conn_state_change(self, node_id, conn): - if conn.connecting(): - # SSL connections can enter this state 2x (second during Handshake) - if node_id not in self._connecting: - self._connecting.add(node_id) - self._selector.register(conn._sock, selectors.EVENT_WRITE) - - elif conn.connected(): - log.debug("Node %s connected", node_id) - if node_id in self._connecting: - self._connecting.remove(node_id) - - try: - self._selector.unregister(conn._sock) - except KeyError: - pass - self._selector.register(conn._sock, selectors.EVENT_READ, conn) - if self._sensors: - self._sensors.connection_created.record() - - self._idle_expiry_manager.update(node_id) - - if 'bootstrap' in self._conns and node_id != 'bootstrap': - bootstrap = self._conns.pop('bootstrap') - # XXX: make conn.close() require error to cause refresh - self._refresh_on_disconnects = False - bootstrap.close() - self._refresh_on_disconnects = True + with self._lock: + if conn.connecting(): + # SSL connections can enter this state 2x (second during Handshake) + if node_id not in self._connecting: + self._connecting.add(node_id) + self._selector.register(conn._sock, selectors.EVENT_WRITE) + + elif conn.connected(): + log.debug("Node %s connected", node_id) + if node_id in self._connecting: + self._connecting.remove(node_id) - # Connection failures imply that our metadata is stale, so let's refresh - elif conn.state is ConnectionStates.DISCONNECTING: - if node_id in self._connecting: - self._connecting.remove(node_id) - try: - self._selector.unregister(conn._sock) - except KeyError: - pass - if self._sensors: - self._sensors.connection_closed.record() + try: + self._selector.unregister(conn._sock) + except KeyError: + pass + self._selector.register(conn._sock, selectors.EVENT_READ, conn) + if self._sensors: + self._sensors.connection_created.record() + + self._idle_expiry_manager.update(node_id) + + if 'bootstrap' in self._conns and node_id != 'bootstrap': + bootstrap = self._conns.pop('bootstrap') + # XXX: make conn.close() require error to cause refresh + self._refresh_on_disconnects = False + bootstrap.close() + self._refresh_on_disconnects = True + + # Connection failures imply that our metadata is stale, so let's refresh + elif conn.state is ConnectionStates.DISCONNECTING: + if node_id in self._connecting: + self._connecting.remove(node_id) + try: + self._selector.unregister(conn._sock) + except KeyError: + pass + if self._sensors: + self._sensors.connection_closed.record() - idle_disconnect = False - if self._idle_expiry_manager.is_expired(node_id): - idle_disconnect = True - self._idle_expiry_manager.remove(node_id) + idle_disconnect = False + if self._idle_expiry_manager.is_expired(node_id): + idle_disconnect = True + self._idle_expiry_manager.remove(node_id) - if self._refresh_on_disconnects and not self._closed and not idle_disconnect: - log.warning("Node %s connection failed -- refreshing metadata", node_id) - self.cluster.request_update() + if self._refresh_on_disconnects and not self._closed and not idle_disconnect: + log.warning("Node %s connection failed -- refreshing metadata", node_id) + self.cluster.request_update() def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" - broker = self.cluster.broker_metadata(node_id) - conn = self._conns.get(node_id) - - if conn is None: - assert broker, 'Broker id %s not in current metadata' % node_id - - log.debug("Initiating connection to node %s at %s:%s", - node_id, broker.host, broker.port) - host, port, afi = get_ip_port_afi(broker.host) - cb = functools.partial(self._conn_state_change, node_id) - conn = BrokerConnection(host, broker.port, afi, - state_change_callback=cb, - node_id=node_id, - **self.config) - self._conns[node_id] = conn - - # Check if existing connection should be recreated because host/port changed - elif conn.disconnected() and broker is not None: - host, _, __ = get_ip_port_afi(broker.host) - if conn.host != host or conn.port != broker.port: - log.info("Broker metadata change detected for node %s" - " from %s:%s to %s:%s", node_id, conn.host, conn.port, - broker.host, broker.port) - - # Drop old connection object. - # It will be recreated on next _maybe_connect - self._conns.pop(node_id) - return False + with self._lock: + broker = self.cluster.broker_metadata(node_id) + conn = self._conns.get(node_id) - elif conn.connected(): - return True + if conn is None: + assert broker, 'Broker id %s not in current metadata' % node_id + + log.debug("Initiating connection to node %s at %s:%s", + node_id, broker.host, broker.port) + host, port, afi = get_ip_port_afi(broker.host) + cb = functools.partial(self._conn_state_change, node_id) + conn = BrokerConnection(host, broker.port, afi, + state_change_callback=cb, + node_id=node_id, + **self.config) + self._conns[node_id] = conn + + # Check if existing connection should be recreated because host/port changed + elif conn.disconnected() and broker is not None: + host, _, __ = get_ip_port_afi(broker.host) + if conn.host != host or conn.port != broker.port: + log.info("Broker metadata change detected for node %s" + " from %s:%s to %s:%s", node_id, conn.host, conn.port, + broker.host, broker.port) + + # Drop old connection object. + # It will be recreated on next _maybe_connect + self._conns.pop(node_id) + return False + + elif conn.connected(): + return True - conn.connect() - return conn.connected() + conn.connect() + return conn.connected() def ready(self, node_id, metadata_priority=True): """Check whether a node is connected and ok to send more requests. @@ -397,9 +399,10 @@ class KafkaClient(object): def connected(self, node_id): """Return True iff the node_id is connected.""" - if node_id not in self._conns: - return False - return self._conns[node_id].connected() + with self._lock: + if node_id not in self._conns: + return False + return self._conns[node_id].connected() def close(self, node_id=None): """Close one or all broker connections. @@ -407,18 +410,19 @@ class KafkaClient(object): Arguments: node_id (int, optional): the id of the node to close """ - if node_id is None: - self._closed = True - for conn in self._conns.values(): - conn.close() - self._wake_r.close() - self._wake_w.close() - self._selector.close() - elif node_id in self._conns: - self._conns[node_id].close() - else: - log.warning("Node %s not found in current connection list; skipping", node_id) - return + with self._lock: + if node_id is None: + self._closed = True + for conn in self._conns.values(): + conn.close() + self._wake_r.close() + self._wake_w.close() + self._selector.close() + elif node_id in self._conns: + self._conns[node_id].close() + else: + log.warning("Node %s not found in current connection list; skipping", node_id) + return def is_disconnected(self, node_id): """Check whether the node connection has been disconnected or failed. @@ -434,9 +438,10 @@ class KafkaClient(object): Returns: bool: True iff the node exists and is disconnected """ - if node_id not in self._conns: - return False - return self._conns[node_id].disconnected() + with self._lock: + if node_id not in self._conns: + return False + return self._conns[node_id].disconnected() def connection_delay(self, node_id): """ @@ -452,9 +457,10 @@ class KafkaClient(object): Returns: int: The number of milliseconds to wait. """ - if node_id not in self._conns: - return 0 - return self._conns[node_id].connection_delay() + with self._lock: + if node_id not in self._conns: + return 0 + return self._conns[node_id].connection_delay() def is_ready(self, node_id, metadata_priority=True): """Check whether a node is ready to send more requests. @@ -483,10 +489,11 @@ class KafkaClient(object): return True def _can_send_request(self, node_id): - if node_id not in self._conns: - return False - conn = self._conns[node_id] - return conn.connected() and conn.can_send_more() + with self._lock: + if node_id not in self._conns: + return False + conn = self._conns[node_id] + return conn.connected() and conn.can_send_more() def send(self, node_id, request): """Send a request to a specific node. @@ -501,12 +508,13 @@ class KafkaClient(object): Returns: Future: resolves to Response struct or Error """ - if not self._maybe_connect(node_id): - return Future().failure(Errors.NodeNotReadyError(node_id)) + with self._lock: + if not self._maybe_connect(node_id): + return Future().failure(Errors.NodeNotReadyError(node_id)) - return self._conns[node_id].send(request) + return self._conns[node_id].send(request) - def poll(self, timeout_ms=None, future=None, delayed_tasks=True): + def poll(self, timeout_ms=None, future=None): """Try to read and write to sockets. This method will also attempt to complete node connections, refresh @@ -527,44 +535,34 @@ class KafkaClient(object): elif timeout_ms is None: timeout_ms = self.config['request_timeout_ms'] - responses = [] - # Loop for futures, break after first loop if None + responses = [] while True: - - # Attempt to complete pending connections - for node_id in list(self._connecting): - self._maybe_connect(node_id) - - # Send a metadata request if needed - metadata_timeout_ms = self._maybe_refresh_metadata() - - # Send scheduled tasks - if delayed_tasks: - for task, task_future in self._delayed_tasks.pop_ready(): - try: - result = task() - except Exception as e: - log.error("Task %s failed: %s", task, e) - task_future.failure(e) - else: - task_future.success(result) - - # If we got a future that is already done, don't block in _poll - if future is not None and future.is_done: - timeout = 0 - else: - idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() - timeout = min( - timeout_ms, - metadata_timeout_ms, - self._delayed_tasks.next_at() * 1000, - idle_connection_timeout_ms, - self.config['request_timeout_ms']) - timeout = max(0, timeout / 1000.0) # avoid negative timeouts - - self._poll(timeout) - + with self._lock: + + # Attempt to complete pending connections + for node_id in list(self._connecting): + self._maybe_connect(node_id) + + # Send a metadata request if needed + metadata_timeout_ms = self._maybe_refresh_metadata() + + # If we got a future that is already done, don't block in _poll + if future is not None and future.is_done: + timeout = 0 + else: + idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() + timeout = min( + timeout_ms, + metadata_timeout_ms, + idle_connection_timeout_ms, + self.config['request_timeout_ms']) + timeout = max(0, timeout / 1000) # avoid negative timeouts + + self._poll(timeout) + + # called without the lock to avoid deadlock potential + # if handlers need to acquire locks responses.extend(self._fire_pending_completed_requests()) # If all we had was a timeout (future is None) - only do one poll @@ -646,12 +644,13 @@ class KafkaClient(object): Returns: int: pending in-flight requests for the node, or all nodes if None """ - if node_id is not None: - if node_id not in self._conns: - return 0 - return len(self._conns[node_id].in_flight_requests) - else: - return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) + with self._lock: + if node_id is not None: + if node_id not in self._conns: + return 0 + return len(self._conns[node_id].in_flight_requests) + else: + return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) def _fire_pending_completed_requests(self): responses = [] @@ -672,37 +671,38 @@ class KafkaClient(object): Returns: node_id or None if no suitable node was found """ - nodes = [broker.nodeId for broker in self.cluster.brokers()] - random.shuffle(nodes) + with self._lock: + nodes = [broker.nodeId for broker in self.cluster.brokers()] + random.shuffle(nodes) + + inflight = float('inf') + found = None + for node_id in nodes: + conn = self._conns.get(node_id) + connected = conn is not None and conn.connected() + blacked_out = conn is not None and conn.blacked_out() + curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 + if connected and curr_inflight == 0: + # if we find an established connection + # with no in-flight requests, we can stop right away + return node_id + elif not blacked_out and curr_inflight < inflight: + # otherwise if this is the best we have found so far, record that + inflight = curr_inflight + found = node_id + + if found is not None: + return found + + # some broker versions return an empty list of broker metadata + # if there are no topics created yet. the bootstrap process + # should detect this and keep a 'bootstrap' node alive until + # a non-bootstrap node is connected and non-empty broker + # metadata is available + elif 'bootstrap' in self._conns: + return 'bootstrap' - inflight = float('inf') - found = None - for node_id in nodes: - conn = self._conns.get(node_id) - connected = conn is not None and conn.connected() - blacked_out = conn is not None and conn.blacked_out() - curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 - if connected and curr_inflight == 0: - # if we find an established connection - # with no in-flight requests, we can stop right away - return node_id - elif not blacked_out and curr_inflight < inflight: - # otherwise if this is the best we have found so far, record that - inflight = curr_inflight - found = node_id - - if found is not None: - return found - - # some broker versions return an empty list of broker metadata - # if there are no topics created yet. the bootstrap process - # should detect this and keep a 'bootstrap' node alive until - # a non-bootstrap node is connected and non-empty broker - # metadata is available - elif 'bootstrap' in self._conns: - return 'bootstrap' - - return None + return None def set_topics(self, topics): """Set specific topics to track for metadata. @@ -735,7 +735,7 @@ class KafkaClient(object): self._topics.add(topic) return self.cluster.request_update() - # request metadata update on disconnect and timedout + # This method should be locked when running multi-threaded def _maybe_refresh_metadata(self): """Send a metadata request if needed. @@ -793,34 +793,6 @@ class KafkaClient(object): # to let us know the selected connection might be usable again. return float('inf') - def schedule(self, task, at): - """Schedule a new task to be executed at the given time. - - This is "best-effort" scheduling and should only be used for coarse - synchronization. A task cannot be scheduled for multiple times - simultaneously; any previously scheduled instance of the same task - will be cancelled. - - Arguments: - task (callable): task to be scheduled - at (float or int): epoch seconds when task should run - - Returns: - Future: resolves to result of task call, or exception if raised - """ - return self._delayed_tasks.add(task, at) - - def unschedule(self, task): - """Unschedule a task. - - This will remove all instances of the task from the task queue. - This is a no-op if the task is not scheduled. - - Arguments: - task (callable): task to be unscheduled - """ - self._delayed_tasks.remove(task) - def check_version(self, node_id=None, timeout=2, strict=False): """Attempt to guess the version of a Kafka broker. @@ -890,79 +862,6 @@ class KafkaClient(object): self.close(node_id=conn_id) -class DelayedTaskQueue(object): - # see https://docs.python.org/2/library/heapq.html - def __init__(self): - self._tasks = [] # list of entries arranged in a heap - self._task_map = {} # mapping of tasks to entries - self._counter = itertools.count() # unique sequence count - - def add(self, task, at): - """Add a task to run at a later time. - - Arguments: - task: can be anything, but generally a callable - at (float or int): epoch seconds to schedule task - - Returns: - Future: a future that will be returned with the task when ready - """ - if task in self._task_map: - self.remove(task) - count = next(self._counter) - future = Future() - entry = [at, count, (task, future)] - self._task_map[task] = entry - heapq.heappush(self._tasks, entry) - return future - - def remove(self, task): - """Remove a previously scheduled task. - - Raises: - KeyError: if task is not found - """ - entry = self._task_map.pop(task) - task, future = entry[-1] - future.failure(Errors.Cancelled) - entry[-1] = 'REMOVED' - - def _drop_removed(self): - while self._tasks and self._tasks[0][-1] is 'REMOVED': - at, count, task = heapq.heappop(self._tasks) - - def _pop_next(self): - self._drop_removed() - if not self._tasks: - raise KeyError('pop from an empty DelayedTaskQueue') - _, _, maybe_task = heapq.heappop(self._tasks) - if maybe_task is 'REMOVED': - raise ValueError('popped a removed tasks from queue - bug') - else: - task, future = maybe_task - del self._task_map[task] - return (task, future) - - def next_at(self): - """Number of seconds until next task is ready.""" - self._drop_removed() - if not self._tasks: - return float('inf') - else: - return max(self._tasks[0][0] - time.time(), 0) - - def pop_ready(self): - """Pop and return a list of all ready (task, future) tuples""" - ready_tasks = [] - while self._tasks and self._tasks[0][0] < time.time(): - try: - task = self._pop_next() - except KeyError: - break - ready_tasks.append(task) - return ready_tasks - - # OrderedDict requires python2.7+ try: from collections import OrderedDict |