summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorZack Dever <zack.dever@rd.io>2015-12-07 13:37:30 -0800
committerZack Dever <zack.dever@rd.io>2015-12-07 13:37:30 -0800
commit753d8dca136178a4c2ecb0cda8d4ec371805455f (patch)
tree83225fb95731551cbb9c5a5aeb6fb08a3ec9f0ad /kafka/client.py
parentefc3d4f466c0d6630c9fff09fb1b90035c5351d7 (diff)
parenta678260d3622a0decd2d123ac0cfc445084eed60 (diff)
downloadkafka-python-753d8dca136178a4c2ecb0cda8d4ec371805455f.tar.gz
Merge branch 'master' into 0.9
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py10
1 files changed, 3 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 810fa46..9018bb4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -236,13 +236,13 @@ class KafkaClient(object):
responses[topic_partition] = None
continue
else:
- connections_by_socket[conn.get_connected_socket()] = (conn, broker)
+ connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
conn = None
while connections_by_socket:
sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker = connections_by_socket.pop(rlist[0])
+ conn, broker, requestId = connections_by_socket.pop(rlist[0])
try:
response = conn.recv(requestId)
except ConnectionError as e:
@@ -607,11 +607,7 @@ class KafkaClient(object):
else:
decoder = KafkaProtocol.decode_produce_response
- try:
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
- except Exception:
- if fail_on_error:
- raise
+ resps = self._send_broker_aware_request(payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if resp is not None and