diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py index b7ceb2e..71ededa 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -3,9 +3,11 @@ from collections import defaultdict from functools import partial from itertools import count import logging +import socket import time from kafka.common import ErrorMapping, TopicAndPartition +from kafka.common import ConnectionError, FailedPayloadsException from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -71,7 +73,7 @@ class KafkaClient(object): log.debug("Broker metadata: %s", brokers) log.debug("Topic metadata: %s", topics) - self.brokers.update(brokers) + self.brokers = brokers self.topics_to_brokers = {} for topic, partitions in topics.items(): @@ -147,13 +149,15 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) - payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) # Accumulate the responses in a dictionary acc = {} + # keep a list of payloads that were failed to be sent to brokers + failed_payloads = [] + # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): conn = self._get_conn_for_broker(broker) @@ -162,15 +166,23 @@ class KafkaClient(object): correlation_id=requestId, payloads=payloads) # Send the request, recv the response - conn.send(requestId, request) - - if decoder_fn is None: + try: + conn.send(requestId, request) + if decoder_fn is None: + continue + response = conn.recv(requestId) + except ConnectionError, e: # ignore BufferUnderflow for now + log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + failed_payloads += payloads + self.topics_to_brokers = {} # reset metadata continue - response = conn.recv(requestId) for response in decoder_fn(response): acc[(response.topic, response.partition)] = response + if failed_payloads: + raise FailedPayloadsException(failed_payloads) + # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () |