summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py11
1 files changed, 7 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 4c64cf2..fbc9f94 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -206,6 +206,8 @@ class SimpleConsumer(Consumer):
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
+ fetch_size_bytes: number of bytes to request in a FetchRequest
+
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
reset one another when one is triggered. These triggers simply call the
@@ -214,11 +216,12 @@ class SimpleConsumer(Consumer):
"""
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+ fetch_size_bytes=FETCH_MIN_BYTES):
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = FETCH_MIN_BYTES
+ self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
super(SimpleConsumer, self).__init__(client, group, topic,
@@ -243,6 +246,7 @@ class SimpleConsumer(Consumer):
1 is relative to the current offset
2 is relative to the latest known offset (tail)
"""
+
if whence == 1: # relative to current position
for partition, _offset in self.offsets.items():
self.offsets[partition] = _offset + offset
@@ -354,8 +358,7 @@ class SimpleConsumer(Consumer):
offset += 1
while True:
- # TODO: configure fetch size
- req = FetchRequest(self.topic, partition, offset, 1024)
+ req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)
(resp,) = self.client.send_fetch_request([req],
max_wait_time=self.fetch_max_wait_time,