diff options
-rw-r--r-- | kafka/client.py | 3 | ||||
-rw-r--r-- | test/test_client.py | 6 |
2 files changed, 5 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py index 4b5a043..11f54eb 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -169,7 +169,8 @@ class SimpleClient(object): for payload in payloads: try: leader = self._get_leader_for_partition(payload.topic, payload.partition) - except KafkaUnavailableError: + except (KafkaUnavailableError, LeaderNotAvailableError, + UnknownTopicOrPartitionError): leader = None payloads_by_broker[leader].append(payload) return dict(payloads_by_broker) diff --git a/test/test_client.py b/test/test_client.py index 5a35c83..a53fce1 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -11,7 +11,7 @@ from kafka.common import ( BrokerMetadata, TopicPartition, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, - KafkaTimeoutError, ConnectionError + KafkaTimeoutError, ConnectionError, FailedPayloadsError ) from kafka.conn import KafkaConnection from kafka.future import Future @@ -361,7 +361,7 @@ class TestSimpleClient(unittest.TestCase): "topic_noleader", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(LeaderNotAvailableError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) @patch('kafka.SimpleClient._get_conn') @@ -386,7 +386,7 @@ class TestSimpleClient(unittest.TestCase): "topic_doesnt_exist", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(UnknownTopicOrPartitionError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) def test_timeout(self): |