summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-08-19 22:29:42 -0700
committerDana Powers <dana.powers@rd.io>2014-08-19 22:29:42 -0700
commit472f287778ada947e76eba9519ad7af0bbd30fa0 (patch)
tree138a355bd249aa5c4b84f70cbb883657bcc801bb /kafka/conn.py
parent24936881b7db48bab52f2a6129ce31c5136529ee (diff)
downloadkafka-python-472f287778ada947e76eba9519ad7af0bbd30fa0.tar.gz
Remove self._dirty and check self._sock instead in kafka.conn
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py17
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index ba0b996..805476e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -55,7 +55,6 @@ class KafkaConnection(local):
self.host = host
self.port = port
self.timeout = timeout
- self._dirty = None
self._sock = None
self.reinit()
@@ -68,7 +67,11 @@ class KafkaConnection(local):
###################
def _raise_connection_error(self):
- self._dirty = True
+ # Cleanup socket if we have one
+ if self._sock:
+ self.close()
+
+ # And then raise
raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
def _read_bytes(self, num_bytes):
@@ -78,7 +81,7 @@ class KafkaConnection(local):
log.debug("About to read %d bytes from Kafka", num_bytes)
# Make sure we have a connection
- if self._dirty or not self._sock:
+ if not self._sock:
self.reinit()
while bytes_left:
@@ -110,7 +113,7 @@ class KafkaConnection(local):
log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
# Make sure we have a connection
- if self._dirty or not self._sock:
+ if not self._sock:
self.reinit()
try:
@@ -158,10 +161,12 @@ class KafkaConnection(local):
Re-initialize the socket connection
"""
log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port))
- self.close()
+
+ if self._sock:
+ self.close()
+
try:
self._sock = socket.create_connection((self.host, self.port), self.timeout)
- self._dirty = False
except socket.error:
log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
self._raise_connection_error()