summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorMark Roberts <markroberts@kixeye.com>2014-02-25 01:06:17 -0800
committerMark Roberts <markroberts@kixeye.com>2014-02-25 11:26:06 -0800
commit9732ed1670ef0739956900df37c0c77699628ec7 (patch)
tree641572cdbd91dcaf60cd42723b46bb2c6f626d32 /kafka/conn.py
parentee7e86ea712de0a0390e64752c5cf9180c1681b5 (diff)
downloadkafka-python-9732ed1670ef0739956900df37c0c77699628ec7.tar.gz
Minor refactor in conn.py, update version in __init__.py, add ErrorString
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b8f1c2..cc946fc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -28,11 +28,10 @@ class KafkaConnection(local):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((host, port))
self.timeout = timeout
- self._sock.settimeout(self.timeout)
- self._dirty = False
+ self._sock = None
+
+ self.reinit()
def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -47,24 +46,28 @@ class KafkaConnection(local):
def _read_bytes(self, num_bytes):
bytes_left = num_bytes
- resp = ''
+ responses = []
+
log.debug("About to read %d bytes from Kafka", num_bytes)
if self._dirty:
self.reinit()
+
while bytes_left:
try:
- data = self._sock.recv(bytes_left)
+ data = self._sock.recv(min(bytes_left, 4096))
except socket.error:
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()
+
if data == '':
log.error("Not enough data to read this response")
self._raise_connection_error()
+
bytes_left -= len(data)
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
- resp += data
+ responses.append(data)
- return resp
+ return ''.join(responses)
##################
# Public API #