summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-10-07 14:16:13 -0700
committerDavid Arthur <mumrah@gmail.com>2013-10-07 14:16:13 -0700
commiteb2c1735f26ce11540fb92ea94817f43b9b3a798 (patch)
tree0c036a3d0691c6c7f69166bdc373fbe4e79b2705 /kafka/conn.py
parentcfd9f86e60429d1f7af8bcac5849808354b8719e (diff)
parenta6c99b287b1cf9c39068be74d72150808588dd43 (diff)
downloadkafka-python-eb2c1735f26ce11540fb92ea94817f43b9b3a798.tar.gz
Merge pull request #55 from quixey/fault-tolerance
Improve fault tolerance by handling leadership election and other metadata changes Thanks, @jimjh!
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py28
1 files changed, 18 insertions, 10 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index e85fd11..9356731 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -4,7 +4,7 @@ import struct
from threading import local
from kafka.common import BufferUnderflowError
-
+from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -26,6 +26,7 @@ class KafkaConnection(local):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
+ self._dirty = False
def __str__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -53,7 +54,7 @@ class KafkaConnection(local):
# Read the size off of the header
resp = self._sock.recv(4)
if resp == "":
- raise Exception("Got no response from Kafka")
+ self._raise_connection_error()
(size,) = struct.unpack('>i', resp)
messagesize = size - 4
@@ -71,6 +72,10 @@ class KafkaConnection(local):
total += len(resp)
yield resp
+ def _raise_connection_error(self):
+ self._dirty = True
+ raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
##################
# Public API #
##################
@@ -79,14 +84,16 @@ class KafkaConnection(local):
def send(self, request_id, payload):
"Send a request to Kafka"
-
- log.debug(
- "About to send %d bytes to Kafka, request %d" %
- (len(payload), request_id))
-
- sent = self._sock.sendall(payload)
- if sent is not None:
- raise RuntimeError("Kafka went away")
+ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+ try:
+ if self._dirty:
+ self.reinit()
+ sent = self._sock.sendall(payload)
+ if sent is not None:
+ self._raise_connection_error()
+ except socket.error:
+ log.exception('Unable to send payload to Kafka')
+ self._raise_connection_error()
def recv(self, request_id):
"""
@@ -110,3 +117,4 @@ class KafkaConnection(local):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((self.host, self.port))
self._sock.settimeout(10)
+ self._dirty = False