summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py24
1 files changed, 18 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py
index b7ceb2e..71ededa 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -3,9 +3,11 @@ from collections import defaultdict
from functools import partial
from itertools import count
import logging
+import socket
import time
from kafka.common import ErrorMapping, TopicAndPartition
+from kafka.common import ConnectionError, FailedPayloadsException
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -71,7 +73,7 @@ class KafkaClient(object):
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
- self.brokers.update(brokers)
+ self.brokers = brokers
self.topics_to_brokers = {}
for topic, partitions in topics.items():
@@ -147,13 +149,15 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
-
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
# Accumulate the responses in a dictionary
acc = {}
+ # keep a list of payloads that were failed to be sent to brokers
+ failed_payloads = []
+
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
conn = self._get_conn_for_broker(broker)
@@ -162,15 +166,23 @@ class KafkaClient(object):
correlation_id=requestId, payloads=payloads)
# Send the request, recv the response
- conn.send(requestId, request)
-
- if decoder_fn is None:
+ try:
+ conn.send(requestId, request)
+ if decoder_fn is None:
+ continue
+ response = conn.recv(requestId)
+ except ConnectionError, e: # ignore BufferUnderflow for now
+ log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ failed_payloads += payloads
+ self.topics_to_brokers = {} # reset metadata
continue
- response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
+ if failed_payloads:
+ raise FailedPayloadsException(failed_payloads)
+
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()