diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-10-07 09:51:32 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-10-07 09:51:32 -0700 |
commit | e99a934bab1d551d07dd0c6365f6a730028489f3 (patch) | |
tree | 231ada631af2b7c9cf36576a7005fd28b88ff890 /kafka/client.py | |
parent | b525e1a8d63e4fcb0ede43c05739bc84c85cc79c (diff) | |
parent | 77e5180a377197c8157a19d5603ad2653c238aa3 (diff) | |
download | kafka-python-e99a934bab1d551d07dd0c6365f6a730028489f3.tar.gz |
Merge pull request #436 from mutability/async-catch-unavailable-error
Catch KafkaUnavailableError in _send_broker_aware_request
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py index d8d77e4..13777a4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -161,16 +161,21 @@ class KafkaClient(object): brokers_for_payloads = [] payloads_by_broker = collections.defaultdict(list) + responses = {} for payload in payloads: - leader = self._get_leader_for_partition(payload.topic, - payload.partition) - - payloads_by_broker[leader].append(payload) - brokers_for_payloads.append(leader) + try: + leader = self._get_leader_for_partition(payload.topic, + payload.partition) + payloads_by_broker[leader].append(payload) + brokers_for_payloads.append(leader) + except KafkaUnavailableError as e: + log.warning('KafkaUnavailableError attempting to send request ' + 'on topic %s partition %d', payload.topic, payload.partition) + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) # For each broker, send the list of request payloads # and collect the responses and errors - responses = {} broker_failures = [] for broker, payloads in payloads_by_broker.items(): requestId = self._next_id() |