summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2014-01-15 14:28:48 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-15 16:26:15 -0800
commit8cc36dd7a1c7691e5c26b47cb667bc48054594a0 (patch)
treee8244038c2394748bfac12799b23bf7548d9bc94
parentc36cb618d2ba5fcba118dd8b87bf51f26074c8f1 (diff)
downloadkafka-python-8cc36dd7a1c7691e5c26b47cb667bc48054594a0.tar.gz
Store fetched offsets separately.
Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall or if _fetch() is called multiple times for some reason. We don't want to re-fetch messages that are already in our queue, so store the offsets of the last enqueued messages from each partition.
-rw-r--r--kafka/consumer.py24
1 files changed, 14 insertions, 10 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 657024f..b202b23 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -235,6 +235,12 @@ class SimpleConsumer(Consumer):
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None):
+ super(SimpleConsumer, self).__init__(
+ client, group, topic,
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
if max_buffer_size is not None and buffer_size > max_buffer_size:
raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +251,10 @@ class SimpleConsumer(Consumer):
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
- self.fetch_started = defaultdict(bool) # defaults to false
+ self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.queue = Queue()
- super(SimpleConsumer, self).__init__(
- client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
-
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -301,6 +300,10 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ # Reset queue and fetch offsets since they are invalid
+ self.fetch_offsets = self.offsets.copy()
+ self.queue = Queue()
+
def get_messages(self, count=1, block=True, timeout=0.1):
"""
Fetch the specified number of messages
@@ -375,11 +378,11 @@ class SimpleConsumer(Consumer):
def _fetch(self):
# Create fetch request payloads for all the partitions
requests = []
- partitions = self.offsets.keys()
+ partitions = self.fetch_offsets.keys()
while partitions:
for partition in partitions:
requests.append(FetchRequest(self.topic, partition,
- self.offsets[partition],
+ self.fetch_offsets[partition],
self.buffer_size))
# Send request
responses = self.client.send_fetch_request(
@@ -394,6 +397,7 @@ class SimpleConsumer(Consumer):
for message in resp.messages:
# Put the message in our queue
self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
except ConsumerFetchSizeTooSmall, e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):