diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-01 16:21:08 -0800 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:40 -0800 |
commit | 7a6c51bf2e0a926ffe2595f008c68c6b63db2ce7 (patch) | |
tree | c24437f9facf21e2a66a2f6ecb7520460374b483 | |
parent | 254c17e39fb8790957da792acdd7e435551a9ac6 (diff) | |
download | kafka-python-7a6c51bf2e0a926ffe2595f008c68c6b63db2ce7.tar.gz |
Add size and correlation id decoding to try/except block in BrokerConnection
-rw-r--r-- | kafka/conn.py | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index fee44c4..ab44073 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -88,12 +88,13 @@ class BrokerConnection(local): # instead we read directly from the socket fd buffer # alternatively, we could read size bytes into a separate buffer # and decode from that buffer (and verify buffer is empty afterwards) - size = Int32.decode(self._read_fd) - recv_correlation_id = Int32.decode(self._read_fd) - assert correlation_id == recv_correlation_id try: + size = Int32.decode(self._read_fd) + recv_correlation_id = Int32.decode(self._read_fd) + if correlation_id != recv_correlation_id: + raise RuntimeError('Correlation ids do not match!') response = response_type.decode(self._read_fd) - except socket.error as e: + except (RuntimeError, socket.error) as e: log.exception("Error in BrokerConnection.recv()") self.close() return None |