diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 17:03:46 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:04:10 -0700 |
commit | 945ecbcee7d2844ebbfa407b1542109fd8518cde (patch) | |
tree | ee996c536dd4458149f5c53257ed28b2fe5b1212 | |
parent | b260b356b23802a595336c554d6ea044c9be0a79 (diff) | |
download | kafka-python-945ecbcee7d2844ebbfa407b1542109fd8518cde.tar.gz |
Use standard exceptions in client._get_leader_for_partition()
- drop custom PartitionUnavailable exception
- raise UnknownTopicOrPartitionError or LeaderNotAvailableError
- add tests for exception raises
-rw-r--r-- | kafka/client.py | 43 | ||||
-rw-r--r-- | kafka/common.py | 4 | ||||
-rw-r--r-- | test/test_client.py | 26 | ||||
-rw-r--r-- | test/test_consumer.py | 8 | ||||
-rw-r--r-- | test/test_protocol.py | 4 |
5 files changed, 48 insertions, 37 deletions
diff --git a/kafka/client.py b/kafka/client.py index 4d79b41..38136af 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,9 +8,9 @@ import kafka.common from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, - PartitionUnavailableError, LeaderNotAvailableError, - KafkaUnavailableError, KafkaTimeoutError, - UnknownTopicOrPartitionError, NotLeaderForPartitionError) + KafkaTimeoutError, KafkaUnavailableError, + LeaderNotAvailableError, UnknownTopicOrPartitionError, + NotLeaderForPartitionError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -63,20 +63,37 @@ class KafkaClient(object): Returns the leader for a partition or None if the partition exists but has no leader. - PartitionUnavailableError will be raised if the topic or partition + UnknownTopicOrPartitionError will be raised if the topic or partition is not part of the metadata. + + LeaderNotAvailableError is raised if server has metadata, but there is + no current leader """ key = TopicAndPartition(topic, partition) - # reload metadata whether the partition is not available - # or has no leader (broker is None) - if self.topics_to_brokers.get(key) is None: - self.load_metadata_for_topics(topic) - if key not in self.topics_to_brokers: - raise PartitionUnavailableError("%s not available" % str(key)) + # Use cached metadata if it is there + if self.topics_to_brokers.get(key) is not None: + return self.topics_to_brokers[key] + + # Otherwise refresh metadata + + # If topic does not already exist, this will raise + # UnknownTopicOrPartitionError if not auto-creating + # LeaderNotAvailableError otherwise until partitions are created + self.load_metadata_for_topics(topic) + + # If the partition doesn't actually exist, raise + if partition not in self.topic_partitions[topic]: + raise UnknownTopicOrPartitionError(key) + + # If there's no leader for the partition, raise + meta = self.topic_partitions[topic][partition] + if meta.leader == -1: + raise LeaderNotAvailableError(meta) - return self.topics_to_brokers[key] + # Otherwise return the BrokerMetadata + return self.brokers[meta.leader] def _next_id(self): """ @@ -136,10 +153,6 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) - if leader is None: - raise LeaderNotAvailableError( - "Leader not available for topic %s partition %s" % - (payload.topic, payload.partition)) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) diff --git a/kafka/common.py b/kafka/common.py index e8fa31e..008736c 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -158,10 +158,6 @@ class KafkaTimeoutError(KafkaError): pass -class PartitionUnavailableError(KafkaError): - pass - - class FailedPayloadsError(KafkaError): pass diff --git a/test/test_client.py b/test/test_client.py index 06eec75..7744ede 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -7,7 +7,7 @@ from kafka.common import ( ProduceRequest, MetadataResponse, BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - LeaderNotAvailableError, PartitionUnavailableError, NoError, + LeaderNotAvailableError, NoError, UnknownTopicOrPartitionError ) from kafka.protocol import create_message @@ -191,7 +191,6 @@ class TestKafkaClient(unittest2.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_get_leader_for_unassigned_partitions(self, protocol, conn): - "Get leader raises if no partitions is defined for a topic" conn.recv.return_value = 'response' # anything but None @@ -201,7 +200,8 @@ class TestKafkaClient(unittest2.TestCase): ] topics = [ - TopicMetadata('topic_no_partitions', NO_ERROR, []) + TopicMetadata('topic_no_partitions', NO_LEADER, []), + TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -209,13 +209,15 @@ class TestKafkaClient(unittest2.TestCase): self.assertDictEqual({}, client.topics_to_brokers) - with self.assertRaises(PartitionUnavailableError): + with self.assertRaises(LeaderNotAvailableError): client._get_leader_for_partition('topic_no_partitions', 0) + with self.assertRaises(UnknownTopicOrPartitionError): + client._get_leader_for_partition('topic_unknown', 0) + @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') - def test_get_leader_returns_none_when_noleader(self, protocol, conn): - "Getting leader for partitions returns None when the partiion has no leader" + def test_get_leader_exceptions_when_noleader(self, protocol, conn): conn.recv.return_value = 'response' # anything but None @@ -241,8 +243,16 @@ class TestKafkaClient(unittest2.TestCase): TopicAndPartition('topic_noleader', 1): None }, client.topics_to_brokers) - self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) - self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) + + # No leader partitions -- raise LeaderNotAvailableError + with self.assertRaises(LeaderNotAvailableError): + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) + with self.assertRaises(LeaderNotAvailableError): + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) + + # Unknown partitions -- raise UnknownTopicOrPartitionError + with self.assertRaises(UnknownTopicOrPartitionError): + self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2)) topics = [ TopicMetadata('topic_noleader', NO_ERROR, [ diff --git a/test/test_consumer.py b/test/test_consumer.py index f0b4e53..d5b4fdd 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -5,15 +5,7 @@ import unittest2 from mock import MagicMock, patch -from kafka import KafkaClient from kafka.consumer import SimpleConsumer -from kafka.common import ( - ProduceRequest, BrokerMetadata, PartitionMetadata, - TopicAndPartition, PartitionUnavailableError -) -from kafka.protocol import ( - create_message, KafkaProtocol -) class TestKafkaConsumer(unittest2.TestCase): def test_non_integer_partitions(self): diff --git a/test/test_protocol.py b/test/test_protocol.py index a92d20e..dc2411d 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -13,8 +13,8 @@ from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, - KafkaUnavailableError, PartitionUnavailableError, - UnsupportedCodecError, ConsumerFetchSizeTooSmall, ProtocolError, + KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall, + ProtocolError ) from kafka.codec import ( has_snappy, gzip_encode, gzip_decode, |