summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-01 16:21:08 -0800
committerZack Dever <zack.dever@rd.io>2015-12-04 11:25:40 -0800
commit7a6c51bf2e0a926ffe2595f008c68c6b63db2ce7 (patch)
treec24437f9facf21e2a66a2f6ecb7520460374b483
parent254c17e39fb8790957da792acdd7e435551a9ac6 (diff)
downloadkafka-python-7a6c51bf2e0a926ffe2595f008c68c6b63db2ce7.tar.gz
Add size and correlation id decoding to try/except block in BrokerConnection
-rw-r--r--kafka/conn.py9
1 files changed, 5 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index fee44c4..ab44073 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -88,12 +88,13 @@ class BrokerConnection(local):
# instead we read directly from the socket fd buffer
# alternatively, we could read size bytes into a separate buffer
# and decode from that buffer (and verify buffer is empty afterwards)
- size = Int32.decode(self._read_fd)
- recv_correlation_id = Int32.decode(self._read_fd)
- assert correlation_id == recv_correlation_id
try:
+ size = Int32.decode(self._read_fd)
+ recv_correlation_id = Int32.decode(self._read_fd)
+ if correlation_id != recv_correlation_id:
+ raise RuntimeError('Correlation ids do not match!')
response = response_type.decode(self._read_fd)
- except socket.error as e:
+ except (RuntimeError, socket.error) as e:
log.exception("Error in BrokerConnection.recv()")
self.close()
return None