summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 15:34:58 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 15:34:58 -0800
commitad030ccd4df57305bb624b03eddaa2641f956160 (patch)
treee4f849aa0718a853f271592f2b4be9d3862ed601 /kafka/client.py
parent4be8a58592e63859964ca903fa09a7a31ba0c3a2 (diff)
downloadkafka-python-ad030ccd4df57305bb624b03eddaa2641f956160.tar.gz
Refactor KafkaClient to use BrokerConnections and new Request/Response structs
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py279
1 files changed, 131 insertions, 148 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 9018bb4..cb60d98 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,17 +2,20 @@ import collections
import copy
import functools
import logging
+import random
import select
import time
+import six
+
import kafka.common
-from kafka.common import (TopicAndPartition, BrokerMetadata,
+from kafka.common import (TopicAndPartition, BrokerMetadata, UnknownError,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
-from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
from kafka.util import kafka_bytestring
@@ -31,13 +34,12 @@ class KafkaClient(object):
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
# We need one connection to bootstrap
- self.client_id = kafka_bytestring(client_id)
+ self.client_id = client_id
self.timeout = timeout
self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
- # create connections only when we need them
- self.conns = {}
+ self._conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
self.topic_partitions = {} # topic -> partition -> PartitionMetadata
@@ -52,14 +54,14 @@ class KafkaClient(object):
def _get_conn(self, host, port):
"""Get or create a connection to a broker using host and port"""
host_key = (host, port)
- if host_key not in self.conns:
- self.conns[host_key] = KafkaConnection(
- host,
- port,
- timeout=self.timeout
+ if host_key not in self._conns:
+ self._conns[host_key] = BrokerConnection(
+ host, port,
+ timeout=self.timeout,
+ client_id=self.client_id
)
- return self.conns[host_key]
+ return self._conns[host_key]
def _get_leader_for_partition(self, topic, partition):
"""
@@ -91,12 +93,12 @@ class KafkaClient(object):
raise UnknownTopicOrPartitionError(key)
# If there's no leader for the partition, raise
- meta = self.topic_partitions[topic][partition]
- if meta.leader == -1:
- raise LeaderNotAvailableError(meta)
+ leader = self.topic_partitions[topic][partition]
+ if leader == -1:
+ raise LeaderNotAvailableError((topic, partition))
# Otherwise return the BrokerMetadata
- return self.brokers[meta.leader]
+ return self.brokers[leader]
def _get_coordinator_for_group(self, group):
"""
@@ -129,27 +131,35 @@ class KafkaClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- for (host, port) in self.hosts:
- requestId = self._next_id()
- log.debug('Request %s: %s', requestId, payloads)
- try:
- conn = self._get_conn(host, port)
- request = encoder_fn(client_id=self.client_id,
- correlation_id=requestId,
- payloads=payloads)
-
- conn.send(requestId, request)
- response = conn.recv(requestId)
+ hosts = set([(broker.host, broker.port) for broker in self.brokers.values()])
+ hosts.update(self.hosts)
+ hosts = list(hosts)
+ random.shuffle(hosts)
+
+ for (host, port) in hosts:
+ conn = self._get_conn(host, port)
+ request = encoder_fn(payloads=payloads)
+ correlation_id = conn.send(request)
+ if correlation_id is None:
+ continue
+ response = conn.recv()
+ if response is not None:
decoded = decoder_fn(response)
- log.debug('Response %s: %s', requestId, decoded)
+ log.debug('Response %s: %s', correlation_id, decoded)
return decoded
- except Exception:
- log.exception('Error sending request [%s] to server %s:%s, '
- 'trying next server', requestId, host, port)
-
raise KafkaUnavailableError('All servers failed to process request')
+ def _payloads_by_broker(self, payloads):
+ payloads_by_broker = collections.defaultdict(list)
+ for payload in payloads:
+ try:
+ leader = self._get_leader_for_partition(payload.topic, payload.partition)
+ except KafkaUnavailableError:
+ leader = None
+ payloads_by_broker[leader].append(payload)
+ return dict(payloads_by_broker)
+
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
Group a list of request payloads by topic+partition and send them to
@@ -178,97 +188,76 @@ class KafkaClient(object):
# so we need to keep this so we can rebuild order before returning
original_ordering = [(p.topic, p.partition) for p in payloads]
- # Group the requests by topic+partition
- brokers_for_payloads = []
- payloads_by_broker = collections.defaultdict(list)
-
- responses = {}
- for payload in payloads:
- try:
- leader = self._get_leader_for_partition(payload.topic,
- payload.partition)
- payloads_by_broker[leader].append(payload)
- brokers_for_payloads.append(leader)
- except KafkaUnavailableError as e:
- log.warning('KafkaUnavailableError attempting to send request '
- 'on topic %s partition %d', payload.topic, payload.partition)
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ # Connection errors generally mean stale metadata
+ # although sometimes it means incorrect api request
+ # Unfortunately there is no good way to tell the difference
+ # so we'll just reset metadata on all errors to be safe
+ refresh_metadata = False
# For each broker, send the list of request payloads
# and collect the responses and errors
- broker_failures = []
+ payloads_by_broker = self._payloads_by_broker(payloads)
+ responses = {}
- # For each KafkaConnection keep the real socket so that we can use
+ def failed_payloads(payloads):
+ for payload in payloads:
+ topic_partition = (str(payload.topic), payload.partition)
+ responses[(topic_partition)] = FailedPayloadsError(payload)
+
+ # For each BrokerConnection keep the real socket so that we can use
# a select to perform unblocking I/O
connections_by_socket = {}
- for broker, payloads in payloads_by_broker.items():
- requestId = self._next_id()
- log.debug('Request %s to %s: %s', requestId, broker, payloads)
- request = encoder_fn(client_id=self.client_id,
- correlation_id=requestId, payloads=payloads)
-
- # Send the request, recv the response
- try:
- conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
- conn.send(requestId, request)
-
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to send request %s '
- 'to server %s: %s', requestId, broker, e)
+ for broker, broker_payloads in six.iteritems(payloads_by_broker):
+ if broker is None:
+ failed_payloads(broker_payloads)
+ continue
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+ request = encoder_fn(payloads=broker_payloads)
+ # decoder_fn=None signal that the server is expected to not
+ # send a response. This probably only applies to
+ # ProduceRequest w/ acks = 0
+ expect_response = (decoder_fn is not None)
+ correlation_id = conn.send(request, expect_response=expect_response)
+
+ if correlation_id is None:
+ refresh_metadata = True
+ failed_payloads(broker_payloads)
+ log.warning('Error attempting to send request %s '
+ 'to server %s', correlation_id, broker)
+ continue
- # No exception, try to get response
- else:
+ if not expect_response:
+ log.debug('Request %s does not expect a response '
+ '(skipping conn.recv)', correlation_id)
+ for payload in broker_payloads:
+ topic_partition = (str(payload.topic), payload.partition)
+ responses[topic_partition] = None
+ continue
- # decoder_fn=None signal that the server is expected to not
- # send a response. This probably only applies to
- # ProduceRequest w/ acks = 0
- if decoder_fn is None:
- log.debug('Request %s does not expect a response '
- '(skipping conn.recv)', requestId)
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = None
- continue
- else:
- connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
+ connections_by_socket[conn._read_fd] = (conn, broker)
conn = None
while connections_by_socket:
sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker, requestId = connections_by_socket.pop(rlist[0])
- try:
- response = conn.recv(requestId)
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to receive a '
- 'response to request %s from server %s: %s',
- requestId, broker, e)
+ conn, broker = connections_by_socket.pop(rlist[0])
+ correlation_id = conn.next_correlation_id_recv()
+ response = conn.recv()
+ if response is None:
+ refresh_metadata = True
+ failed_payloads(payloads_by_broker[broker])
+ log.warning('Error receiving response to request %s '
+ 'from server %s', correlation_id, broker)
+ continue
- for payload in payloads_by_broker[broker]:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ log.debug('Response %s: %s', correlation_id, response)
+ for payload_response in decoder_fn(response):
+ topic_partition = (str(payload_response.topic),
+ payload_response.partition)
+ responses[topic_partition] = payload_response
- else:
- _resps = []
- for payload_response in decoder_fn(response):
- topic_partition = (payload_response.topic,
- payload_response.partition)
- responses[topic_partition] = payload_response
- _resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
-
- # Connection errors generally mean stale metadata
- # although sometimes it means incorrect api request
- # Unfortunately there is no good way to tell the difference
- # so we'll just reset metadata on all errors to be safe
- if broker_failures:
+ if refresh_metadata:
self.reset_all_metadata()
# Return responses in the same order as provided
@@ -387,7 +376,7 @@ class KafkaClient(object):
# Public API #
#################
def close(self):
- for conn in self.conns.values():
+ for conn in self._conns.values():
conn.close()
def copy(self):
@@ -398,13 +387,14 @@ class KafkaClient(object):
Note that the copied connections are not initialized, so reinit() must
be called on the returned copy.
"""
+ _conns = self._conns
+ self._conns = {}
c = copy.deepcopy(self)
- for key in c.conns:
- c.conns[key] = self.conns[key].copy()
+ self._conns = _conns
return c
def reinit(self):
- for conn in self.conns.values():
+ for conn in self._conns.values():
conn.reinit()
def reset_topic_metadata(self, *topics):
@@ -480,11 +470,8 @@ class KafkaClient(object):
Partition-level errors will also not be raised here
(a single partition w/o a leader, for example)
"""
- topics = [kafka_bytestring(t) for t in topics]
-
if topics:
- for topic in topics:
- self.reset_topic_metadata(topic)
+ self.reset_topic_metadata(*topics)
else:
self.reset_all_metadata()
@@ -493,50 +480,46 @@ class KafkaClient(object):
log.debug('Updating broker metadata: %s', resp.brokers)
log.debug('Updating topic metadata: %s', resp.topics)
- self.brokers = dict([(broker.nodeId, broker)
- for broker in resp.brokers])
-
- for topic_metadata in resp.topics:
- topic = topic_metadata.topic
- partitions = topic_metadata.partitions
+ self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
+ for nodeId, host, port in resp.brokers])
+ for error, topic, partitions in resp.topics:
# Errors expected for new topics
- try:
- kafka.common.check_error(topic_metadata)
- except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
-
- # Raise if the topic was passed in explicitly
- if topic in topics:
- raise
-
- # Otherwise, just log a warning
- log.error('Error loading topic metadata for %s: %s', topic, type(e))
- continue
+ if error:
+ error_type = kafka.common.kafka_errors.get(error, UnknownError)
+ if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
+ log.error('Error loading topic metadata for %s: %s (%s)',
+ topic, error_type, error)
+ if topic not in topics:
+ continue
+ raise error_type(topic)
self.topic_partitions[topic] = {}
- for partition_metadata in partitions:
- partition = partition_metadata.partition
- leader = partition_metadata.leader
+ for error, partition, leader, _, _ in partitions:
- self.topic_partitions[topic][partition] = partition_metadata
+ self.topic_partitions[topic][partition] = leader
# Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
# Check for partition errors
- try:
- kafka.common.check_error(partition_metadata)
-
- # If No Leader, topics_to_brokers topic_partition -> None
- except LeaderNotAvailableError:
- log.error('No leader for topic %s partition %d', topic, partition)
- self.topics_to_brokers[topic_part] = None
- continue
- # If one of the replicas is unavailable -- ignore
- # this error code is provided for admin purposes only
- # we never talk to replicas, only the leader
- except ReplicaNotAvailableError:
- log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
+ if error:
+ error_type = kafka.common.kafka_errors.get(error, UnknownError)
+
+ # If No Leader, topics_to_brokers topic_partition -> None
+ if error_type is LeaderNotAvailableError:
+ log.error('No leader for topic %s partition %d', topic, partition)
+ self.topics_to_brokers[topic_part] = None
+ continue
+
+ # If one of the replicas is unavailable -- ignore
+ # this error code is provided for admin purposes only
+ # we never talk to replicas, only the leader
+ elif error_type is ReplicaNotAvailableError:
+ log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
+
+ else:
+ raise error_type(topic_part)
# If Known Broker, topic_partition -> BrokerMetadata
if leader in self.brokers: