diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 43 |
1 files changed, 28 insertions, 15 deletions
diff --git a/kafka/client.py b/kafka/client.py index 4d79b41..38136af 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,9 +8,9 @@ import kafka.common from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, - PartitionUnavailableError, LeaderNotAvailableError, - KafkaUnavailableError, KafkaTimeoutError, - UnknownTopicOrPartitionError, NotLeaderForPartitionError) + KafkaTimeoutError, KafkaUnavailableError, + LeaderNotAvailableError, UnknownTopicOrPartitionError, + NotLeaderForPartitionError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -63,20 +63,37 @@ class KafkaClient(object): Returns the leader for a partition or None if the partition exists but has no leader. - PartitionUnavailableError will be raised if the topic or partition + UnknownTopicOrPartitionError will be raised if the topic or partition is not part of the metadata. + + LeaderNotAvailableError is raised if server has metadata, but there is + no current leader """ key = TopicAndPartition(topic, partition) - # reload metadata whether the partition is not available - # or has no leader (broker is None) - if self.topics_to_brokers.get(key) is None: - self.load_metadata_for_topics(topic) - if key not in self.topics_to_brokers: - raise PartitionUnavailableError("%s not available" % str(key)) + # Use cached metadata if it is there + if self.topics_to_brokers.get(key) is not None: + return self.topics_to_brokers[key] + + # Otherwise refresh metadata + + # If topic does not already exist, this will raise + # UnknownTopicOrPartitionError if not auto-creating + # LeaderNotAvailableError otherwise until partitions are created + self.load_metadata_for_topics(topic) + + # If the partition doesn't actually exist, raise + if partition not in self.topic_partitions[topic]: + raise UnknownTopicOrPartitionError(key) + + # If there's no leader for the partition, raise + meta = self.topic_partitions[topic][partition] + if meta.leader == -1: + raise LeaderNotAvailableError(meta) - return self.topics_to_brokers[key] + # Otherwise return the BrokerMetadata + return self.brokers[meta.leader] def _next_id(self): """ @@ -136,10 +153,6 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) - if leader is None: - raise LeaderNotAvailableError( - "Leader not available for topic %s partition %s" % - (payload.topic, payload.partition)) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) |