diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 194a19c..14aebc6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,7 +5,7 @@ import struct from threading import local from kafka.common import BufferUnderflowError - +from kafka.common import ConnectionError log = logging.getLogger("kafka") @@ -27,6 +27,7 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(10) + self._dirty = False def __str__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) @@ -54,7 +55,7 @@ class KafkaConnection(local): # Read the size off of the header resp = self._sock.recv(4) if resp == "": - raise Exception("Got no response from Kafka") + self._raise_connection_error() (size,) = struct.unpack('>i', resp) messagesize = size - 4 @@ -72,6 +73,10 @@ class KafkaConnection(local): total += len(resp) yield resp + def _raise_connection_error(self): + self._dirty = True + raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + ################## # Public API # ################## @@ -80,14 +85,16 @@ class KafkaConnection(local): def send(self, request_id, payload): "Send a request to Kafka" - - log.debug( - "About to send %d bytes to Kafka, request %d" % - (len(payload), request_id)) - - sent = self._sock.sendall(payload) - if sent is not None: - raise RuntimeError("Kafka went away") + log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + try: + if self._dirty: + self.reinit() + sent = self._sock.sendall(payload) + if sent is not None: + self._raise_connection_error() + except socket.error: + log.exception('Unable to send payload to Kafka') + self._raise_connection_error() def recv(self, request_id): """ @@ -121,3 +128,4 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) self._sock.settimeout(10) + self._dirty = False |