summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormrtheb <mrlabbe@gmail.com>2013-10-01 15:15:36 -0400
committerDavid Arthur <mumrah@gmail.com>2013-10-03 11:31:25 -0400
commitdeb584c90802d382ca376331a8bf31bfb12d94dc (patch)
treeb7ba890bda6d207d5a1d2f9c4940433c20056364
parentc304e3d4b080ce720e32abb820ab9120b8669d23 (diff)
downloadkafka-python-deb584c90802d382ca376331a8bf31bfb12d94dc.tar.gz
Cherry-pick mrtheb/kafka-python 2b016b69
Set FetchRequest MaxBytes value to bufsize instead of fetchsize (=MinBytes)
-rw-r--r--kafka/consumer.py5
-rw-r--r--kafka/util.py7
2 files changed, 8 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c338337..3efffdf 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -359,7 +359,10 @@ class SimpleConsumer(Consumer):
fetch_size = self.fetch_min_bytes
while True:
- req = FetchRequest(self.topic, partition, offset, fetch_size)
+ # use MaxBytes = client's bufsize since we're only
+ # fetching one topic + partition
+ req = FetchRequest(
+ self.topic, partition, offset, self.client.bufsize)
(resp,) = self.client.send_fetch_request([req],
max_wait_time=self.fetch_max_wait_time,
diff --git a/kafka/util.py b/kafka/util.py
index 259e285..598ba84 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -38,7 +38,8 @@ def read_short_string(data, cur):
def read_int_string(data, cur):
if len(data) < cur + 4:
- raise BufferUnderflowError("Not enough data left")
+ raise BufferUnderflowError(
+ "Not enough data left to read string len (%d < %d)" % (len(data), cur + 4))
(strLen,) = struct.unpack('>i', data[cur:cur + 4])
if strLen == -1:
@@ -46,7 +47,8 @@ def read_int_string(data, cur):
cur += 4
if len(data) < cur + strLen:
- raise BufferUnderflowError("Not enough data left")
+ raise BufferUnderflowError(
+ "Not enough data left to read string (%d < %d)" % (len(data), cur + strLen))
out = data[cur:cur + strLen]
return (out, cur + strLen)
@@ -68,7 +70,6 @@ def group_by_topic_and_partition(tuples):
return out
-
class ReentrantTimer(object):
"""
A timer that can be restarted, unlike threading.Timer