diff options
-rw-r--r-- | kafka/conn.py | 76 |
1 files changed, 65 insertions, 11 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 0602d70..fee44c4 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,6 +1,8 @@ +from collections import deque import copy import logging from random import shuffle +from select import select import socket import struct from threading import local @@ -24,26 +26,78 @@ class BrokerConnection(local): self.host = host self.port = port self.timeout = timeout - self._sock = socket.create_connection((host, port), timeout) - self.fd = self._sock.makefile(mode='+') + self._write_fd = None + self._read_fd = None self.correlation_id = 0 + self.in_flight_requests = deque() + + def connect(self): + if self.connected(): + self.close() + try: + sock = socket.create_connection((self.host, self.port), self.timeout) + self._write_fd = sock.makefile('wb') + self._read_fd = sock.makefile('rb') + except socket.error as e: + log.exception("Error in BrokerConnection.connect()") + return None + self.in_flight_requests.clear() + return True + + def connected(self): + return (self._read_fd is not None and self._write_fd is not None) def close(self): - self.fd.close() - self._sock.close() + if self.connected(): + try: + self._read_fd.close() + self._write_fd.close() + except socket.error as e: + log.exception("Error in BrokerConnection.close()") + pass + self._read_fd = None + self._write_fd = None + self.in_flight_requests.clear() def send(self, request): + if not self.connected() and not self.connect(): + return None self.correlation_id += 1 header = RequestHeader(request, correlation_id=self.correlation_id) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) - self.fd.write(size) - self.fd.write(message) - self.fd.flush() - - size = Int32.decode(self.fd) - correlation_id = Int32.decode(self.fd) - return request.RESPONSE_TYPE.decode(self.fd) + try: + self._write_fd.write(size) + self._write_fd.write(message) + self._write_fd.flush() + except socket.error as e: + log.exception("Error in BrokerConnection.send()") + self.close() + return None + self.in_flight_requests.append((self.correlation_id, request.RESPONSE_TYPE)) + return self.correlation_id + + def recv(self, timeout=None): + if not self.connected(): + return None + readable, _, _ = select([self._read_fd], [], [], timeout) + if not readable: + return None + correlation_id, response_type = self.in_flight_requests.popleft() + # Current implementation does not use size + # instead we read directly from the socket fd buffer + # alternatively, we could read size bytes into a separate buffer + # and decode from that buffer (and verify buffer is empty afterwards) + size = Int32.decode(self._read_fd) + recv_correlation_id = Int32.decode(self._read_fd) + assert correlation_id == recv_correlation_id + try: + response = response_type.decode(self._read_fd) + except socket.error as e: + log.exception("Error in BrokerConnection.recv()") + self.close() + return None + return response def __getnewargs__(self): return (self.host, self.port, self.timeout) |