diff options
author | mrtheb <mrlabbe@gmail.com> | 2014-02-15 11:00:35 -0500 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2014-02-15 11:00:35 -0500 |
commit | 5e5d7097a70e79d90d573aef6a0adba40ada5f03 (patch) | |
tree | ce12705aaa9ad1bd77d2a65709cd2b43d0fd86e0 | |
parent | b253166bec5a7d836767523b1ad5275eeed6b83f (diff) | |
download | kafka-python-5e5d7097a70e79d90d573aef6a0adba40ada5f03.tar.gz |
Changes based on comments by @rdiomar, plus added LeaderUnavailableError for clarity
-rw-r--r-- | kafka/client.py | 11 | ||||
-rw-r--r-- | kafka/common.py | 4 | ||||
-rw-r--r-- | test/test_unit.py | 8 |
3 files changed, 15 insertions, 8 deletions
diff --git a/kafka/client.py b/kafka/client.py index fbbff25..c3606e4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,6 +8,7 @@ from itertools import count from kafka.common import (ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, + LeaderUnavailableError, KafkaUnavailableError) from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS @@ -124,8 +125,10 @@ class KafkaClient(object): leader = self._get_leader_for_partition(payload.topic, payload.partition) if leader is None: - raise PartitionUnavailableError( - "No leader for topic %s partition %s" % (payload.topic, payload.partition)) + raise LeaderUnavailableError( + "Leader not available for topic %s partition %s" % + (payload.topic, payload.partition)) + payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -250,7 +253,7 @@ class KafkaClient(object): self.reset_topic_metadata(topic) if not partitions: - log.info('No partitions for %s', topic) + log.warning('No partitions for %s', topic) continue self.topic_partitions[topic] = [] @@ -258,7 +261,7 @@ class KafkaClient(object): self.topic_partitions[topic].append(partition) topic_part = TopicAndPartition(topic, partition) if meta.leader == -1: - log.info('No leader for topic %s partition %s', topic, partition) + log.warning('No leader for topic %s partition %s', topic, partition) self.topics_to_brokers[topic_part] = None else: self.topics_to_brokers[topic_part] = brokers[meta.leader] diff --git a/kafka/common.py b/kafka/common.py index ec0b89b..b4fe5c7 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -82,6 +82,10 @@ class BrokerResponseError(KafkaError): pass +class LeaderUnavailableError(KafkaError): + pass + + class PartitionUnavailableError(KafkaError): pass diff --git a/test/test_unit.py b/test/test_unit.py index f0edd16..1439d8b 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -7,7 +7,7 @@ from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata, - TopicAndPartition, PartitionUnavailableError + TopicAndPartition, LeaderUnavailableError, PartitionUnavailableError ) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, @@ -386,7 +386,7 @@ class TestProtocol(unittest.TestCase): pass -class TestClient(unittest.TestCase): +class TestKafkaClient(unittest.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') @@ -520,7 +520,7 @@ class TestClient(unittest.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_send_produce_request_raises_when_noleader(self, protocol, conn): - "Getting leader for partitions returns None when the partiion has no leader" + "Send producer request raises LeaderUnavailableError if leader is not available" conn.recv.return_value = 'response' # anything but None @@ -542,7 +542,7 @@ class TestClient(unittest.TestCase): [create_message("a"), create_message("b")])] self.assertRaises( - PartitionUnavailableError, + LeaderUnavailableError, client.send_produce_request, requests) if __name__ == '__main__': |