diff options
author | Zack Dever <zdever@pandora.com> | 2016-03-17 12:17:03 -0700 |
---|---|---|
committer | Zack Dever <zdever@pandora.com> | 2016-03-17 15:39:28 -0700 |
commit | 4915942b07236ca28731dab2fab80c0e93c14bf6 (patch) | |
tree | a6d3dcab6253e9eb2d43713eab0b10181659f0cd | |
parent | ab03296b65b2031930a0f04d06502b156dd01657 (diff) | |
download | kafka-python-4915942b07236ca28731dab2fab80c0e93c14bf6.tar.gz |
catch all errors thrown by _get_leader_for_partition in SimpleClient
-rw-r--r-- | kafka/client.py | 3 | ||||
-rw-r--r-- | test/test_client.py | 6 |
2 files changed, 5 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py index 4b5a043..11f54eb 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -169,7 +169,8 @@ class SimpleClient(object): for payload in payloads: try: leader = self._get_leader_for_partition(payload.topic, payload.partition) - except KafkaUnavailableError: + except (KafkaUnavailableError, LeaderNotAvailableError, + UnknownTopicOrPartitionError): leader = None payloads_by_broker[leader].append(payload) return dict(payloads_by_broker) diff --git a/test/test_client.py b/test/test_client.py index 5a35c83..a53fce1 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -11,7 +11,7 @@ from kafka.common import ( BrokerMetadata, TopicPartition, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, - KafkaTimeoutError, ConnectionError + KafkaTimeoutError, ConnectionError, FailedPayloadsError ) from kafka.conn import KafkaConnection from kafka.future import Future @@ -361,7 +361,7 @@ class TestSimpleClient(unittest.TestCase): "topic_noleader", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(LeaderNotAvailableError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) @patch('kafka.SimpleClient._get_conn') @@ -386,7 +386,7 @@ class TestSimpleClient(unittest.TestCase): "topic_doesnt_exist", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(UnknownTopicOrPartitionError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) def test_timeout(self): |