Diffstat (limited to 'kafka/client.py')
-rw-r--r--  kafka/client.py  361
1 file changed, 176 insertions(+), 185 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 9018bb4..14e71bb 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,19 +2,22 @@ import collections
import copy
import functools
import logging
-import select
+import random
import time
+import six
+
import kafka.common
-from kafka.common import (TopicAndPartition, BrokerMetadata,
+from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
-from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import (
+ collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
+ ConnectionStates)
from kafka.protocol import KafkaProtocol
-from kafka.util import kafka_bytestring
log = logging.getLogger(__name__)
@@ -31,20 +34,18 @@ class KafkaClient(object):
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
# We need one connection to bootstrap
- self.client_id = kafka_bytestring(client_id)
+ self.client_id = client_id
self.timeout = timeout
self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
- # create connections only when we need them
- self.conns = {}
+ self._conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
- self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
+ self.topics_to_brokers = {} # TopicPartition -> BrokerMetadata
self.topic_partitions = {} # topic -> partition -> PartitionMetadata
self.load_metadata_for_topics() # bootstrap with all metadata
-
##################
# Private API #
##################
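
A minimal usage sketch of the updated constructor (host, client id, and timeout values are illustrative and assume a reachable broker); the client bootstraps metadata for all topics at construction time:

    from kafka.client import KafkaClient

    client = KafkaClient(hosts='localhost:9092', client_id='my-client', timeout=30)
    print(client.brokers)            # broker node id -> BrokerMetadata
    print(client.topic_partitions)   # topic -> partition -> leader node id
    client.close()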
@@ -52,14 +53,17 @@ class KafkaClient(object):
def _get_conn(self, host, port):
"""Get or create a connection to a broker using host and port"""
host_key = (host, port)
- if host_key not in self.conns:
- self.conns[host_key] = KafkaConnection(
- host,
- port,
- timeout=self.timeout
+ if host_key not in self._conns:
+ self._conns[host_key] = BrokerConnection(
+ host, port,
+ request_timeout_ms=self.timeout * 1000,
+ client_id=self.client_id
)
- return self.conns[host_key]
+ conn = self._conns[host_key]
+ while conn.connect() == ConnectionStates.CONNECTING:
+ pass
+ return conn
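
_get_conn now spins on connect() until the connection leaves the CONNECTING state. The same blocking-connect pattern in isolation (a sketch with illustrative values):

    # connect() is non-blocking and returns the current connection state,
    # so the caller loops until the handshake succeeds or fails.
    conn = BrokerConnection('localhost', 9092,
                            request_timeout_ms=30000,
                            client_id='my-client')
    while conn.connect() == ConnectionStates.CONNECTING:
        pass  # busy-wait until CONNECTING resolves
    if not conn.connected():
        raise ConnectionError('could not connect to localhost:9092')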
def _get_leader_for_partition(self, topic, partition):
"""
@@ -73,7 +77,7 @@ class KafkaClient(object):
no current leader
"""
- key = TopicAndPartition(topic, partition)
+ key = TopicPartition(topic, partition)
# Use cached metadata if it is there
if self.topics_to_brokers.get(key) is not None:
@@ -91,21 +95,21 @@ class KafkaClient(object):
raise UnknownTopicOrPartitionError(key)
# If there's no leader for the partition, raise
- meta = self.topic_partitions[topic][partition]
- if meta.leader == -1:
- raise LeaderNotAvailableError(meta)
+ leader = self.topic_partitions[topic][partition]
+ if leader == -1:
+ raise LeaderNotAvailableError((topic, partition))
# Otherwise return the BrokerMetadata
- return self.brokers[meta.leader]
+ return self.brokers[leader]
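
With this change the metadata caches hold bare leader node ids instead of PartitionMetadata objects. Illustrative shapes of the two dicts (example data, not from a real cluster):

    topic_partitions = {
        'my-topic': {0: 1, 1: 2},    # partition -> leader node id (-1 = no leader)
    }
    brokers = {
        1: BrokerMetadata(1, 'kafka1', 9092),
        2: BrokerMetadata(2, 'kafka2', 9092),
    }
    leader = topic_partitions['my-topic'][0]
    broker = brokers[leader]         # BrokerMetadata for the partition leader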
def _get_coordinator_for_group(self, group):
"""
Returns the coordinator broker for a consumer group.
- ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
+ GroupCoordinatorNotAvailableError will be raised if the coordinator
does not currently exist for the group.
- OffsetsLoadInProgressCode is raised if the coordinator is available
+ GroupLoadInProgressError is raised if the coordinator is available
but is still loading offsets from the internal topic
"""
@@ -129,26 +133,40 @@ class KafkaClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- for (host, port) in self.hosts:
- requestId = self._next_id()
- log.debug('Request %s: %s', requestId, payloads)
- try:
- conn = self._get_conn(host, port)
- request = encoder_fn(client_id=self.client_id,
- correlation_id=requestId,
- payloads=payloads)
+ hosts = set([(broker.host, broker.port) for broker in self.brokers.values()])
+ hosts.update(self.hosts)
+ hosts = list(hosts)
+ random.shuffle(hosts)
+
+ for (host, port) in hosts:
+ conn = self._get_conn(host, port)
+ if not conn.connected():
+ log.warning("Skipping unconnected connection: %s", conn)
+ continue
+ request = encoder_fn(payloads=payloads)
+ future = conn.send(request)
- conn.send(requestId, request)
- response = conn.recv(requestId)
- decoded = decoder_fn(response)
- log.debug('Response %s: %s', requestId, decoded)
- return decoded
+ # Block until the response future resolves
+ while not future.is_done:
+ conn.recv()
- except Exception:
- log.exception('Error sending request [%s] to server %s:%s, '
- 'trying next server', requestId, host, port)
+ if future.failed():
+ log.error("Request failed: %s", future.exception)
+ continue
- raise KafkaUnavailableError('All servers failed to process request')
+ return decoder_fn(future.value)
+
+ raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
+
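
The send/recv idiom above is the core of this commit: conn.send() returns a future, and the caller drives network I/O with conn.recv() until that future resolves. In isolation (a fragment; conn and request are stand-ins for a connected BrokerConnection and an encoded protocol request):

    future = conn.send(request)
    while not future.is_done:
        conn.recv()                  # pump I/O until the future resolves
    if future.failed():
        log.error('Request failed: %s', future.exception)
    else:
        response = future.value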
+ def _payloads_by_broker(self, payloads):
+ payloads_by_broker = collections.defaultdict(list)
+ for payload in payloads:
+ try:
+ leader = self._get_leader_for_partition(payload.topic, payload.partition)
+ except KafkaUnavailableError:
+ leader = None
+ payloads_by_broker[leader].append(payload)
+ return dict(payloads_by_broker)
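
A standalone sketch of the grouping performed by _payloads_by_broker: payloads whose leader lookup fails land under the None key and are failed immediately by the caller (payloads and leaders here are illustrative stand-ins):

    import collections

    payloads = [('my-topic', 0), ('my-topic', 1), ('new-topic', 0)]
    leaders = {('my-topic', 0): 'broker-1', ('my-topic', 1): 'broker-2'}

    grouped = collections.defaultdict(list)
    for payload in payloads:
        grouped[leaders.get(payload)].append(payload)  # None = unroutable
    print(dict(grouped))
    # {'broker-1': [('my-topic', 0)], 'broker-2': [('my-topic', 1)],
    #  None: [('new-topic', 0)]}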
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -178,97 +196,79 @@ class KafkaClient(object):
# so we need to keep this so we can rebuild order before returning
original_ordering = [(p.topic, p.partition) for p in payloads]
- # Group the requests by topic+partition
- brokers_for_payloads = []
- payloads_by_broker = collections.defaultdict(list)
-
- responses = {}
- for payload in payloads:
- try:
- leader = self._get_leader_for_partition(payload.topic,
- payload.partition)
- payloads_by_broker[leader].append(payload)
- brokers_for_payloads.append(leader)
- except KafkaUnavailableError as e:
- log.warning('KafkaUnavailableError attempting to send request '
- 'on topic %s partition %d', payload.topic, payload.partition)
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ # Connection errors generally mean stale metadata
+ # although sometimes it means incorrect api request
+ # Unfortunately there is no good way to tell the difference
+ # so we'll just reset metadata on all errors to be safe
+ refresh_metadata = False
# For each broker, send the list of request payloads
# and collect the responses and errors
- broker_failures = []
+ payloads_by_broker = self._payloads_by_broker(payloads)
+ responses = {}
- # For each KafkaConnection keep the real socket so that we can use
+ def failed_payloads(payloads):
+ for payload in payloads:
+ topic_partition = (str(payload.topic), payload.partition)
+ responses[(topic_partition)] = FailedPayloadsError(payload)
+
+ # For each BrokerConnection keep the real socket so that we can use
# a select to perform non-blocking I/O
- connections_by_socket = {}
- for broker, payloads in payloads_by_broker.items():
- requestId = self._next_id()
- log.debug('Request %s to %s: %s', requestId, broker, payloads)
- request = encoder_fn(client_id=self.client_id,
- correlation_id=requestId, payloads=payloads)
-
- # Send the request, recv the response
- try:
- conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
- conn.send(requestId, request)
+ connections_by_future = {}
+ for broker, broker_payloads in six.iteritems(payloads_by_broker):
+ if broker is None:
+ failed_payloads(broker_payloads)
+ continue
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to send request %s '
- 'to server %s: %s', requestId, broker, e)
+ conn = self._get_conn(broker.host, broker.port)
+ conn.connect()
+ if not conn.connected():
+ refresh_metadata = True
+ failed_payloads(broker_payloads)
+ continue
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ request = encoder_fn(payloads=broker_payloads)
+ # decoder_fn=None signals that the server is expected to not
+ # send a response. This probably only applies to
+ # ProduceRequest w/ acks = 0
+ expect_response = (decoder_fn is not None)
+ future = conn.send(request, expect_response=expect_response)
- # No exception, try to get response
- else:
+ if future.failed():
+ refresh_metadata = True
+ failed_payloads(broker_payloads)
+ continue
- # decoder_fn=None signal that the server is expected to not
- # send a response. This probably only applies to
- # ProduceRequest w/ acks = 0
- if decoder_fn is None:
- log.debug('Request %s does not expect a response '
- '(skipping conn.recv)', requestId)
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = None
- continue
- else:
- connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
+ if not expect_response:
+ for payload in broker_payloads:
+ topic_partition = (str(payload.topic), payload.partition)
+ responses[topic_partition] = None
+ continue
+
+ connections_by_future[future] = (conn, broker)
conn = None
- while connections_by_socket:
- sockets = connections_by_socket.keys()
- rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker, requestId = connections_by_socket.pop(rlist[0])
- try:
- response = conn.recv(requestId)
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to receive a '
- 'response to request %s from server %s: %s',
- requestId, broker, e)
+ while connections_by_future:
+ futures = list(connections_by_future.keys())
+ for future in futures:
- for payload in payloads_by_broker[broker]:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ if not future.is_done:
+ conn, _ = connections_by_future[future]
+ conn.recv()
+ continue
- else:
- _resps = []
- for payload_response in decoder_fn(response):
- topic_partition = (payload_response.topic,
- payload_response.partition)
- responses[topic_partition] = payload_response
- _resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
+ _, broker = connections_by_future.pop(future)
+ if future.failed():
+ refresh_metadata = True
+ failed_payloads(payloads_by_broker[broker])
- # Connection errors generally mean stale metadata
- # although sometimes it means incorrect api request
- # Unfortunately there is no good way to tell the difference
- # so we'll just reset metadata on all errors to be safe
- if broker_failures:
+ else:
+ for payload_response in decoder_fn(future.value):
+ topic_partition = (str(payload_response.topic),
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+
+ if refresh_metadata:
self.reset_all_metadata()
# Return responses in the same order as provided
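
One subtlety in the hunk above: when decoder_fn is None (e.g. a ProduceRequest with acks=0) the broker sends nothing back, so the request goes out with expect_response=False and each payload's response is recorded as None without waiting on the future. The branch in isolation (a fragment; request and topic_partition are stand-ins):

    future = conn.send(request, expect_response=False)
    if not future.failed():
        responses[topic_partition] = None  # no broker response expected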
@@ -316,7 +316,7 @@ class KafkaClient(object):
# Send the request, recv the response
try:
- conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+ conn = self._get_conn(broker.host, broker.port)
conn.send(requestId, request)
except ConnectionError as e:
@@ -387,7 +387,7 @@ class KafkaClient(object):
# Public API #
#################
def close(self):
- for conn in self.conns.values():
+ for conn in self._conns.values():
conn.close()
def copy(self):
@@ -398,14 +398,17 @@ class KafkaClient(object):
Note that the copied connections are not initialized, so reinit() must
be called on the returned copy.
"""
+ _conns = self._conns
+ self._conns = {}
c = copy.deepcopy(self)
- for key in c.conns:
- c.conns[key] = self.conns[key].copy()
+ self._conns = _conns
return c
def reinit(self):
- for conn in self.conns.values():
- conn.reinit()
+ for conn in self._conns.values():
+ conn.close()
+ while conn.connect() == ConnectionStates.CONNECTING:
+ pass
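
copy() above detaches _conns before deepcopy and restores it afterwards, since live connections should not be duplicated; reinit() now closes each connection and spins until it reconnects. The detach/restore pattern as a generic, runnable sketch:

    import copy

    class Holder(object):
        def __init__(self):
            # object() stands in for a live connection that deepcopy
            # should not duplicate
            self._conns = {('localhost', 9092): object()}

        def copy(self):
            _conns = self._conns
            self._conns = {}             # detach un-copyable state
            c = copy.deepcopy(self)      # copy everything else
            self._conns = _conns         # restore on the original
            return c                     # the copy starts with no connections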
def reset_topic_metadata(self, *topics):
for topic in topics:
@@ -420,14 +423,12 @@ class KafkaClient(object):
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
- topic = kafka_bytestring(topic)
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)
def get_partition_ids_for_topic(self, topic):
- topic = kafka_bytestring(topic)
if topic not in self.topic_partitions:
return []
@@ -454,89 +455,79 @@ class KafkaClient(object):
time.sleep(.5)
def load_metadata_for_topics(self, *topics):
- """
- Fetch broker and topic-partition metadata from the server,
- and update internal data:
- broker list, topic/partition list, and topic/parition -> broker map
+ """Fetch broker and topic-partition metadata from the server.
+
+ Updates internal data: broker list, topic/partition list, and
+ topic/partition -> broker map. This method should be called after
+ receiving any error.
- This method should be called after receiving any error
+ Note: Exceptions *will not* be raised in a full refresh (i.e. no topic
+ list). In this case, error codes will be logged as errors.
+ Partition-level errors will also not be raised here (a single partition
+ w/o a leader, for example).
Arguments:
*topics (optional): If a list of topics is provided,
- the metadata refresh will be limited to the specified topics only.
-
- Exceptions:
- ----------
- If the broker is configured to not auto-create topics,
- expect UnknownTopicOrPartitionError for topics that don't exist
-
- If the broker is configured to auto-create topics,
- expect LeaderNotAvailableError for new topics
- until partitions have been initialized.
-
- Exceptions *will not* be raised in a full refresh (i.e. no topic list)
- In this case, error codes will be logged as errors
-
- Partition-level errors will also not be raised here
- (a single partition w/o a leader, for example)
+ the metadata refresh will be limited to the specified topics
+ only.
+
+ Raises:
+ UnknownTopicOrPartitionError: Raised for topics that do not exist,
+ unless the broker is configured to auto-create topics.
+ LeaderNotAvailableError: Raised for topics that do not exist yet,
+ when the broker is configured to auto-create topics. Retry
+ after a short backoff (topics/partitions are initializing).
"""
- topics = [kafka_bytestring(t) for t in topics]
-
if topics:
- for topic in topics:
- self.reset_topic_metadata(topic)
+ self.reset_topic_metadata(*topics)
else:
self.reset_all_metadata()
resp = self.send_metadata_request(topics)
log.debug('Updating broker metadata: %s', resp.brokers)
- log.debug('Updating topic metadata: %s', resp.topics)
+ log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics])
- self.brokers = dict([(broker.nodeId, broker)
- for broker in resp.brokers])
-
- for topic_metadata in resp.topics:
- topic = topic_metadata.topic
- partitions = topic_metadata.partitions
+ self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
+ for nodeId, host, port in resp.brokers])
+ for error, topic, partitions in resp.topics:
# Errors expected for new topics
- try:
- kafka.common.check_error(topic_metadata)
- except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
-
- # Raise if the topic was passed in explicitly
- if topic in topics:
- raise
-
- # Otherwise, just log a warning
- log.error('Error loading topic metadata for %s: %s', topic, type(e))
- continue
+ if error:
+ error_type = kafka.common.kafka_errors.get(error, UnknownError)
+ if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
+ log.error('Error loading topic metadata for %s: %s (%s)',
+ topic, error_type, error)
+ if topic not in topics:
+ continue
+ raise error_type(topic)
self.topic_partitions[topic] = {}
- for partition_metadata in partitions:
- partition = partition_metadata.partition
- leader = partition_metadata.leader
+ for error, partition, leader, _, _ in partitions:
- self.topic_partitions[topic][partition] = partition_metadata
+ self.topic_partitions[topic][partition] = leader
# Populate topics_to_brokers dict
- topic_part = TopicAndPartition(topic, partition)
+ topic_part = TopicPartition(topic, partition)
# Check for partition errors
- try:
- kafka.common.check_error(partition_metadata)
-
- # If No Leader, topics_to_brokers topic_partition -> None
- except LeaderNotAvailableError:
- log.error('No leader for topic %s partition %d', topic, partition)
- self.topics_to_brokers[topic_part] = None
- continue
- # If one of the replicas is unavailable -- ignore
- # this error code is provided for admin purposes only
- # we never talk to replicas, only the leader
- except ReplicaNotAvailableError:
- log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
+ if error:
+ error_type = kafka.common.kafka_errors.get(error, UnknownError)
+
+ # If No Leader, topics_to_brokers topic_partition -> None
+ if error_type is LeaderNotAvailableError:
+ log.error('No leader for topic %s partition %d', topic, partition)
+ self.topics_to_brokers[topic_part] = None
+ continue
+
+ # If one of the replicas is unavailable -- ignore
+ # this error code is provided for admin purposes only
+ # we never talk to replicas, only the leader
+ elif error_type is ReplicaNotAvailableError:
+ log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
+
+ else:
+ raise error_type(topic_part)
# If Known Broker, topic_partition -> BrokerMetadata
if leader in self.brokers:
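
The load_metadata_for_topics docstring above recommends retrying after a short backoff when the broker auto-creates topics. A sketch of that pattern (wait_for_topic is a hypothetical helper, not part of this diff):

    import time
    from kafka.common import KafkaTimeoutError, LeaderNotAvailableError

    def wait_for_topic(client, topic, retries=10, backoff=0.5):
        for _ in range(retries):
            try:
                client.load_metadata_for_topics(topic)
                return
            except LeaderNotAvailableError:
                time.sleep(backoff)      # partitions still initializing
        raise KafkaTimeoutError('timed out waiting for topic %s' % topic)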