summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 17:03:46 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:04:10 -0700
commit945ecbcee7d2844ebbfa407b1542109fd8518cde (patch)
treeee996c536dd4458149f5c53257ed28b2fe5b1212 /kafka/client.py
parentb260b356b23802a595336c554d6ea044c9be0a79 (diff)
downloadkafka-python-945ecbcee7d2844ebbfa407b1542109fd8518cde.tar.gz
Use standard exceptions in client._get_leader_for_partition()
- drop custom PartitionUnavailable exception - raise UnknownTopicOrPartitionError or LeaderNotAvailableError - add tests for exception raises
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py43
1 files changed, 28 insertions, 15 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 4d79b41..38136af 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -8,9 +8,9 @@ import kafka.common
from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
- PartitionUnavailableError, LeaderNotAvailableError,
- KafkaUnavailableError, KafkaTimeoutError,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError)
+ KafkaTimeoutError, KafkaUnavailableError,
+ LeaderNotAvailableError, UnknownTopicOrPartitionError,
+ NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -63,20 +63,37 @@ class KafkaClient(object):
Returns the leader for a partition or None if the partition exists
but has no leader.
- PartitionUnavailableError will be raised if the topic or partition
+ UnknownTopicOrPartitionError will be raised if the topic or partition
is not part of the metadata.
+
+ LeaderNotAvailableError is raised if server has metadata, but there is
+ no current leader
"""
key = TopicAndPartition(topic, partition)
- # reload metadata whether the partition is not available
- # or has no leader (broker is None)
- if self.topics_to_brokers.get(key) is None:
- self.load_metadata_for_topics(topic)
- if key not in self.topics_to_brokers:
- raise PartitionUnavailableError("%s not available" % str(key))
+ # Use cached metadata if it is there
+ if self.topics_to_brokers.get(key) is not None:
+ return self.topics_to_brokers[key]
+
+ # Otherwise refresh metadata
+
+ # If topic does not already exist, this will raise
+ # UnknownTopicOrPartitionError if not auto-creating
+ # LeaderNotAvailableError otherwise until partitions are created
+ self.load_metadata_for_topics(topic)
+
+ # If the partition doesn't actually exist, raise
+ if partition not in self.topic_partitions[topic]:
+ raise UnknownTopicOrPartitionError(key)
+
+ # If there's no leader for the partition, raise
+ meta = self.topic_partitions[topic][partition]
+ if meta.leader == -1:
+ raise LeaderNotAvailableError(meta)
- return self.topics_to_brokers[key]
+ # Otherwise return the BrokerMetadata
+ return self.brokers[meta.leader]
def _next_id(self):
"""
@@ -136,10 +153,6 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
- if leader is None:
- raise LeaderNotAvailableError(
- "Leader not available for topic %s partition %s" %
- (payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))