summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2014-01-03 15:26:00 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-06 15:14:50 -0800
commitd1e4fd25c66f9fa7d955694c55e8b51c5da3a565 (patch)
treeb651fc4b59a32c8376222e3399b2e67acf20fdaa /kafka/conn.py
parent81d001bfa2b6936dbefd8515204c2d51a7f299f8 (diff)
downloadkafka-python-d1e4fd25c66f9fa7d955694c55e8b51c5da3a565.tar.gz
Raise a ConnectionError when a socket.error is raised when receiving data
Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py18
1 files changed, 11 insertions, 7 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index ca62f52..ff823d2 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -35,13 +35,21 @@ class KafkaConnection(local):
# Private API #
###################
+ def _raise_connection_error(self):
+ self._dirty = True
+ raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
def _read_bytes(self, num_bytes):
bytes_left = num_bytes
resp = ''
log.debug("About to read %d bytes from Kafka", num_bytes)
while bytes_left:
- data = self._sock.recv(bytes_left)
+ try:
+ data = self._sock.recv(bytes_left)
+ except socket.error, e:
+ log.error('Unable to receive data from Kafka: %s', e)
+ self._raise_connection_error()
if data == '':
raise BufferUnderflowError("Not enough data to read this response")
bytes_left -= len(data)
@@ -65,10 +73,6 @@ class KafkaConnection(local):
resp = self._read_bytes(size)
return str(resp)
- def _raise_connection_error(self):
- self._dirty = True
- raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
-
##################
# Public API #
##################
@@ -84,8 +88,8 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error:
- log.exception('Unable to send payload to Kafka')
+ except socket.error, e:
+ log.error('Unable to send payload to Kafka: %s', e)
self._raise_connection_error()
def recv(self, request_id):