diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 4c64cf2..fbc9f94 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -206,6 +206,8 @@ class SimpleConsumer(Consumer): auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit + fetch_size_bytes: number of bytes to request in a FetchRequest + Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the @@ -214,11 +216,12 @@ class SimpleConsumer(Consumer): """ def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL): + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + fetch_size_bytes=FETCH_MIN_BYTES): self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.fetch_min_bytes = FETCH_MIN_BYTES + self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false super(SimpleConsumer, self).__init__(client, group, topic, @@ -243,6 +246,7 @@ class SimpleConsumer(Consumer): 1 is relative to the current offset 2 is relative to the latest known offset (tail) """ + if whence == 1: # relative to current position for partition, _offset in self.offsets.items(): self.offsets[partition] = _offset + offset @@ -354,8 +358,7 @@ class SimpleConsumer(Consumer): offset += 1 while True: - # TODO: configure fetch size - req = FetchRequest(self.topic, partition, offset, 1024) + req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes) (resp,) = self.client.send_fetch_request([req], max_wait_time=self.fetch_max_wait_time, |