summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorOliver Jowett <oliver@mutability.co.uk>2015-07-19 10:57:37 +0000
committerOliver Jowett <oliver@mutability.co.uk>2015-07-19 10:57:37 +0000
commit718e5fb66da5dca449aa31d305b8867fba4f783c (patch)
tree2ca286dc827ea05295b58c1e3c36613a440b7e9c /kafka/client.py
parentadbd4ac052e4a5b40cfc2a3589b7adbcb656afe5 (diff)
downloadkafka-python-718e5fb66da5dca449aa31d305b8867fba4f783c.tar.gz
Treat KafkaUnavailableError like other errors.
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py15
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 817c621..dbd9863 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -162,11 +162,16 @@ class KafkaClient(object):
payloads_by_broker = collections.defaultdict(list)
for payload in payloads:
- leader = self._get_leader_for_partition(payload.topic,
- payload.partition)
-
- payloads_by_broker[leader].append(payload)
- brokers_for_payloads.append(leader)
+ try:
+ leader = self._get_leader_for_partition(payload.topic,
+ payload.partition)
+ payloads_by_broker[leader].append(payload)
+ brokers_for_payloads.append(leader)
+ except KafkaUnavailableError as e:
+ log.warning('KafkaUnavailableError attempting to send request '
+ 'on topic %s partition %d', payload.topic, payload.partition)
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsErrors(payload)
# For each broker, send the list of request payloads
# and collect the responses and errors