diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 85 |
1 files changed, 85 insertions, 0 deletions
"""Blocking socket connection to a single Kafka broker (kafka/conn.py)."""

import logging
import socket
import struct

log = logging.getLogger("kafka")


class BufferUnderflowError(Exception):
    """
    Raised when the broker closes the connection before a complete
    response has been read.

    NOTE(review): if the project already defines an exception with this
    name in another module, import that one here instead so callers that
    catch it keep working.
    """


class KafkaConnection(object):
    """
    A socket connection to a single Kafka broker

    This class is _not_ thread safe. Each call to `send` must be followed
    by a call to `recv` in order to get the correct response. Eventually,
    we can do something in here to facilitate multiplexed requests/responses
    since the Kafka API includes a correlation id.
    """

    def __init__(self, host, port, bufsize=4096, timeout=10):
        """
        Open a TCP connection to the broker at (host, port).

        bufsize -- maximum number of bytes requested per `recv` call while
                   reading a response body
        timeout -- socket timeout in seconds; applied before connecting so
                   that the connection attempt itself is bounded as well
        """
        self.host = host
        self.port = port
        self.bufsize = bufsize
        # Last response read by `send`; None until the first request.
        self.data = None
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.settimeout(timeout)
        try:
            self._sock.connect((host, port))
        except Exception:
            # Do not leak the file descriptor when the broker is unreachable.
            self._sock.close()
            raise

    def __str__(self):
        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)

    ###################
    #   Private API   #
    ###################

    def _consume_response(self):
        """
        Fully consume the response iterator and return the body as one
        byte string
        """
        return b"".join(self._consume_response_iter())

    def _consume_response_iter(self):
        """
        This method handles the response header and error messages. It
        then returns an iterator for the chunks of the response

        Raises Exception if the connection is closed before any header
        byte arrives, and BufferUnderflowError if it is closed part-way
        through the header or the body.
        """
        log.debug("Handling response from Kafka")

        # Read the size off of the header. A stream socket may hand back
        # fewer bytes than requested, so keep reading until all 4 arrive.
        header = b""
        while len(header) < 4:
            piece = self._sock.recv(4 - len(header))
            if not piece:
                if not header:
                    raise Exception("Got no response from Kafka")
                raise BufferUnderflowError("Not enough data to read this response")
            header += piece
        (size,) = struct.unpack('>i', header)

        # NOTE(review): 4 bytes are subtracted from the length prefix and
        # the reads below are not capped at the remaining byte count, so a
        # single recv may return more than messageSize bytes. Confirm the
        # prefix semantics against the Kafka wire-protocol documentation
        # before tightening either of these.
        messageSize = size - 4
        log.debug("About to read %d bytes from Kafka", messageSize)

        # Read the remainder of the response
        total = 0
        while total < messageSize:
            resp = self._sock.recv(self.bufsize)
            log.debug("Read %d bytes from Kafka", len(resp))
            # An empty read means the peer closed the connection; the
            # truthiness test works for both str and bytes.
            if not resp:
                raise BufferUnderflowError("Not enough data to read this response")
            total += len(resp)
            yield resp

    ##################
    #   Public API   #
    ##################

    # TODO multiplex socket communication to allow for multi-threaded clients

    def send(self, requestId, payload):
        """
        Send a request to Kafka and read its response

        The response is stored on the connection and handed out by `recv`.
        `sendall` either transmits the whole payload or raises a socket
        error, so there is no return value to inspect. requestId is
        accepted for interface symmetry and is currently unused.
        """
        self._sock.sendall(payload)
        self.data = self._consume_response()

    def recv(self, requestId):
        """
        Get a response from Kafka

        Returns the response read by the most recent `send`, or None if
        no request has been sent yet. requestId is currently unused.
        """
        return self.data

    def close(self):
        "Close this connection"
        self._sock.close()