summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py3
-rw-r--r--test/test_client.py6
2 files changed, 5 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 4b5a043..11f54eb 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -169,7 +169,8 @@ class SimpleClient(object):
for payload in payloads:
try:
leader = self._get_leader_for_partition(payload.topic, payload.partition)
- except KafkaUnavailableError:
+ except (KafkaUnavailableError, LeaderNotAvailableError,
+ UnknownTopicOrPartitionError):
leader = None
payloads_by_broker[leader].append(payload)
return dict(payloads_by_broker)
diff --git a/test/test_client.py b/test/test_client.py
index 5a35c83..a53fce1 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -11,7 +11,7 @@ from kafka.common import (
BrokerMetadata,
TopicPartition, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
- KafkaTimeoutError, ConnectionError
+ KafkaTimeoutError, ConnectionError, FailedPayloadsError
)
from kafka.conn import KafkaConnection
from kafka.future import Future
@@ -361,7 +361,7 @@ class TestSimpleClient(unittest.TestCase):
"topic_noleader", 0,
[create_message("a"), create_message("b")])]
- with self.assertRaises(LeaderNotAvailableError):
+ with self.assertRaises(FailedPayloadsError):
client.send_produce_request(requests)
@patch('kafka.SimpleClient._get_conn')
@@ -386,7 +386,7 @@ class TestSimpleClient(unittest.TestCase):
"topic_doesnt_exist", 0,
[create_message("a"), create_message("b")])]
- with self.assertRaises(UnknownTopicOrPartitionError):
+ with self.assertRaises(FailedPayloadsError):
client.send_produce_request(requests)
def test_timeout(self):