summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py10
1 files changed, 3 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 810fa46..9018bb4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -236,13 +236,13 @@ class KafkaClient(object):
responses[topic_partition] = None
continue
else:
- connections_by_socket[conn.get_connected_socket()] = (conn, broker)
+ connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
conn = None
while connections_by_socket:
sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker = connections_by_socket.pop(rlist[0])
+ conn, broker, requestId = connections_by_socket.pop(rlist[0])
try:
response = conn.recv(requestId)
except ConnectionError as e:
@@ -607,11 +607,7 @@ class KafkaClient(object):
else:
decoder = KafkaProtocol.decode_produce_response
- try:
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
- except Exception:
- if fail_on_error:
- raise
+ resps = self._send_broker_aware_request(payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if resp is not None and