diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index ba0b996..805476e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -55,7 +55,6 @@ class KafkaConnection(local): self.host = host self.port = port self.timeout = timeout - self._dirty = None self._sock = None self.reinit() @@ -68,7 +67,11 @@ class KafkaConnection(local): ################### def _raise_connection_error(self): - self._dirty = True + # Cleanup socket if we have one + if self._sock: + self.close() + + # And then raise raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) def _read_bytes(self, num_bytes): @@ -78,7 +81,7 @@ class KafkaConnection(local): log.debug("About to read %d bytes from Kafka", num_bytes) # Make sure we have a connection - if self._dirty or not self._sock: + if not self._sock: self.reinit() while bytes_left: @@ -110,7 +113,7 @@ class KafkaConnection(local): log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) # Make sure we have a connection - if self._dirty or not self._sock: + if not self._sock: self.reinit() try: @@ -158,10 +161,12 @@ class KafkaConnection(local): Re-initialize the socket connection """ log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port)) - self.close() + + if self._sock: + self.close() + try: self._sock = socket.create_connection((self.host, self.port), self.timeout) - self._dirty = False except socket.error: log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port)) self._raise_connection_error() |