summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-06-01 00:08:16 -0400
committerDavid Arthur <mumrah@gmail.com>2013-06-01 00:10:47 -0400
commitacd388e4eab100b5cf23481780f0d836ca0c21fd (patch)
treedd21e726de91356689c6063057199418b7b16f2c /kafka/consumer.py
parent2c257eeb1f02748840a8f4535d8d2a88ef5235f2 (diff)
downloadkafka-python-issue-26.tar.gz
Starting work on fixing offset commitsissue-26
* Update the "public" offset before yielding the message * Add an option to SimpleConsumer.commit that excludes the current offset Ref #26
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index d09803a..4097fe3 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -140,12 +140,12 @@ class SimpleConsumer(object):
"""
Commit offsets as part of timer
"""
- self.commit()
+ self.commit(include_current_offset=False)
# Once the commit is done, start the timer again
self.commit_timer.start()
- def commit(self, partitions=[]):
+ def commit(self, partitions=[], include_current_offset=True):
"""
Commit offsets for this consumer
@@ -163,7 +163,10 @@ class SimpleConsumer(object):
partitions = self.offsets.keys()
for partition in partitions:
- offset = self.offsets[partition]
+ if include_current_offset is True:
+ offset = self.offsets[partition] + 1
+ else:
+ offset = self.offsets[partition]
log.debug("Commit offset %d in SimpleConsumer: "
"group=%s, topic=%s, partition=%s" %
(offset, self.group, self.topic, partition))
@@ -227,9 +230,8 @@ class SimpleConsumer(object):
next_offset = None
for message in resp.messages:
next_offset = message.offset
- yield message
- # update the internal state _after_ we yield the message
self.offsets[partition] = message.offset
+ yield message
if next_offset is None:
break
else: