diff options
author | mrtheb <mrlabbe@gmail.com> | 2014-01-31 22:43:59 -0500 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2014-01-31 22:43:59 -0500 |
commit | 84de472a4d5b583ff3ed6cc6d92250a7c9291ceb (patch) | |
tree | e3d03da4eeecf8eab2dc63cf113a4daf82addf72 /kafka/conn.py | |
parent | 0bdff4e833f73518a7219fca04dfbc3ed201b06e (diff) | |
parent | 4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff) | |
download | kafka-python-84de472a4d5b583ff3ed6cc6d92250a7c9291ceb.tar.gz |
Merge branch 'master' into multihosts
Conflicts:
kafka/client.py
kafka/conn.py
setup.py
test/test_integration.py
test/test_unit.py
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 93 |
1 files changed, 45 insertions, 48 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 614b1bb..de2d385 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,11 +5,11 @@ import struct from random import shuffle from threading import local -from kafka.common import BufferUnderflowError from kafka.common import ConnectionError log = logging.getLogger("kafka") +DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 def collect_hosts(hosts, randomize=True): """ @@ -39,64 +39,53 @@ class KafkaConnection(local): by a call to `recv` in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. + + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. """ - def __init__(self, host, port, bufsize=4096, timeout=10): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() self.host = host self.port = port - self.bufsize = bufsize + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((host, port)) self.timeout = timeout - - self._sock = socket.create_connection((host, port), timeout=timeout) + self._sock.settimeout(self.timeout) self._dirty = False - def __str__(self): + def __repr__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) ################### # Private API # ################### - def _consume_response(self): - """ - Fully consumer the response iterator - """ - data = "" - for chunk in self._consume_response_iter(): - data += chunk - return data - - def _consume_response_iter(self): - """ - This method handles the response header and error messages. It - then returns an iterator for the chunks of the response - """ - log.debug("Handling response from Kafka") - - # Read the size off of the header - resp = self._sock.recv(4) - if resp == "": - self._raise_connection_error() - (size,) = struct.unpack('>i', resp) - - messagesize = size - 4 - log.debug("About to read %d bytes from Kafka", messagesize) - - # Read the remainder of the response - total = 0 - while total < messagesize: - resp = self._sock.recv(self.bufsize) - log.debug("Read %d bytes from Kafka", len(resp)) - if resp == "": - raise BufferUnderflowError( - "Not enough data to read this response") - - total += len(resp) - yield resp - def _raise_connection_error(self): self._dirty = True - raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) + + def _read_bytes(self, num_bytes): + bytes_left = num_bytes + resp = '' + log.debug("About to read %d bytes from Kafka", num_bytes) + if self._dirty: + self.reinit() + while bytes_left: + try: + data = self._sock.recv(bytes_left) + except socket.error: + log.exception('Unable to receive data from Kafka') + self._raise_connection_error() + if data == '': + log.error("Not enough data to read this response") + self._raise_connection_error() + bytes_left -= len(data) + log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) + resp += data + + return resp ################## # Public API # @@ -113,7 +102,7 @@ class KafkaConnection(local): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error: + except socket.error, e: log.exception('Unable to send payload to Kafka') self._raise_connection_error() @@ -122,8 +111,14 @@ class KafkaConnection(local): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) - self.data = self._consume_response() - return self.data + # Read the size off of the header + resp = self._read_bytes(4) + + (size,) = struct.unpack('>i', resp) + + # Read the remainder of the response + resp = self._read_bytes(size) + return str(resp) def copy(self): """ @@ -146,5 +141,7 @@ class KafkaConnection(local): Re-initialize the socket connection """ self.close() - self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout) + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((self.host, self.port)) + self._sock.settimeout(self.timeout) self._dirty = False |