diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 279 |
1 files changed, 131 insertions, 148 deletions
diff --git a/kafka/client.py b/kafka/client.py index 9018bb4..cb60d98 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -2,17 +2,20 @@ import collections import copy import functools import logging +import random import select import time +import six + import kafka.common -from kafka.common import (TopicAndPartition, BrokerMetadata, +from kafka.common import (TopicAndPartition, BrokerMetadata, UnknownError, ConnectionError, FailedPayloadsError, KafkaTimeoutError, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, NotLeaderForPartitionError, ReplicaNotAvailableError) -from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS +from kafka.conn import collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol from kafka.util import kafka_bytestring @@ -31,13 +34,12 @@ class KafkaClient(object): timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, correlation_id=0): # We need one connection to bootstrap - self.client_id = kafka_bytestring(client_id) + self.client_id = client_id self.timeout = timeout self.hosts = collect_hosts(hosts) self.correlation_id = correlation_id - # create connections only when we need them - self.conns = {} + self._conns = {} self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata self.topic_partitions = {} # topic -> partition -> PartitionMetadata @@ -52,14 +54,14 @@ class KafkaClient(object): def _get_conn(self, host, port): """Get or create a connection to a broker using host and port""" host_key = (host, port) - if host_key not in self.conns: - self.conns[host_key] = KafkaConnection( - host, - port, - timeout=self.timeout + if host_key not in self._conns: + self._conns[host_key] = BrokerConnection( + host, port, + timeout=self.timeout, + client_id=self.client_id ) - return self.conns[host_key] + return self._conns[host_key] def _get_leader_for_partition(self, topic, partition): """ @@ -91,12 +93,12 @@ class KafkaClient(object): raise UnknownTopicOrPartitionError(key) # If there's no leader for the partition, raise - meta = self.topic_partitions[topic][partition] - if meta.leader == -1: - raise LeaderNotAvailableError(meta) + leader = self.topic_partitions[topic][partition] + if leader == -1: + raise LeaderNotAvailableError((topic, partition)) # Otherwise return the BrokerMetadata - return self.brokers[meta.leader] + return self.brokers[leader] def _get_coordinator_for_group(self, group): """ @@ -129,27 +131,35 @@ class KafkaClient(object): Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - for (host, port) in self.hosts: - requestId = self._next_id() - log.debug('Request %s: %s', requestId, payloads) - try: - conn = self._get_conn(host, port) - request = encoder_fn(client_id=self.client_id, - correlation_id=requestId, - payloads=payloads) - - conn.send(requestId, request) - response = conn.recv(requestId) + hosts = set([(broker.host, broker.port) for broker in self.brokers.values()]) + hosts.update(self.hosts) + hosts = list(hosts) + random.shuffle(hosts) + + for (host, port) in hosts: + conn = self._get_conn(host, port) + request = encoder_fn(payloads=payloads) + correlation_id = conn.send(request) + if correlation_id is None: + continue + response = conn.recv() + if response is not None: decoded = decoder_fn(response) - log.debug('Response %s: %s', requestId, decoded) + log.debug('Response %s: %s', correlation_id, decoded) return decoded - except Exception: - log.exception('Error sending request [%s] to server %s:%s, ' - 'trying next server', requestId, host, port) - raise KafkaUnavailableError('All servers failed to process request') + def _payloads_by_broker(self, payloads): + payloads_by_broker = collections.defaultdict(list) + for payload in payloads: + try: + leader = self._get_leader_for_partition(payload.topic, payload.partition) + except KafkaUnavailableError: + leader = None + payloads_by_broker[leader].append(payload) + return dict(payloads_by_broker) + def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ Group a list of request payloads by topic+partition and send them to @@ -178,97 +188,76 @@ class KafkaClient(object): # so we need to keep this so we can rebuild order before returning original_ordering = [(p.topic, p.partition) for p in payloads] - # Group the requests by topic+partition - brokers_for_payloads = [] - payloads_by_broker = collections.defaultdict(list) - - responses = {} - for payload in payloads: - try: - leader = self._get_leader_for_partition(payload.topic, - payload.partition) - payloads_by_broker[leader].append(payload) - brokers_for_payloads.append(leader) - except KafkaUnavailableError as e: - log.warning('KafkaUnavailableError attempting to send request ' - 'on topic %s partition %d', payload.topic, payload.partition) - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsError(payload) + # Connection errors generally mean stale metadata + # although sometimes it means incorrect api request + # Unfortunately there is no good way to tell the difference + # so we'll just reset metadata on all errors to be safe + refresh_metadata = False # For each broker, send the list of request payloads # and collect the responses and errors - broker_failures = [] + payloads_by_broker = self._payloads_by_broker(payloads) + responses = {} - # For each KafkaConnection keep the real socket so that we can use + def failed_payloads(payloads): + for payload in payloads: + topic_partition = (str(payload.topic), payload.partition) + responses[(topic_partition)] = FailedPayloadsError(payload) + + # For each BrokerConnection keep the real socket so that we can use # a select to perform unblocking I/O connections_by_socket = {} - for broker, payloads in payloads_by_broker.items(): - requestId = self._next_id() - log.debug('Request %s to %s: %s', requestId, broker, payloads) - request = encoder_fn(client_id=self.client_id, - correlation_id=requestId, payloads=payloads) - - # Send the request, recv the response - try: - conn = self._get_conn(broker.host.decode('utf-8'), broker.port) - conn.send(requestId, request) - - except ConnectionError as e: - broker_failures.append(broker) - log.warning('ConnectionError attempting to send request %s ' - 'to server %s: %s', requestId, broker, e) + for broker, broker_payloads in six.iteritems(payloads_by_broker): + if broker is None: + failed_payloads(broker_payloads) + continue - for payload in payloads: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsError(payload) + conn = self._get_conn(broker.host.decode('utf-8'), broker.port) + request = encoder_fn(payloads=broker_payloads) + # decoder_fn=None signal that the server is expected to not + # send a response. This probably only applies to + # ProduceRequest w/ acks = 0 + expect_response = (decoder_fn is not None) + correlation_id = conn.send(request, expect_response=expect_response) + + if correlation_id is None: + refresh_metadata = True + failed_payloads(broker_payloads) + log.warning('Error attempting to send request %s ' + 'to server %s', correlation_id, broker) + continue - # No exception, try to get response - else: + if not expect_response: + log.debug('Request %s does not expect a response ' + '(skipping conn.recv)', correlation_id) + for payload in broker_payloads: + topic_partition = (str(payload.topic), payload.partition) + responses[topic_partition] = None + continue - # decoder_fn=None signal that the server is expected to not - # send a response. This probably only applies to - # ProduceRequest w/ acks = 0 - if decoder_fn is None: - log.debug('Request %s does not expect a response ' - '(skipping conn.recv)', requestId) - for payload in payloads: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = None - continue - else: - connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId) + connections_by_socket[conn._read_fd] = (conn, broker) conn = None while connections_by_socket: sockets = connections_by_socket.keys() rlist, _, _ = select.select(sockets, [], [], None) - conn, broker, requestId = connections_by_socket.pop(rlist[0]) - try: - response = conn.recv(requestId) - except ConnectionError as e: - broker_failures.append(broker) - log.warning('ConnectionError attempting to receive a ' - 'response to request %s from server %s: %s', - requestId, broker, e) + conn, broker = connections_by_socket.pop(rlist[0]) + correlation_id = conn.next_correlation_id_recv() + response = conn.recv() + if response is None: + refresh_metadata = True + failed_payloads(payloads_by_broker[broker]) + log.warning('Error receiving response to request %s ' + 'from server %s', correlation_id, broker) + continue - for payload in payloads_by_broker[broker]: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsError(payload) + log.debug('Response %s: %s', correlation_id, response) + for payload_response in decoder_fn(response): + topic_partition = (str(payload_response.topic), + payload_response.partition) + responses[topic_partition] = payload_response - else: - _resps = [] - for payload_response in decoder_fn(response): - topic_partition = (payload_response.topic, - payload_response.partition) - responses[topic_partition] = payload_response - _resps.append(payload_response) - log.debug('Response %s: %s', requestId, _resps) - - # Connection errors generally mean stale metadata - # although sometimes it means incorrect api request - # Unfortunately there is no good way to tell the difference - # so we'll just reset metadata on all errors to be safe - if broker_failures: + if refresh_metadata: self.reset_all_metadata() # Return responses in the same order as provided @@ -387,7 +376,7 @@ class KafkaClient(object): # Public API # ################# def close(self): - for conn in self.conns.values(): + for conn in self._conns.values(): conn.close() def copy(self): @@ -398,13 +387,14 @@ class KafkaClient(object): Note that the copied connections are not initialized, so reinit() must be called on the returned copy. """ + _conns = self._conns + self._conns = {} c = copy.deepcopy(self) - for key in c.conns: - c.conns[key] = self.conns[key].copy() + self._conns = _conns return c def reinit(self): - for conn in self.conns.values(): + for conn in self._conns.values(): conn.reinit() def reset_topic_metadata(self, *topics): @@ -480,11 +470,8 @@ class KafkaClient(object): Partition-level errors will also not be raised here (a single partition w/o a leader, for example) """ - topics = [kafka_bytestring(t) for t in topics] - if topics: - for topic in topics: - self.reset_topic_metadata(topic) + self.reset_topic_metadata(*topics) else: self.reset_all_metadata() @@ -493,50 +480,46 @@ class KafkaClient(object): log.debug('Updating broker metadata: %s', resp.brokers) log.debug('Updating topic metadata: %s', resp.topics) - self.brokers = dict([(broker.nodeId, broker) - for broker in resp.brokers]) - - for topic_metadata in resp.topics: - topic = topic_metadata.topic - partitions = topic_metadata.partitions + self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port)) + for nodeId, host, port in resp.brokers]) + for error, topic, partitions in resp.topics: # Errors expected for new topics - try: - kafka.common.check_error(topic_metadata) - except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e: - - # Raise if the topic was passed in explicitly - if topic in topics: - raise - - # Otherwise, just log a warning - log.error('Error loading topic metadata for %s: %s', topic, type(e)) - continue + if error: + error_type = kafka.common.kafka_errors.get(error, UnknownError) + if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError): + log.error('Error loading topic metadata for %s: %s (%s)', + topic, error_type, error) + if topic not in topics: + continue + raise error_type(topic) self.topic_partitions[topic] = {} - for partition_metadata in partitions: - partition = partition_metadata.partition - leader = partition_metadata.leader + for error, partition, leader, _, _ in partitions: - self.topic_partitions[topic][partition] = partition_metadata + self.topic_partitions[topic][partition] = leader # Populate topics_to_brokers dict topic_part = TopicAndPartition(topic, partition) # Check for partition errors - try: - kafka.common.check_error(partition_metadata) - - # If No Leader, topics_to_brokers topic_partition -> None - except LeaderNotAvailableError: - log.error('No leader for topic %s partition %d', topic, partition) - self.topics_to_brokers[topic_part] = None - continue - # If one of the replicas is unavailable -- ignore - # this error code is provided for admin purposes only - # we never talk to replicas, only the leader - except ReplicaNotAvailableError: - log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) + if error: + error_type = kafka.common.kafka_errors.get(error, UnknownError) + + # If No Leader, topics_to_brokers topic_partition -> None + if error_type is LeaderNotAvailableError: + log.error('No leader for topic %s partition %d', topic, partition) + self.topics_to_brokers[topic_part] = None + continue + + # If one of the replicas is unavailable -- ignore + # this error code is provided for admin purposes only + # we never talk to replicas, only the leader + elif error_type is ReplicaNotAvailableError: + log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) + + else: + raise error_type(topic_part) # If Known Broker, topic_partition -> BrokerMetadata if leader in self.brokers: |