summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-08 13:15:56 -0700
committerDana Powers <dana.powers@rd.io>2014-09-08 13:17:43 -0700
commitfff812ddc80243208233f785b3f005904cf33482 (patch)
tree30f1eb703dcd8ce1063c413fd70ac11f3fff5072 /kafka/client.py
parent42a7ab18bb84fea60deed5f7e3a6cfdfaaaeecd6 (diff)
parent0dabb1fbe8a9f538527a03c2903475ed77a12c10 (diff)
downloadkafka-python-fff812ddc80243208233f785b3f005904cf33482.tar.gz
Merge pull request #223 from dpkp/metadata_refactor
Metadata Refactor * add MetadataRequest and MetadataResponse namedtuples * add TopicMetadata namedtuple * add error codes to Topic and Partition Metadata * add KafkaClient.send_metadata_request() method * KafkaProtocol.decode_metadata_response changed to return a MetadataResponse object so that it is consistent with server api: [broker_list, topic_list] * raise server exceptions in load_metadata_for_topics(*topics) unless topics is null (full refresh) * Replace non-standard exceptions (LeaderUnavailable, PartitionUnavailable) with server standard exceptions (LeaderNotAvailableError, UnknownTopicOrPartitionError) Conflicts: kafka/client.py test/test_client.py test/test_producer_integration.py test/test_protocol.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py219
1 files changed, 156 insertions, 63 deletions
diff --git a/kafka/client.py b/kafka/client.py
index a918091..8c78694 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,11 +7,11 @@ import logging
import time
import kafka.common
-from kafka.common import (TopicAndPartition,
+from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
- PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
- KafkaTimeoutError,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError)
+ KafkaTimeoutError, KafkaUnavailableError,
+ LeaderNotAvailableError, UnknownTopicOrPartitionError,
+ NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -37,8 +37,9 @@ class KafkaClient(object):
# create connections only when we need them
self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
- self.topics_to_brokers = {} # topic_id -> broker_id
- self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
+ self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
+ self.topic_partitions = {} # topic -> partition -> PartitionMetadata
+
self.load_metadata_for_topics() # bootstrap with all metadata
@@ -63,20 +64,37 @@ class KafkaClient(object):
Returns the leader for a partition or None if the partition exists
but has no leader.
- PartitionUnavailableError will be raised if the topic or partition
+ UnknownTopicOrPartitionError will be raised if the topic or partition
is not part of the metadata.
+
+ LeaderNotAvailableError is raised if server has metadata, but there is
+ no current leader
"""
key = TopicAndPartition(topic, partition)
- # reload metadata whether the partition is not available
- # or has no leader (broker is None)
- if self.topics_to_brokers.get(key) is None:
- self.load_metadata_for_topics(topic)
- if key not in self.topics_to_brokers:
- raise PartitionUnavailableError("%s not available" % str(key))
+ # Use cached metadata if it is there
+ if self.topics_to_brokers.get(key) is not None:
+ return self.topics_to_brokers[key]
+
+ # Otherwise refresh metadata
+
+ # If topic does not already exist, this will raise
+ # UnknownTopicOrPartitionError if not auto-creating
+ # LeaderNotAvailableError otherwise until partitions are created
+ self.load_metadata_for_topics(topic)
- return self.topics_to_brokers[key]
+ # If the partition doesn't actually exist, raise
+ if partition not in self.topic_partitions[topic]:
+ raise UnknownTopicOrPartitionError(key)
+
+ # If there's no leader for the partition, raise
+ meta = self.topic_partitions[topic][partition]
+ if meta.leader == -1:
+ raise LeaderNotAvailableError(meta)
+
+ # Otherwise return the BrokerMetadata
+ return self.brokers[meta.leader]
def _next_id(self):
"""
@@ -84,20 +102,26 @@ class KafkaClient(object):
"""
return next(KafkaClient.ID_GEN)
- def _send_broker_unaware_request(self, requestId, request):
+ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
+ requestId = self._next_id()
try:
conn = self._get_conn(host, port)
+ request = encoder_fn(client_id=self.client_id,
+ correlation_id=requestId,
+ payloads=payloads)
+
conn.send(requestId, request)
response = conn.recv(requestId)
- return response
+ return decoder_fn(response)
+
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (binascii.b2a_hex(request), host, port, e))
+ "trying next server: %s" % (requestId, host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
@@ -109,8 +133,8 @@ class KafkaClient(object):
Params
======
- payloads: list of object-like entities with a topic and
- partition attribute
+ payloads: list of object-like entities with a topic (str) and
+ partition (int) attribute
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
keyword arguments
@@ -130,10 +154,6 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
- if leader is None:
- raise LeaderUnavailableError(
- "Leader not available for topic %s partition %s" %
- (payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -195,6 +215,24 @@ class KafkaClient(object):
#################
# Public API #
#################
+ def close(self):
+ for conn in self.conns.values():
+ conn.close()
+
+ def copy(self):
+ """
+ Create an inactive copy of the client object
+ A reinit() has to be done on the copy before it can be used again
+ """
+ c = copy.deepcopy(self)
+ for key in c.conns:
+ c.conns[key] = self.conns[key].copy()
+ return c
+
+ def reinit(self):
+ for conn in self.conns.values():
+ conn.reinit()
+
def reset_topic_metadata(self, *topics):
for topic in topics:
try:
@@ -212,70 +250,125 @@ class KafkaClient(object):
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
- return topic in self.topic_partitions
+ return (
+ topic in self.topic_partitions
+ and len(self.topic_partitions[topic]) > 0
+ )
+
+ def get_partition_ids_for_topic(self, topic):
+ if topic not in self.topic_partitions:
+ return None
+
+ return list(self.topic_partitions[topic])
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
- self.load_metadata_for_topics(topic)
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
- self.load_metadata_for_topics(topic)
+ try:
+ self.load_metadata_for_topics(topic)
+ except LeaderNotAvailableError:
+ pass
+ except UnknownTopicOrPartitionError:
+ # Server is not configured to auto-create
+ # retrying in this case will not help
+ raise
time.sleep(.5)
- def close(self):
- for conn in self.conns.values():
- conn.close()
-
- def copy(self):
- """
- Create an inactive copy of the client object
- A reinit() has to be done on the copy before it can be used again
+ def load_metadata_for_topics(self, *topics):
"""
- c = copy.deepcopy(self)
- for key in c.conns:
- c.conns[key] = self.conns[key].copy()
- return c
+ Fetch broker and topic-partition metadata from the server,
+ and update internal data:
+ broker list, topic/partition list, and topic/parition -> broker map
- def reinit(self):
- for conn in self.conns.values():
- conn.reinit()
+ This method should be called after receiving any error
- def load_metadata_for_topics(self, *topics):
- """
- Discover brokers and metadata for a set of topics. This function is called
- lazily whenever metadata is unavailable.
- """
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
+ @param: *topics (optional)
+ If a list of topics is provided, the metadata refresh will be limited
+ to the specified topics only.
+
+ Exceptions:
+ ----------
+ If the broker is configured to not auto-create topics,
+ expect UnknownTopicOrPartitionError for topics that don't exist
- response = self._send_broker_unaware_request(request_id, request)
+ If the broker is configured to auto-create topics,
+ expect LeaderNotAvailableError for new topics
+ until partitions have been initialized.
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+ Exceptions *will not* be raised in a full refresh (i.e. no topic list)
+ In this case, error codes will be logged as errors
- log.debug("Broker metadata: %s", brokers)
- log.debug("Topic metadata: %s", topics)
+ Partition-level errors will also not be raised here
+ (a single partition w/o a leader, for example)
+ """
+ resp = self.send_metadata_request(topics)
+
+ log.debug("Broker metadata: %s", resp.brokers)
+ log.debug("Topic metadata: %s", resp.topics)
+
+ self.brokers = dict([(broker.nodeId, broker)
+ for broker in resp.brokers])
- self.brokers = brokers
+ for topic_metadata in resp.topics:
+ topic = topic_metadata.topic
+ partitions = topic_metadata.partitions
- for topic, partitions in topics.items():
self.reset_topic_metadata(topic)
- if not partitions:
- log.warning('No partitions for %s', topic)
+ # Errors expected for new topics
+ try:
+ kafka.common.check_error(topic_metadata)
+ except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
+
+ # Raise if the topic was passed in explicitly
+ if topic in topics:
+ raise
+
+ # Otherwise, just log a warning
+ log.error("Error loading topic metadata for %s: %s", topic, type(e))
continue
- self.topic_partitions[topic] = []
- for partition, meta in partitions.items():
- self.topic_partitions[topic].append(partition)
+ self.topic_partitions[topic] = {}
+ for partition_metadata in partitions:
+ partition = partition_metadata.partition
+ leader = partition_metadata.leader
+
+ self.topic_partitions[topic][partition] = partition_metadata
+
+ # Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
- if meta.leader == -1:
- log.warning('No leader for topic %s partition %s', topic, partition)
+
+ # Check for partition errors
+ try:
+ kafka.common.check_error(partition_metadata)
+
+ # If No Leader, topics_to_brokers topic_partition -> None
+ except LeaderNotAvailableError:
+ log.error('No leader for topic %s partition %d', topic, partition)
self.topics_to_brokers[topic_part] = None
+ continue
+
+ # If Known Broker, topic_partition -> BrokerMetadata
+ if leader in self.brokers:
+ self.topics_to_brokers[topic_part] = self.brokers[leader]
+
+ # If Unknown Broker, fake BrokerMetadata so we dont lose the id
+ # (not sure how this could happen. server could be in bad state)
else:
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ self.topics_to_brokers[topic_part] = BrokerMetadata(
+ leader, None, None
+ )
+
+ def send_metadata_request(self, payloads=[], fail_on_error=True,
+ callback=None):
+
+ encoder = KafkaProtocol.encode_metadata_request
+ decoder = KafkaProtocol.decode_metadata_response
+
+ return self._send_broker_unaware_request(payloads, encoder, decoder)
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):