diff options
author | Jim Lim <jim@quixey.com> | 2013-09-27 14:02:10 -0700 |
---|---|---|
committer | Jim Lim <jim@quixey.com> | 2013-10-04 14:49:44 -0700 |
commit | a6c99b287b1cf9c39068be74d72150808588dd43 (patch) | |
tree | 0c036a3d0691c6c7f69166bdc373fbe4e79b2705 /kafka/conn.py | |
parent | cfd9f86e60429d1f7af8bcac5849808354b8719e (diff) | |
download | kafka-python-a6c99b287b1cf9c39068be74d72150808588dd43.tar.gz |
make changes to be more fault tolerant: clean up connections, brokers, failed_messages
- add integration tests for sync producer
- add integration tests for async producer w. leadership election
- use log.exception
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index e85fd11..9356731 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -4,7 +4,7 @@ import struct from threading import local from kafka.common import BufferUnderflowError - +from kafka.common import ConnectionError log = logging.getLogger("kafka") @@ -26,6 +26,7 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(10) + self._dirty = False def __str__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) @@ -53,7 +54,7 @@ class KafkaConnection(local): # Read the size off of the header resp = self._sock.recv(4) if resp == "": - raise Exception("Got no response from Kafka") + self._raise_connection_error() (size,) = struct.unpack('>i', resp) messagesize = size - 4 @@ -71,6 +72,10 @@ class KafkaConnection(local): total += len(resp) yield resp + def _raise_connection_error(self): + self._dirty = True + raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + ################## # Public API # ################## @@ -79,14 +84,16 @@ class KafkaConnection(local): def send(self, request_id, payload): "Send a request to Kafka" - - log.debug( - "About to send %d bytes to Kafka, request %d" % - (len(payload), request_id)) - - sent = self._sock.sendall(payload) - if sent is not None: - raise RuntimeError("Kafka went away") + log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + try: + if self._dirty: + self.reinit() + sent = self._sock.sendall(payload) + if sent is not None: + self._raise_connection_error() + except socket.error: + log.exception('Unable to send payload to Kafka') + self._raise_connection_error() def recv(self, request_id): """ @@ -110,3 +117,4 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) self._sock.settimeout(10) + self._dirty = False |