diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 78 |
1 files changed, 60 insertions, 18 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index b5dafd8..0d17cb8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -67,7 +67,11 @@ class KafkaConnection(local): ################### def _raise_connection_error(self): - self._dirty = True + # Cleanup socket if we have one + if self._sock: + self.close() + + # And then raise raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) def _read_bytes(self, num_bytes): @@ -75,20 +79,26 @@ class KafkaConnection(local): responses = [] log.debug("About to read %d bytes from Kafka", num_bytes) - if self._dirty: + + # Make sure we have a connection + if not self._sock: self.reinit() while bytes_left: + try: data = self._sock.recv(min(bytes_left, 4096)) + + # Receiving empty string from recv signals + # that the socket is in error. we will never get + # more data from this socket + if data == '': + raise socket.error('Not enough data to read message -- did server kill socket?') + except socket.error: log.exception('Unable to receive data from Kafka') self._raise_connection_error() - if data == '': - log.error("Not enough data to read this response") - self._raise_connection_error() - bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) responses.append(data) @@ -102,26 +112,34 @@ class KafkaConnection(local): # TODO multiplex socket communication to allow for multi-threaded clients def send(self, request_id, payload): - "Send a request to Kafka" + """ + Send a request to Kafka + param: request_id -- can be any int (used only for debug logging...) + param: payload -- an encoded kafka packet (see KafkaProtocol) + """ + log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + + # Make sure we have a connection + if not self._sock: + self.reinit() + try: - if self._dirty: - self.reinit() - sent = self._sock.sendall(payload) - if sent is not None: - self._raise_connection_error() + self._sock.sendall(payload) except socket.error: log.exception('Unable to send payload to Kafka') self._raise_connection_error() def recv(self, request_id): """ - Get a response from Kafka + Get a response packet from Kafka + param: request_id -- can be any int (only used for debug logging...) + returns encoded kafka packet response from server as type str """ log.debug("Reading response %d from Kafka" % request_id) + # Read the size off of the header resp = self._read_bytes(4) - (size,) = struct.unpack('>i', resp) # Read the remainder of the response @@ -132,6 +150,7 @@ class KafkaConnection(local): """ Create an inactive copy of the connection object A reinit() has to be done on the copy before it can be used again + return a new KafkaConnection object """ c = copy.deepcopy(self) c._sock = None @@ -139,15 +158,38 @@ class KafkaConnection(local): def close(self): """ - Close this connection + Shutdown and close the connection socket """ + log.debug("Closing socket connection for %s:%d" % (self.host, self.port)) if self._sock: + # Call shutdown to be a good TCP client + # But expect an error if the socket has already been + # closed by the server + try: + self._sock.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + + # Closing the socket should always succeed self._sock.close() + self._sock = None + else: + log.debug("No socket found to close!") def reinit(self): """ Re-initialize the socket connection + close current socket (if open) + and start a fresh connection + raise ConnectionError on error """ - self.close() - self._sock = socket.create_connection((self.host, self.port), self.timeout) - self._dirty = False + log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port)) + + if self._sock: + self.close() + + try: + self._sock = socket.create_connection((self.host, self.port), self.timeout) + except socket.error: + log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port)) + self._raise_connection_error() |