Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 361 |
1 file changed, 176 insertions, 185 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 9018bb4..14e71bb 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,19 +2,22 @@
 import collections
 import copy
 import functools
 import logging
-import select
+import random
 import time
 
+import six
+
 import kafka.common
-from kafka.common import (TopicAndPartition, BrokerMetadata,
+from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
                           ConnectionError, FailedPayloadsError,
                           KafkaTimeoutError, KafkaUnavailableError,
                           LeaderNotAvailableError, UnknownTopicOrPartitionError,
                           NotLeaderForPartitionError, ReplicaNotAvailableError)
 
-from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import (
+    collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
+    ConnectionStates)
 from kafka.protocol import KafkaProtocol
-from kafka.util import kafka_bytestring
 
 log = logging.getLogger(__name__)
@@ -31,20 +34,18 @@ class KafkaClient(object):
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
                  correlation_id=0):
         # We need one connection to bootstrap
-        self.client_id = kafka_bytestring(client_id)
+        self.client_id = client_id
         self.timeout = timeout
         self.hosts = collect_hosts(hosts)
         self.correlation_id = correlation_id
 
-        # create connections only when we need them
-        self.conns = {}
+        self._conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
-        self.topics_to_brokers = {}  # TopicAndPartition -> BrokerMetadata
+        self.topics_to_brokers = {}  # TopicPartition -> BrokerMetadata
         self.topic_partitions = {}   # topic -> partition -> PartitionMetadata
 
         self.load_metadata_for_topics()  # bootstrap with all metadata
 
-
     ##################
     #   Private API  #
     ##################
@@ -52,14 +53,17 @@ class KafkaClient(object):
     def _get_conn(self, host, port):
         """Get or create a connection to a broker using host and port"""
         host_key = (host, port)
-        if host_key not in self.conns:
-            self.conns[host_key] = KafkaConnection(
-                host,
-                port,
-                timeout=self.timeout
+        if host_key not in self._conns:
+            self._conns[host_key] = BrokerConnection(
+                host, port,
+                request_timeout_ms=self.timeout * 1000,
+                client_id=self.client_id
             )
 
-        return self.conns[host_key]
+        conn = self._conns[host_key]
+        while conn.connect() == ConnectionStates.CONNECTING:
+            pass
+        return conn
 
     def _get_leader_for_partition(self, topic, partition):
         """
@@ -73,7 +77,7 @@ class KafkaClient(object):
             no current leader
         """
 
-        key = TopicAndPartition(topic, partition)
+        key = TopicPartition(topic, partition)
 
         # Use cached metadata if it is there
         if self.topics_to_brokers.get(key) is not None:
@@ -91,21 +95,21 @@ class KafkaClient(object):
             raise UnknownTopicOrPartitionError(key)
 
         # If there's no leader for the partition, raise
-        meta = self.topic_partitions[topic][partition]
-        if meta.leader == -1:
-            raise LeaderNotAvailableError(meta)
+        leader = self.topic_partitions[topic][partition]
+        if leader == -1:
+            raise LeaderNotAvailableError((topic, partition))
 
         # Otherwise return the BrokerMetadata
-        return self.brokers[meta.leader]
+        return self.brokers[leader]
 
     def _get_coordinator_for_group(self, group):
         """
         Returns the coordinator broker for a consumer group.
 
-        ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
+        GroupCoordinatorNotAvailableError will be raised if the coordinator
        does not currently exist for the group.
 
-        OffsetsLoadInProgressCode is raised if the coordinator is available
+        GroupLoadInProgressError is raised if the coordinator is available
         but is still loading offsets from the internal topic
         """
 
@@ -129,26 +133,40 @@ class KafkaClient(object):
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
-        for (host, port) in self.hosts:
-            requestId = self._next_id()
-            log.debug('Request %s: %s', requestId, payloads)
-            try:
-                conn = self._get_conn(host, port)
-                request = encoder_fn(client_id=self.client_id,
-                                     correlation_id=requestId,
-                                     payloads=payloads)
+        hosts = set([(broker.host, broker.port) for broker in self.brokers.values()])
+        hosts.update(self.hosts)
+        hosts = list(hosts)
+        random.shuffle(hosts)
+
+        for (host, port) in hosts:
+            conn = self._get_conn(host, port)
+            if not conn.connected():
+                log.warning("Skipping unconnected connection: %s", conn)
+                continue
+            request = encoder_fn(payloads=payloads)
+            future = conn.send(request)
 
-                conn.send(requestId, request)
-                response = conn.recv(requestId)
-                decoded = decoder_fn(response)
-                log.debug('Response %s: %s', requestId, decoded)
-                return decoded
+            # Block
+            while not future.is_done:
+                conn.recv()
 
-            except Exception:
-                log.exception('Error sending request [%s] to server %s:%s, '
-                              'trying next server', requestId, host, port)
+            if future.failed():
+                log.error("Request failed: %s", future.exception)
+                continue
 
-        raise KafkaUnavailableError('All servers failed to process request')
+            return decoder_fn(future.value)
+
+        raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
+
+    def _payloads_by_broker(self, payloads):
+        payloads_by_broker = collections.defaultdict(list)
+        for payload in payloads:
+            try:
+                leader = self._get_leader_for_partition(payload.topic, payload.partition)
+            except KafkaUnavailableError:
+                leader = None
+            payloads_by_broker[leader].append(payload)
+        return dict(payloads_by_broker)
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
@@ -178,97 +196,79 @@ class KafkaClient(object):
         # so we need to keep this so we can rebuild order before returning
         original_ordering = [(p.topic, p.partition) for p in payloads]
 
-        # Group the requests by topic+partition
-        brokers_for_payloads = []
-        payloads_by_broker = collections.defaultdict(list)
-
-        responses = {}
-        for payload in payloads:
-            try:
-                leader = self._get_leader_for_partition(payload.topic,
-                                                        payload.partition)
-                payloads_by_broker[leader].append(payload)
-                brokers_for_payloads.append(leader)
-            except KafkaUnavailableError as e:
-                log.warning('KafkaUnavailableError attempting to send request '
-                            'on topic %s partition %d', payload.topic, payload.partition)
-                topic_partition = (payload.topic, payload.partition)
-                responses[topic_partition] = FailedPayloadsError(payload)
+        # Connection errors generally mean stale metadata
+        # although sometimes it means incorrect api request
+        # Unfortunately there is no good way to tell the difference
+        # so we'll just reset metadata on all errors to be safe
+        refresh_metadata = False
 
         # For each broker, send the list of request payloads
         # and collect the responses and errors
-        broker_failures = []
+        payloads_by_broker = self._payloads_by_broker(payloads)
+        responses = {}
 
-        # For each KafkaConnection keep the real socket so that we can use
+        def failed_payloads(payloads):
+            for payload in payloads:
+                topic_partition = (str(payload.topic), payload.partition)
+                responses[(topic_partition)] = FailedPayloadsError(payload)
+
+        # For each BrokerConnection keep the real socket so that we can use
         # a select to perform unblocking I/O
-        connections_by_socket = {}
-        for broker, payloads in payloads_by_broker.items():
-            requestId = self._next_id()
-            log.debug('Request %s to %s: %s', requestId, broker, payloads)
-            request = encoder_fn(client_id=self.client_id,
-                                 correlation_id=requestId, payloads=payloads)
-
-            # Send the request, recv the response
-            try:
-                conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
-                conn.send(requestId, request)
+        connections_by_future = {}
+        for broker, broker_payloads in six.iteritems(payloads_by_broker):
+            if broker is None:
+                failed_payloads(broker_payloads)
+                continue
 
-            except ConnectionError as e:
-                broker_failures.append(broker)
-                log.warning('ConnectionError attempting to send request %s '
-                            'to server %s: %s', requestId, broker, e)
+            conn = self._get_conn(broker.host, broker.port)
+            conn.connect()
+            if not conn.connected():
+                refresh_metadata = True
+                failed_payloads(broker_payloads)
+                continue
 
-                for payload in payloads:
-                    topic_partition = (payload.topic, payload.partition)
-                    responses[topic_partition] = FailedPayloadsError(payload)
+            request = encoder_fn(payloads=broker_payloads)
+            # decoder_fn=None signal that the server is expected to not
+            # send a response. This probably only applies to
+            # ProduceRequest w/ acks = 0
+            expect_response = (decoder_fn is not None)
+            future = conn.send(request, expect_response=expect_response)
 
-            # No exception, try to get response
-            else:
+            if future.failed():
+                refresh_metadata = True
+                failed_payloads(broker_payloads)
+                continue
 
-                # decoder_fn=None signal that the server is expected to not
-                # send a response. This probably only applies to
-                # ProduceRequest w/ acks = 0
-                if decoder_fn is None:
-                    log.debug('Request %s does not expect a response '
-                              '(skipping conn.recv)', requestId)
-                    for payload in payloads:
-                        topic_partition = (payload.topic, payload.partition)
-                        responses[topic_partition] = None
-                    continue
-                else:
-                    connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
+            if not expect_response:
+                for payload in broker_payloads:
+                    topic_partition = (str(payload.topic), payload.partition)
+                    responses[topic_partition] = None
+                continue
+
+            connections_by_future[future] = (conn, broker)
 
         conn = None
-        while connections_by_socket:
-            sockets = connections_by_socket.keys()
-            rlist, _, _ = select.select(sockets, [], [], None)
-            conn, broker, requestId = connections_by_socket.pop(rlist[0])
-            try:
-                response = conn.recv(requestId)
-            except ConnectionError as e:
-                broker_failures.append(broker)
-                log.warning('ConnectionError attempting to receive a '
-                            'response to request %s from server %s: %s',
-                            requestId, broker, e)
+        while connections_by_future:
+            futures = list(connections_by_future.keys())
+            for future in futures:
 
-                for payload in payloads_by_broker[broker]:
-                    topic_partition = (payload.topic, payload.partition)
-                    responses[topic_partition] = FailedPayloadsError(payload)
+                if not future.is_done:
+                    conn, _ = connections_by_future[future]
+                    conn.recv()
+                    continue
 
-            else:
-                _resps = []
-                for payload_response in decoder_fn(response):
-                    topic_partition = (payload_response.topic,
-                                       payload_response.partition)
-                    responses[topic_partition] = payload_response
-                    _resps.append(payload_response)
-                log.debug('Response %s: %s', requestId, _resps)
+                _, broker = connections_by_future.pop(future)
+                if future.failed():
+                    refresh_metadata = True
+                    failed_payloads(payloads_by_broker[broker])
 
-        # Connection errors generally mean stale metadata
-        # although sometimes it means incorrect api request
-        # Unfortunately there is no good way to tell the difference
-        # so we'll just reset metadata on all errors to be safe
-        if broker_failures:
+                else:
+                    for payload_response in decoder_fn(future.value):
+                        topic_partition = (str(payload_response.topic),
+                                           payload_response.partition)
+                        responses[topic_partition] = payload_response
+
+        if refresh_metadata:
             self.reset_all_metadata()
 
         # Return responses in the same order as provided
@@ -316,7 +316,7 @@ class KafkaClient(object):
 
             # Send the request, recv the response
             try:
-                conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+                conn = self._get_conn(broker.host, broker.port)
                 conn.send(requestId, request)
 
             except ConnectionError as e:
@@ -387,7 +387,7 @@ class KafkaClient(object):
     #   Public API   #
     ##################
     def close(self):
-        for conn in self.conns.values():
+        for conn in self._conns.values():
             conn.close()
 
     def copy(self):
@@ -398,14 +398,17 @@ class KafkaClient(object):
         Note that the copied connections are not initialized, so reinit()
         must be called on the returned copy.
         """
+        _conns = self._conns
+        self._conns = {}
         c = copy.deepcopy(self)
-        for key in c.conns:
-            c.conns[key] = self.conns[key].copy()
+        self._conns = _conns
         return c
 
     def reinit(self):
-        for conn in self.conns.values():
-            conn.reinit()
+        for conn in self._conns.values():
+            conn.close()
+            while conn.connect() == ConnectionStates.CONNECTING:
+                pass
 
     def reset_topic_metadata(self, *topics):
         for topic in topics:
@@ -420,14 +423,12 @@ class KafkaClient(object):
         self.topic_partitions.clear()
 
     def has_metadata_for_topic(self, topic):
-        topic = kafka_bytestring(topic)
         return (
             topic in self.topic_partitions
             and len(self.topic_partitions[topic]) > 0
         )
 
     def get_partition_ids_for_topic(self, topic):
-        topic = kafka_bytestring(topic)
         if topic not in self.topic_partitions:
             return []
 
@@ -454,89 +455,79 @@ class KafkaClient(object):
             time.sleep(.5)
 
     def load_metadata_for_topics(self, *topics):
-        """
-        Fetch broker and topic-partition metadata from the server,
-        and update internal data:
-        broker list, topic/partition list, and topic/parition -> broker map
+        """Fetch broker and topic-partition metadata from the server.
+
+        Updates internal data: broker list, topic/partition list, and
+        topic/parition -> broker map. This method should be called after
+        receiving any error.
 
-        This method should be called after receiving any error
+        Note: Exceptions *will not* be raised in a full refresh (i.e. no topic
+        list). In this case, error codes will be logged as errors.
+        Partition-level errors will also not be raised here (a single partition
+        w/o a leader, for example).
 
         Arguments:
             *topics (optional): If a list of topics is provided,
-            the metadata refresh will be limited to the specified topics only.
-
-        Exceptions:
-        ----------
-        If the broker is configured to not auto-create topics,
-        expect UnknownTopicOrPartitionError for topics that don't exist
-
-        If the broker is configured to auto-create topics,
-        expect LeaderNotAvailableError for new topics
-        until partitions have been initialized.
-
-        Exceptions *will not* be raised in a full refresh (i.e. no topic list)
-        In this case, error codes will be logged as errors
-
-        Partition-level errors will also not be raised here
-        (a single partition w/o a leader, for example)
+                the metadata refresh will be limited to the specified topics
+                only.
+
+        Raises:
+            UnknownTopicOrPartitionError: Raised for topics that do not exist,
+                unless the broker is configured to auto-create topics.
+            LeaderNotAvailableError: Raised for topics that do not exist yet,
+                when the broker is configured to auto-create topics. Retry
+                after a short backoff (topics/partitions are initializing).
         """
-        topics = [kafka_bytestring(t) for t in topics]
-
         if topics:
-            for topic in topics:
-                self.reset_topic_metadata(topic)
+            self.reset_topic_metadata(*topics)
         else:
             self.reset_all_metadata()
 
         resp = self.send_metadata_request(topics)
 
         log.debug('Updating broker metadata: %s', resp.brokers)
-        log.debug('Updating topic metadata: %s', resp.topics)
+        log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics])
 
-        self.brokers = dict([(broker.nodeId, broker)
-                             for broker in resp.brokers])
-
-        for topic_metadata in resp.topics:
-            topic = topic_metadata.topic
-            partitions = topic_metadata.partitions
+        self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
+                             for nodeId, host, port in resp.brokers])
 
+        for error, topic, partitions in resp.topics:
             # Errors expected for new topics
-            try:
-                kafka.common.check_error(topic_metadata)
-            except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
-
-                # Raise if the topic was passed in explicitly
-                if topic in topics:
-                    raise
-
-                # Otherwise, just log a warning
-                log.error('Error loading topic metadata for %s: %s', topic, type(e))
-                continue
+            if error:
+                error_type = kafka.common.kafka_errors.get(error, UnknownError)
+                if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
+                    log.error('Error loading topic metadata for %s: %s (%s)',
+                              topic, error_type, error)
+                    if topic not in topics:
+                        continue
+                raise error_type(topic)
 
             self.topic_partitions[topic] = {}
-            for partition_metadata in partitions:
-                partition = partition_metadata.partition
-                leader = partition_metadata.leader
+            for error, partition, leader, _, _ in partitions:
 
-                self.topic_partitions[topic][partition] = partition_metadata
+                self.topic_partitions[topic][partition] = leader
 
                 # Populate topics_to_brokers dict
-                topic_part = TopicAndPartition(topic, partition)
+                topic_part = TopicPartition(topic, partition)
 
                 # Check for partition errors
-                try:
-                    kafka.common.check_error(partition_metadata)
-
-                # If No Leader, topics_to_brokers topic_partition -> None
-                except LeaderNotAvailableError:
-                    log.error('No leader for topic %s partition %d', topic, partition)
-                    self.topics_to_brokers[topic_part] = None
-                    continue
-
-                # If one of the replicas is unavailable -- ignore
-                # this error code is provided for admin purposes only
-                # we never talk to replicas, only the leader
-                except ReplicaNotAvailableError:
-                    log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
+                if error:
+                    error_type = kafka.common.kafka_errors.get(error, UnknownError)
+
+                    # If No Leader, topics_to_brokers topic_partition -> None
+                    if error_type is LeaderNotAvailableError:
+                        log.error('No leader for topic %s partition %d', topic, partition)
+                        self.topics_to_brokers[topic_part] = None
+                        continue
+
+                    # If one of the replicas is unavailable -- ignore
+                    # this error code is provided for admin purposes only
+                    # we never talk to replicas, only the leader
+                    elif error_type is ReplicaNotAvailableError:
+                        log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
+
+                    else:
+                        raise error_type(topic_part)
 
                 # If Known Broker, topic_partition -> BrokerMetadata
                 if leader in self.brokers:
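The core pattern this patch introduces is the non-blocking BrokerConnection plus Future: connect() is polled until it leaves the CONNECTING state, send() returns a Future immediately, and recv() drives the network I/O that eventually completes the future. A minimal standalone sketch of that pattern follows, assuming only the BrokerConnection/Future behavior visible in the diff above; send_and_wait and its defaults are hypothetical helpers for illustration, not library API.

# Sketch of the connect/send/poll pattern used throughout this patch.
# Assumes BrokerConnection, ConnectionStates, and the returned Future
# behave as shown in the diff; send_and_wait is a hypothetical helper.
from kafka.common import ConnectionError
from kafka.conn import BrokerConnection, ConnectionStates

def send_and_wait(host, port, request, client_id='sketch-client', timeout=120):
    conn = BrokerConnection(host, port,
                            request_timeout_ms=timeout * 1000,
                            client_id=client_id)
    # connect() is non-blocking; spin until it leaves CONNECTING
    while conn.connect() == ConnectionStates.CONNECTING:
        pass
    if not conn.connected():
        raise ConnectionError('%s:%s is not connected' % (host, port))

    # send() queues the request and returns a Future immediately
    future = conn.send(request)

    # recv() performs the actual I/O; the future completes once a
    # response (or an error) has been processed
    while not future.is_done:
        conn.recv()

    if future.failed():
        raise future.exception
    return future.value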
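The rewritten copy() is also worth a note: live connections hold sockets that copy.deepcopy cannot serialize, so the patch stashes the connection dict, deep-copies the client with an empty dict in its place, and then restores the original. A minimal sketch of that idiom, using a hypothetical stand-in class rather than KafkaClient itself:

import copy

class Client(object):
    """Hypothetical stand-in for KafkaClient, illustrating the
    copy() idiom from the patch above."""
    def __init__(self):
        self._conns = {}  # may hold live sockets; not deepcopy-able

    def copy(self):
        _conns = self._conns      # stash the live connections
        self._conns = {}          # deepcopy sees an empty dict instead
        c = copy.deepcopy(self)
        self._conns = _conns      # restore them on the original
        return c                  # the copy must reinit() its own conns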