summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2013-12-18 17:51:22 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-06 15:14:50 -0800
commit8c8ca5fa573c21e0f03c892154ba42e187153600 (patch)
tree5939b4442eb8e72c76f3f48bf9f874d8eb2a0aad /kafka
parent0f2b08d80217fb82860c51e05e819012f6acb521 (diff)
downloadkafka-python-8c8ca5fa573c21e0f03c892154ba42e187153600.tar.gz
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn, since they're not actually enforced Notes: This commit changes behavior a bit by raising a BufferUnderflowError when no data is received for the message size rather than a ConnectionError. Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, moving it there until a better solution is implemented.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py7
-rw-r--r--kafka/conn.py48
-rw-r--r--kafka/consumer.py7
3 files changed, 30 insertions, 32 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 9659364..bd3a214 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -19,13 +19,12 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10):
+ def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
# We need one connection to bootstrap
- self.bufsize = bufsize
self.client_id = client_id
self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, bufsize, timeout=timeout)
+ (host, port): KafkaConnection(host, port, timeout=timeout)
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
@@ -42,7 +41,7 @@ class KafkaClient(object):
"""
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout)
+ KafkaConnection(broker.host, broker.port, timeout=self.timeout)
return self.conns[(broker.host, broker.port)]
diff --git a/kafka/conn.py b/kafka/conn.py
index 1997804..ca62f52 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -19,11 +19,10 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
- def __init__(self, host, port, bufsize=4098, timeout=10):
+ def __init__(self, host, port, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(timeout)
@@ -36,38 +35,35 @@ class KafkaConnection(local):
# Private API #
###################
- def _consume_response(self):
- """
- Fully consume the response iterator
- """
- return "".join(self._consume_response_iter())
+ def _read_bytes(self, num_bytes):
+ bytes_left = num_bytes
+ resp = ''
+ log.debug("About to read %d bytes from Kafka", num_bytes)
+
+ while bytes_left:
+ data = self._sock.recv(bytes_left)
+ if data == '':
+ raise BufferUnderflowError("Not enough data to read this response")
+ bytes_left -= len(data)
+ log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+ resp += data
+
+ return resp
- def _consume_response_iter(self):
+ def _consume_response(self):
"""
This method handles the response header and error messages. It
- then returns an iterator for the chunks of the response
+ then returns the response
"""
- log.debug("Handling response from Kafka")
-
+ log.debug("Expecting response from Kafka")
# Read the size off of the header
- resp = self._sock.recv(4)
- if resp == "":
- self._raise_connection_error()
- (size,) = struct.unpack('>i', resp)
+ resp = self._read_bytes(4)
- log.debug("About to read %d bytes from Kafka", size)
+ (size,) = struct.unpack('>i', resp)
# Read the remainder of the response
- total = 0
- while total < size:
- resp = self._sock.recv(self.bufsize)
- log.debug("Read %d bytes from Kafka", len(resp))
- if resp == "":
- raise BufferUnderflowError(
- "Not enough data to read this response")
-
- total += len(resp)
- yield resp
+ resp = self._read_bytes(size)
+ return str(resp)
def _raise_connection_error(self):
self._dirty = True
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 57b5b97..bead1dd 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -22,6 +22,7 @@ AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
class FetchContext(object):
@@ -216,8 +217,10 @@ class SimpleConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES):
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES):
+ self.buffer_size = buffer_size
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
@@ -364,7 +367,7 @@ class SimpleConsumer(Consumer):
# use MaxBytes = client's bufsize since we're only
# fetching one topic + partition
req = FetchRequest(
- self.topic, partition, offset, self.client.bufsize)
+ self.topic, partition, offset, self.buffer_size)
(resp,) = self.client.send_fetch_request(
[req],