summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 17:03:46 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:04:10 -0700
commit945ecbcee7d2844ebbfa407b1542109fd8518cde (patch)
treeee996c536dd4458149f5c53257ed28b2fe5b1212
parentb260b356b23802a595336c554d6ea044c9be0a79 (diff)
downloadkafka-python-945ecbcee7d2844ebbfa407b1542109fd8518cde.tar.gz
Use standard exceptions in client._get_leader_for_partition()
- drop custom PartitionUnavailable exception - raise UnknownTopicOrPartitionError or LeaderNotAvailableError - add tests for exception raises
-rw-r--r--kafka/client.py43
-rw-r--r--kafka/common.py4
-rw-r--r--test/test_client.py26
-rw-r--r--test/test_consumer.py8
-rw-r--r--test/test_protocol.py4
5 files changed, 48 insertions, 37 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 4d79b41..38136af 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -8,9 +8,9 @@ import kafka.common
from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
- PartitionUnavailableError, LeaderNotAvailableError,
- KafkaUnavailableError, KafkaTimeoutError,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError)
+ KafkaTimeoutError, KafkaUnavailableError,
+ LeaderNotAvailableError, UnknownTopicOrPartitionError,
+ NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -63,20 +63,37 @@ class KafkaClient(object):
Returns the leader for a partition or None if the partition exists
but has no leader.
- PartitionUnavailableError will be raised if the topic or partition
+ UnknownTopicOrPartitionError will be raised if the topic or partition
is not part of the metadata.
+
+ LeaderNotAvailableError is raised if server has metadata, but there is
+ no current leader
"""
key = TopicAndPartition(topic, partition)
- # reload metadata whether the partition is not available
- # or has no leader (broker is None)
- if self.topics_to_brokers.get(key) is None:
- self.load_metadata_for_topics(topic)
- if key not in self.topics_to_brokers:
- raise PartitionUnavailableError("%s not available" % str(key))
+ # Use cached metadata if it is there
+ if self.topics_to_brokers.get(key) is not None:
+ return self.topics_to_brokers[key]
+
+ # Otherwise refresh metadata
+
+ # If topic does not already exist, this will raise
+ # UnknownTopicOrPartitionError if not auto-creating
+ # LeaderNotAvailableError otherwise until partitions are created
+ self.load_metadata_for_topics(topic)
+
+ # If the partition doesn't actually exist, raise
+ if partition not in self.topic_partitions[topic]:
+ raise UnknownTopicOrPartitionError(key)
+
+ # If there's no leader for the partition, raise
+ meta = self.topic_partitions[topic][partition]
+ if meta.leader == -1:
+ raise LeaderNotAvailableError(meta)
- return self.topics_to_brokers[key]
+ # Otherwise return the BrokerMetadata
+ return self.brokers[meta.leader]
def _next_id(self):
"""
@@ -136,10 +153,6 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
- if leader is None:
- raise LeaderNotAvailableError(
- "Leader not available for topic %s partition %s" %
- (payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
diff --git a/kafka/common.py b/kafka/common.py
index e8fa31e..008736c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -158,10 +158,6 @@ class KafkaTimeoutError(KafkaError):
pass
-class PartitionUnavailableError(KafkaError):
- pass
-
-
class FailedPayloadsError(KafkaError):
pass
diff --git a/test/test_client.py b/test/test_client.py
index 06eec75..7744ede 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -7,7 +7,7 @@ from kafka.common import (
ProduceRequest, MetadataResponse,
BrokerMetadata, TopicMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
- LeaderNotAvailableError, PartitionUnavailableError, NoError,
+ LeaderNotAvailableError, NoError,
UnknownTopicOrPartitionError
)
from kafka.protocol import create_message
@@ -191,7 +191,6 @@ class TestKafkaClient(unittest2.TestCase):
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_unassigned_partitions(self, protocol, conn):
- "Get leader raises if no partitions is defined for a topic"
conn.recv.return_value = 'response' # anything but None
@@ -201,7 +200,8 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_no_partitions', NO_ERROR, [])
+ TopicMetadata('topic_no_partitions', NO_LEADER, []),
+ TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -209,13 +209,15 @@ class TestKafkaClient(unittest2.TestCase):
self.assertDictEqual({}, client.topics_to_brokers)
- with self.assertRaises(PartitionUnavailableError):
+ with self.assertRaises(LeaderNotAvailableError):
client._get_leader_for_partition('topic_no_partitions', 0)
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client._get_leader_for_partition('topic_unknown', 0)
+
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
- def test_get_leader_returns_none_when_noleader(self, protocol, conn):
- "Getting leader for partitions returns None when the partiion has no leader"
+ def test_get_leader_exceptions_when_noleader(self, protocol, conn):
conn.recv.return_value = 'response' # anything but None
@@ -241,8 +243,16 @@ class TestKafkaClient(unittest2.TestCase):
TopicAndPartition('topic_noleader', 1): None
},
client.topics_to_brokers)
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
+ # No leader partitions -- raise LeaderNotAvailableError
+ with self.assertRaises(LeaderNotAvailableError):
+ self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
+ with self.assertRaises(LeaderNotAvailableError):
+ self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
+ # Unknown partitions -- raise UnknownTopicOrPartitionError
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))
topics = [
TopicMetadata('topic_noleader', NO_ERROR, [
diff --git a/test/test_consumer.py b/test/test_consumer.py
index f0b4e53..d5b4fdd 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -5,15 +5,7 @@ import unittest2
from mock import MagicMock, patch
-from kafka import KafkaClient
from kafka.consumer import SimpleConsumer
-from kafka.common import (
- ProduceRequest, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, PartitionUnavailableError
-)
-from kafka.protocol import (
- create_message, KafkaProtocol
-)
class TestKafkaConsumer(unittest2.TestCase):
def test_non_integer_partitions(self):
diff --git a/test/test_protocol.py b/test/test_protocol.py
index a92d20e..dc2411d 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -13,8 +13,8 @@ from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ProduceResponse, FetchResponse, OffsetAndMessage,
BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
- KafkaUnavailableError, PartitionUnavailableError,
- UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError,
+ KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
+ ProtocolError
)
from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,