diff options
Diffstat (limited to 'kafka/client_async.py')
| -rw-r--r-- | kafka/client_async.py | 465 | 
1 files changed, 182 insertions, 283 deletions
| diff --git a/kafka/client_async.py b/kafka/client_async.py index 1350503..24162ad 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -3,8 +3,6 @@ from __future__ import absolute_import, division  import collections  import copy  import functools -import heapq -import itertools  import logging  import random  import threading @@ -202,15 +200,17 @@ class KafkaClient(object):          self._conns = {}          self._connecting = set()          self._refresh_on_disconnects = True -        self._delayed_tasks = DelayedTaskQueue()          self._last_bootstrap = 0          self._bootstrap_fails = 0          self._wake_r, self._wake_w = socket.socketpair()          self._wake_r.setblocking(False)          self._wake_lock = threading.Lock() +        self._lock = threading.RLock() +          # when requests complete, they are transferred to this queue prior to -        # invocation. +        # invocation. The purpose is to avoid invoking them while holding the +        # lock above.          self._pending_completion = collections.deque()          self._selector.register(self._wake_r, selectors.EVENT_READ) @@ -296,90 +296,92 @@ class KafkaClient(object):          return conn.disconnected() and not conn.blacked_out()      def _conn_state_change(self, node_id, conn): -        if conn.connecting(): -            # SSL connections can enter this state 2x (second during Handshake) -            if node_id not in self._connecting: -                self._connecting.add(node_id) -                self._selector.register(conn._sock, selectors.EVENT_WRITE) - -        elif conn.connected(): -            log.debug("Node %s connected", node_id) -            if node_id in self._connecting: -                self._connecting.remove(node_id) - -            try: -                self._selector.unregister(conn._sock) -            except KeyError: -                pass -            self._selector.register(conn._sock, selectors.EVENT_READ, conn) -            if self._sensors: -                self._sensors.connection_created.record() - -            self._idle_expiry_manager.update(node_id) - -            if 'bootstrap' in self._conns and node_id != 'bootstrap': -                bootstrap = self._conns.pop('bootstrap') -                # XXX: make conn.close() require error to cause refresh -                self._refresh_on_disconnects = False -                bootstrap.close() -                self._refresh_on_disconnects = True +        with self._lock: +            if conn.connecting(): +                # SSL connections can enter this state 2x (second during Handshake) +                if node_id not in self._connecting: +                    self._connecting.add(node_id) +                    self._selector.register(conn._sock, selectors.EVENT_WRITE) + +            elif conn.connected(): +                log.debug("Node %s connected", node_id) +                if node_id in self._connecting: +                    self._connecting.remove(node_id) -        # Connection failures imply that our metadata is stale, so let's refresh -        elif conn.state is ConnectionStates.DISCONNECTING: -            if node_id in self._connecting: -                self._connecting.remove(node_id) -            try: -                self._selector.unregister(conn._sock) -            except KeyError: -                pass -            if self._sensors: -                self._sensors.connection_closed.record() +                try: +                    self._selector.unregister(conn._sock) +                except KeyError: +                    pass +                self._selector.register(conn._sock, selectors.EVENT_READ, conn) +                if self._sensors: +                    self._sensors.connection_created.record() + +                self._idle_expiry_manager.update(node_id) + +                if 'bootstrap' in self._conns and node_id != 'bootstrap': +                    bootstrap = self._conns.pop('bootstrap') +                    # XXX: make conn.close() require error to cause refresh +                    self._refresh_on_disconnects = False +                    bootstrap.close() +                    self._refresh_on_disconnects = True + +            # Connection failures imply that our metadata is stale, so let's refresh +            elif conn.state is ConnectionStates.DISCONNECTING: +                if node_id in self._connecting: +                    self._connecting.remove(node_id) +                try: +                    self._selector.unregister(conn._sock) +                except KeyError: +                    pass +                if self._sensors: +                    self._sensors.connection_closed.record() -            idle_disconnect = False -            if self._idle_expiry_manager.is_expired(node_id): -                idle_disconnect = True -            self._idle_expiry_manager.remove(node_id) +                idle_disconnect = False +                if self._idle_expiry_manager.is_expired(node_id): +                    idle_disconnect = True +                self._idle_expiry_manager.remove(node_id) -            if self._refresh_on_disconnects and not self._closed and not idle_disconnect: -                log.warning("Node %s connection failed -- refreshing metadata", node_id) -                self.cluster.request_update() +                if self._refresh_on_disconnects and not self._closed and not idle_disconnect: +                    log.warning("Node %s connection failed -- refreshing metadata", node_id) +                    self.cluster.request_update()      def _maybe_connect(self, node_id):          """Idempotent non-blocking connection attempt to the given node id.""" -        broker = self.cluster.broker_metadata(node_id) -        conn = self._conns.get(node_id) - -        if conn is None: -            assert broker, 'Broker id %s not in current metadata' % node_id - -            log.debug("Initiating connection to node %s at %s:%s", -                      node_id, broker.host, broker.port) -            host, port, afi = get_ip_port_afi(broker.host) -            cb = functools.partial(self._conn_state_change, node_id) -            conn = BrokerConnection(host, broker.port, afi, -                                    state_change_callback=cb, -                                    node_id=node_id, -                                    **self.config) -            self._conns[node_id] = conn - -        # Check if existing connection should be recreated because host/port changed -        elif conn.disconnected() and broker is not None: -            host, _, __ = get_ip_port_afi(broker.host) -            if conn.host != host or conn.port != broker.port: -                log.info("Broker metadata change detected for node %s" -                         " from %s:%s to %s:%s", node_id, conn.host, conn.port, -                         broker.host, broker.port) - -                # Drop old connection object. -                # It will be recreated on next _maybe_connect -                self._conns.pop(node_id) -                return False +        with self._lock: +            broker = self.cluster.broker_metadata(node_id) +            conn = self._conns.get(node_id) -        elif conn.connected(): -            return True +            if conn is None: +                assert broker, 'Broker id %s not in current metadata' % node_id + +                log.debug("Initiating connection to node %s at %s:%s", +                          node_id, broker.host, broker.port) +                host, port, afi = get_ip_port_afi(broker.host) +                cb = functools.partial(self._conn_state_change, node_id) +                conn = BrokerConnection(host, broker.port, afi, +                                        state_change_callback=cb, +                                        node_id=node_id, +                                        **self.config) +                self._conns[node_id] = conn + +            # Check if existing connection should be recreated because host/port changed +            elif conn.disconnected() and broker is not None: +                host, _, __ = get_ip_port_afi(broker.host) +                if conn.host != host or conn.port != broker.port: +                    log.info("Broker metadata change detected for node %s" +                             " from %s:%s to %s:%s", node_id, conn.host, conn.port, +                             broker.host, broker.port) + +                    # Drop old connection object. +                    # It will be recreated on next _maybe_connect +                    self._conns.pop(node_id) +                    return False + +            elif conn.connected(): +                return True -        conn.connect() -        return conn.connected() +            conn.connect() +            return conn.connected()      def ready(self, node_id, metadata_priority=True):          """Check whether a node is connected and ok to send more requests. @@ -397,9 +399,10 @@ class KafkaClient(object):      def connected(self, node_id):          """Return True iff the node_id is connected.""" -        if node_id not in self._conns: -            return False -        return self._conns[node_id].connected() +        with self._lock: +            if node_id not in self._conns: +                return False +            return self._conns[node_id].connected()      def close(self, node_id=None):          """Close one or all broker connections. @@ -407,18 +410,19 @@ class KafkaClient(object):          Arguments:              node_id (int, optional): the id of the node to close          """ -        if node_id is None: -            self._closed = True -            for conn in self._conns.values(): -                conn.close() -            self._wake_r.close() -            self._wake_w.close() -            self._selector.close() -        elif node_id in self._conns: -            self._conns[node_id].close() -        else: -            log.warning("Node %s not found in current connection list; skipping", node_id) -            return +        with self._lock: +            if node_id is None: +                self._closed = True +                for conn in self._conns.values(): +                    conn.close() +                self._wake_r.close() +                self._wake_w.close() +                self._selector.close() +            elif node_id in self._conns: +                self._conns[node_id].close() +            else: +                log.warning("Node %s not found in current connection list; skipping", node_id) +                return      def is_disconnected(self, node_id):          """Check whether the node connection has been disconnected or failed. @@ -434,9 +438,10 @@ class KafkaClient(object):          Returns:              bool: True iff the node exists and is disconnected          """ -        if node_id not in self._conns: -            return False -        return self._conns[node_id].disconnected() +        with self._lock: +            if node_id not in self._conns: +                return False +            return self._conns[node_id].disconnected()      def connection_delay(self, node_id):          """ @@ -452,9 +457,10 @@ class KafkaClient(object):          Returns:              int: The number of milliseconds to wait.          """ -        if node_id not in self._conns: -            return 0 -        return self._conns[node_id].connection_delay() +        with self._lock: +            if node_id not in self._conns: +                return 0 +            return self._conns[node_id].connection_delay()      def is_ready(self, node_id, metadata_priority=True):          """Check whether a node is ready to send more requests. @@ -483,10 +489,11 @@ class KafkaClient(object):          return True      def _can_send_request(self, node_id): -        if node_id not in self._conns: -            return False -        conn = self._conns[node_id] -        return conn.connected() and conn.can_send_more() +        with self._lock: +            if node_id not in self._conns: +                return False +            conn = self._conns[node_id] +            return conn.connected() and conn.can_send_more()      def send(self, node_id, request):          """Send a request to a specific node. @@ -501,12 +508,13 @@ class KafkaClient(object):          Returns:              Future: resolves to Response struct or Error          """ -        if not self._maybe_connect(node_id): -            return Future().failure(Errors.NodeNotReadyError(node_id)) +        with self._lock: +            if not self._maybe_connect(node_id): +                return Future().failure(Errors.NodeNotReadyError(node_id)) -        return self._conns[node_id].send(request) +            return self._conns[node_id].send(request) -    def poll(self, timeout_ms=None, future=None, delayed_tasks=True): +    def poll(self, timeout_ms=None, future=None):          """Try to read and write to sockets.          This method will also attempt to complete node connections, refresh @@ -527,44 +535,34 @@ class KafkaClient(object):          elif timeout_ms is None:              timeout_ms = self.config['request_timeout_ms'] -        responses = [] -          # Loop for futures, break after first loop if None +        responses = []          while True: - -            # Attempt to complete pending connections -            for node_id in list(self._connecting): -                self._maybe_connect(node_id) - -            # Send a metadata request if needed -            metadata_timeout_ms = self._maybe_refresh_metadata() - -            # Send scheduled tasks -            if delayed_tasks: -                for task, task_future in self._delayed_tasks.pop_ready(): -                    try: -                        result = task() -                    except Exception as e: -                        log.error("Task %s failed: %s", task, e) -                        task_future.failure(e) -                    else: -                        task_future.success(result) - -            # If we got a future that is already done, don't block in _poll -            if future is not None and future.is_done: -                timeout = 0 -            else: -                idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() -                timeout = min( -                    timeout_ms, -                    metadata_timeout_ms, -                    self._delayed_tasks.next_at() * 1000, -                    idle_connection_timeout_ms, -                    self.config['request_timeout_ms']) -                timeout = max(0, timeout / 1000.0)  # avoid negative timeouts - -            self._poll(timeout) - +            with self._lock: + +                # Attempt to complete pending connections +                for node_id in list(self._connecting): +                    self._maybe_connect(node_id) + +                # Send a metadata request if needed +                metadata_timeout_ms = self._maybe_refresh_metadata() + +                # If we got a future that is already done, don't block in _poll +                if future is not None and future.is_done: +                    timeout = 0 +                else: +                    idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() +                    timeout = min( +                        timeout_ms, +                        metadata_timeout_ms, +                        idle_connection_timeout_ms, +                        self.config['request_timeout_ms']) +                    timeout = max(0, timeout / 1000)  # avoid negative timeouts + +                self._poll(timeout) + +            # called without the lock to avoid deadlock potential +            # if handlers need to acquire locks              responses.extend(self._fire_pending_completed_requests())              # If all we had was a timeout (future is None) - only do one poll @@ -646,12 +644,13 @@ class KafkaClient(object):          Returns:              int: pending in-flight requests for the node, or all nodes if None          """ -        if node_id is not None: -            if node_id not in self._conns: -                return 0 -            return len(self._conns[node_id].in_flight_requests) -        else: -            return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) +        with self._lock: +            if node_id is not None: +                if node_id not in self._conns: +                    return 0 +                return len(self._conns[node_id].in_flight_requests) +            else: +                return sum([len(conn.in_flight_requests) for conn in self._conns.values()])      def _fire_pending_completed_requests(self):          responses = [] @@ -672,37 +671,38 @@ class KafkaClient(object):          Returns:              node_id or None if no suitable node was found          """ -        nodes = [broker.nodeId for broker in self.cluster.brokers()] -        random.shuffle(nodes) +        with self._lock: +            nodes = [broker.nodeId for broker in self.cluster.brokers()] +            random.shuffle(nodes) + +            inflight = float('inf') +            found = None +            for node_id in nodes: +                conn = self._conns.get(node_id) +                connected = conn is not None and conn.connected() +                blacked_out = conn is not None and conn.blacked_out() +                curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 +                if connected and curr_inflight == 0: +                    # if we find an established connection +                    # with no in-flight requests, we can stop right away +                    return node_id +                elif not blacked_out and curr_inflight < inflight: +                    # otherwise if this is the best we have found so far, record that +                    inflight = curr_inflight +                    found = node_id + +            if found is not None: +                return found + +            # some broker versions return an empty list of broker metadata +            # if there are no topics created yet. the bootstrap process +            # should detect this and keep a 'bootstrap' node alive until +            # a non-bootstrap node is connected and non-empty broker +            # metadata is available +            elif 'bootstrap' in self._conns: +                return 'bootstrap' -        inflight = float('inf') -        found = None -        for node_id in nodes: -            conn = self._conns.get(node_id) -            connected = conn is not None and conn.connected() -            blacked_out = conn is not None and conn.blacked_out() -            curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 -            if connected and curr_inflight == 0: -                # if we find an established connection -                # with no in-flight requests, we can stop right away -                return node_id -            elif not blacked_out and curr_inflight < inflight: -                # otherwise if this is the best we have found so far, record that -                inflight = curr_inflight -                found = node_id - -        if found is not None: -            return found - -        # some broker versions return an empty list of broker metadata -        # if there are no topics created yet. the bootstrap process -        # should detect this and keep a 'bootstrap' node alive until -        # a non-bootstrap node is connected and non-empty broker -        # metadata is available -        elif 'bootstrap' in self._conns: -            return 'bootstrap' - -        return None +            return None      def set_topics(self, topics):          """Set specific topics to track for metadata. @@ -735,7 +735,7 @@ class KafkaClient(object):          self._topics.add(topic)          return self.cluster.request_update() -    # request metadata update on disconnect and timedout +    # This method should be locked when running multi-threaded      def _maybe_refresh_metadata(self):          """Send a metadata request if needed. @@ -793,34 +793,6 @@ class KafkaClient(object):          # to let us know the selected connection might be usable again.          return float('inf') -    def schedule(self, task, at): -        """Schedule a new task to be executed at the given time. - -        This is "best-effort" scheduling and should only be used for coarse -        synchronization. A task cannot be scheduled for multiple times -        simultaneously; any previously scheduled instance of the same task -        will be cancelled. - -        Arguments: -            task (callable): task to be scheduled -            at (float or int): epoch seconds when task should run - -        Returns: -            Future: resolves to result of task call, or exception if raised -        """ -        return self._delayed_tasks.add(task, at) - -    def unschedule(self, task): -        """Unschedule a task. - -        This will remove all instances of the task from the task queue. -        This is a no-op if the task is not scheduled. - -        Arguments: -            task (callable): task to be unscheduled -        """ -        self._delayed_tasks.remove(task) -      def check_version(self, node_id=None, timeout=2, strict=False):          """Attempt to guess the version of a Kafka broker. @@ -890,79 +862,6 @@ class KafkaClient(object):              self.close(node_id=conn_id) -class DelayedTaskQueue(object): -    # see https://docs.python.org/2/library/heapq.html -    def __init__(self): -        self._tasks = []  # list of entries arranged in a heap -        self._task_map = {}  # mapping of tasks to entries -        self._counter = itertools.count()  # unique sequence count - -    def add(self, task, at): -        """Add a task to run at a later time. - -        Arguments: -            task: can be anything, but generally a callable -            at (float or int): epoch seconds to schedule task - -        Returns: -            Future: a future that will be returned with the task when ready -        """ -        if task in self._task_map: -            self.remove(task) -        count = next(self._counter) -        future = Future() -        entry = [at, count, (task, future)] -        self._task_map[task] = entry -        heapq.heappush(self._tasks, entry) -        return future - -    def remove(self, task): -        """Remove a previously scheduled task. - -        Raises: -            KeyError: if task is not found -        """ -        entry = self._task_map.pop(task) -        task, future = entry[-1] -        future.failure(Errors.Cancelled) -        entry[-1] = 'REMOVED' - -    def _drop_removed(self): -        while self._tasks and self._tasks[0][-1] is 'REMOVED': -            at, count, task = heapq.heappop(self._tasks) - -    def _pop_next(self): -        self._drop_removed() -        if not self._tasks: -            raise KeyError('pop from an empty DelayedTaskQueue') -        _, _, maybe_task = heapq.heappop(self._tasks) -        if maybe_task is 'REMOVED': -            raise ValueError('popped a removed tasks from queue - bug') -        else: -            task, future = maybe_task -        del self._task_map[task] -        return (task, future) - -    def next_at(self): -        """Number of seconds until next task is ready.""" -        self._drop_removed() -        if not self._tasks: -            return float('inf') -        else: -            return max(self._tasks[0][0] - time.time(), 0) - -    def pop_ready(self): -        """Pop and return a list of all ready (task, future) tuples""" -        ready_tasks = [] -        while self._tasks and self._tasks[0][0] < time.time(): -            try: -                task = self._pop_next() -            except KeyError: -                break -            ready_tasks.append(task) -        return ready_tasks - -  # OrderedDict requires python2.7+  try:      from collections import OrderedDict | 
