diff options
author | mrtheb <mrlabbe@gmail.com> | 2013-10-01 15:15:36 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-10-03 11:31:25 -0400 |
commit | deb584c90802d382ca376331a8bf31bfb12d94dc (patch) | |
tree | b7ba890bda6d207d5a1d2f9c4940433c20056364 | |
parent | c304e3d4b080ce720e32abb820ab9120b8669d23 (diff) | |
download | kafka-python-deb584c90802d382ca376331a8bf31bfb12d94dc.tar.gz |
Cherry-pick mrtheb/kafka-python 2b016b69
Set FetchRequest MaxBytes value to bufsize instead of fetchsize (=MinBytes)
-rw-r--r-- | kafka/consumer.py | 5 | ||||
-rw-r--r-- | kafka/util.py | 7 |
2 files changed, 8 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c338337..3efffdf 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -359,7 +359,10 @@ class SimpleConsumer(Consumer): fetch_size = self.fetch_min_bytes while True: - req = FetchRequest(self.topic, partition, offset, fetch_size) + # use MaxBytes = client's bufsize since we're only + # fetching one topic + partition + req = FetchRequest( + self.topic, partition, offset, self.client.bufsize) (resp,) = self.client.send_fetch_request([req], max_wait_time=self.fetch_max_wait_time, diff --git a/kafka/util.py b/kafka/util.py index 259e285..598ba84 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -38,7 +38,8 @@ def read_short_string(data, cur): def read_int_string(data, cur): if len(data) < cur + 4: - raise BufferUnderflowError("Not enough data left") + raise BufferUnderflowError( + "Not enough data left to read string len (%d < %d)" % (len(data), cur + 4)) (strLen,) = struct.unpack('>i', data[cur:cur + 4]) if strLen == -1: @@ -46,7 +47,8 @@ def read_int_string(data, cur): cur += 4 if len(data) < cur + strLen: - raise BufferUnderflowError("Not enough data left") + raise BufferUnderflowError( + "Not enough data left to read string (%d < %d)" % (len(data), cur + strLen)) out = data[cur:cur + strLen] return (out, cur + strLen) @@ -68,7 +70,6 @@ def group_by_topic_and_partition(tuples): return out - class ReentrantTimer(object): """ A timer that can be restarted, unlike threading.Timer |