summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-10-07 09:51:32 -0700
committerDana Powers <dana.powers@gmail.com>2015-10-07 09:51:32 -0700
commite99a934bab1d551d07dd0c6365f6a730028489f3 (patch)
tree231ada631af2b7c9cf36576a7005fd28b88ff890 /kafka/client.py
parentb525e1a8d63e4fcb0ede43c05739bc84c85cc79c (diff)
parent77e5180a377197c8157a19d5603ad2653c238aa3 (diff)
downloadkafka-python-e99a934bab1d551d07dd0c6365f6a730028489f3.tar.gz
Merge pull request #436 from mutability/async-catch-unavailable-error
Catch KafkaUnavailableError in _send_broker_aware_request
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py17
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py
index d8d77e4..13777a4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,16 +161,21 @@ class KafkaClient(object):
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
+ responses = {}
for payload in payloads:
- leader = self._get_leader_for_partition(payload.topic,
- payload.partition)
-
- payloads_by_broker[leader].append(payload)
- brokers_for_payloads.append(leader)
+ try:
+ leader = self._get_leader_for_partition(payload.topic,
+ payload.partition)
+ payloads_by_broker[leader].append(payload)
+ brokers_for_payloads.append(leader)
+ except KafkaUnavailableError as e:
+ log.warning('KafkaUnavailableError attempting to send request '
+ 'on topic %s partition %d', payload.topic, payload.partition)
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
# For each broker, send the list of request payloads
# and collect the responses and errors
- responses = {}
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()