summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authormrtheb <mrlabbe@gmail.com>2014-01-31 22:43:59 -0500
committermrtheb <mrlabbe@gmail.com>2014-01-31 22:43:59 -0500
commit84de472a4d5b583ff3ed6cc6d92250a7c9291ceb (patch)
treee3d03da4eeecf8eab2dc63cf113a4daf82addf72 /kafka/conn.py
parent0bdff4e833f73518a7219fca04dfbc3ed201b06e (diff)
parent4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff)
downloadkafka-python-84de472a4d5b583ff3ed6cc6d92250a7c9291ceb.tar.gz
Merge branch 'master' into multihosts
Conflicts: kafka/client.py kafka/conn.py setup.py test/test_integration.py test/test_unit.py
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py93
1 files changed, 45 insertions, 48 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 614b1bb..de2d385 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,11 +5,11 @@ import struct
from random import shuffle
from threading import local
-from kafka.common import BufferUnderflowError
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
+DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
def collect_hosts(hosts, randomize=True):
"""
@@ -39,64 +39,53 @@ class KafkaConnection(local):
by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
+
+ host: the host name or IP address of a kafka broker
+ port: the port number the kafka broker is listening on
+ timeout: default 120. The socket timeout for sending and receiving data
+ in seconds. None means no timeout, so a request can block forever.
"""
- def __init__(self, host, port, bufsize=4096, timeout=10):
+ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self.bufsize = bufsize
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.connect((host, port))
self.timeout = timeout
-
- self._sock = socket.create_connection((host, port), timeout=timeout)
+ self._sock.settimeout(self.timeout)
self._dirty = False
- def __str__(self):
+ def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
###################
# Private API #
###################
- def _consume_response(self):
- """
- Fully consumer the response iterator
- """
- data = ""
- for chunk in self._consume_response_iter():
- data += chunk
- return data
-
- def _consume_response_iter(self):
- """
- This method handles the response header and error messages. It
- then returns an iterator for the chunks of the response
- """
- log.debug("Handling response from Kafka")
-
- # Read the size off of the header
- resp = self._sock.recv(4)
- if resp == "":
- self._raise_connection_error()
- (size,) = struct.unpack('>i', resp)
-
- messagesize = size - 4
- log.debug("About to read %d bytes from Kafka", messagesize)
-
- # Read the remainder of the response
- total = 0
- while total < messagesize:
- resp = self._sock.recv(self.bufsize)
- log.debug("Read %d bytes from Kafka", len(resp))
- if resp == "":
- raise BufferUnderflowError(
- "Not enough data to read this response")
-
- total += len(resp)
- yield resp
-
def _raise_connection_error(self):
self._dirty = True
- raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+ raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
+
+ def _read_bytes(self, num_bytes):
+ bytes_left = num_bytes
+ resp = ''
+ log.debug("About to read %d bytes from Kafka", num_bytes)
+ if self._dirty:
+ self.reinit()
+ while bytes_left:
+ try:
+ data = self._sock.recv(bytes_left)
+ except socket.error:
+ log.exception('Unable to receive data from Kafka')
+ self._raise_connection_error()
+ if data == '':
+ log.error("Not enough data to read this response")
+ self._raise_connection_error()
+ bytes_left -= len(data)
+ log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+ resp += data
+
+ return resp
##################
# Public API #
@@ -113,7 +102,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error:
+ except socket.error, e:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
@@ -122,8 +111,14 @@ class KafkaConnection(local):
Get a response from Kafka
"""
log.debug("Reading response %d from Kafka" % request_id)
- self.data = self._consume_response()
- return self.data
+ # Read the size off of the header
+ resp = self._read_bytes(4)
+
+ (size,) = struct.unpack('>i', resp)
+
+ # Read the remainder of the response
+ resp = self._read_bytes(size)
+ return str(resp)
def copy(self):
"""
@@ -146,5 +141,7 @@ class KafkaConnection(local):
Re-initialize the socket connection
"""
self.close()
- self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.connect((self.host, self.port))
+ self._sock.settimeout(self.timeout)
self._dirty = False