summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py76
1 files changed, 34 insertions, 42 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 1a3e260..c80f428 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -4,7 +4,6 @@ import socket
import struct
from threading import local
-from kafka.common import BufferUnderflowError
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -19,14 +18,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
- def __init__(self, host, port, bufsize=4096):
+ def __init__(self, host, port, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
- self._sock.settimeout(10)
+ self.timeout = timeout
+ self._sock.settimeout(self.timeout)
self._dirty = False
def __str__(self):
@@ -36,44 +35,31 @@ class KafkaConnection(local):
# Private API #
###################
- def _consume_response(self):
- """
- Fully consume the response iterator
- """
- return "".join(self._consume_response_iter())
-
- def _consume_response_iter(self):
- """
- This method handles the response header and error messages. It
- then returns an iterator for the chunks of the response
- """
- log.debug("Handling response from Kafka")
-
- # Read the size off of the header
- resp = self._sock.recv(4)
- if resp == "":
- self._raise_connection_error()
- (size,) = struct.unpack('>i', resp)
-
- messagesize = size - 4
- log.debug("About to read %d bytes from Kafka", messagesize)
-
- # Read the remainder of the response
- total = 0
- while total < messagesize:
- resp = self._sock.recv(self.bufsize)
- log.debug("Read %d bytes from Kafka", len(resp))
- if resp == "":
- raise BufferUnderflowError(
- "Not enough data to read this response")
-
- total += len(resp)
- yield resp
-
def _raise_connection_error(self):
self._dirty = True
raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+ def _read_bytes(self, num_bytes):
+ bytes_left = num_bytes
+ resp = ''
+ log.debug("About to read %d bytes from Kafka", num_bytes)
+ if self._dirty:
+ self.reinit()
+ while bytes_left:
+ try:
+ data = self._sock.recv(bytes_left)
+ except socket.error:
+ log.exception('Unable to receive data from Kafka')
+ self._raise_connection_error()
+ if data == '':
+ log.error("Not enough data to read this response")
+ self._raise_connection_error()
+ bytes_left -= len(data)
+ log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+ resp += data
+
+ return resp
+
##################
# Public API #
##################
@@ -89,7 +75,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error:
+ except socket.error, e:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
@@ -98,8 +84,14 @@ class KafkaConnection(local):
Get a response from Kafka
"""
log.debug("Reading response %d from Kafka" % request_id)
- self.data = self._consume_response()
- return self.data
+ # Read the size off of the header
+ resp = self._read_bytes(4)
+
+ (size,) = struct.unpack('>i', resp)
+
+ # Read the remainder of the response
+ resp = self._read_bytes(size)
+ return str(resp)
def copy(self):
"""
@@ -124,5 +116,5 @@ class KafkaConnection(local):
self.close()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((self.host, self.port))
- self._sock.settimeout(10)
+ self._sock.settimeout(self.timeout)
self._dirty = False