diff options
author | Mark Roberts <markroberts@kixeye.com> | 2014-02-25 01:06:17 -0800 |
---|---|---|
committer | Mark Roberts <markroberts@kixeye.com> | 2014-02-25 11:26:06 -0800 |
commit | 9732ed1670ef0739956900df37c0c77699628ec7 (patch) | |
tree | 641572cdbd91dcaf60cd42723b46bb2c6f626d32 /kafka/conn.py | |
parent | ee7e86ea712de0a0390e64752c5cf9180c1681b5 (diff) | |
download | kafka-python-9732ed1670ef0739956900df37c0c77699628ec7.tar.gz |
Minor refactor in conn.py, update version in __init__.py, add ErrorString
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 2b8f1c2..cc946fc 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -28,11 +28,10 @@ class KafkaConnection(local): super(KafkaConnection, self).__init__() self.host = host self.port = port - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) self.timeout = timeout - self._sock.settimeout(self.timeout) - self._dirty = False + self._sock = None + + self.reinit() def __repr__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) @@ -47,24 +46,28 @@ class KafkaConnection(local): def _read_bytes(self, num_bytes): bytes_left = num_bytes - resp = '' + responses = [] + log.debug("About to read %d bytes from Kafka", num_bytes) if self._dirty: self.reinit() + while bytes_left: try: - data = self._sock.recv(bytes_left) + data = self._sock.recv(min(bytes_left, 4096)) except socket.error: log.exception('Unable to receive data from Kafka') self._raise_connection_error() + if data == '': log.error("Not enough data to read this response") self._raise_connection_error() + bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) - resp += data + responses.append(data) - return resp + return ''.join(responses) ################## # Public API # |