diff options
author | David Arthur <mumrah@gmail.com> | 2013-06-01 00:08:16 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-06-01 00:10:47 -0400 |
commit | acd388e4eab100b5cf23481780f0d836ca0c21fd (patch) | |
tree | dd21e726de91356689c6063057199418b7b16f2c /kafka/consumer.py | |
parent | 2c257eeb1f02748840a8f4535d8d2a88ef5235f2 (diff) | |
download | kafka-python-issue-26.tar.gz |
Starting work on fixing offset commitsissue-26
* Update the "public" offset before yielding the message
* Add an option to SimpleConsumer.commit that excludes the current
offset
Ref #26
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index d09803a..4097fe3 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -140,12 +140,12 @@ class SimpleConsumer(object): """ Commit offsets as part of timer """ - self.commit() + self.commit(include_current_offset=False) # Once the commit is done, start the timer again self.commit_timer.start() - def commit(self, partitions=[]): + def commit(self, partitions=[], include_current_offset=True): """ Commit offsets for this consumer @@ -163,7 +163,10 @@ class SimpleConsumer(object): partitions = self.offsets.keys() for partition in partitions: - offset = self.offsets[partition] + if include_current_offset is True: + offset = self.offsets[partition] + 1 + else: + offset = self.offsets[partition] log.debug("Commit offset %d in SimpleConsumer: " "group=%s, topic=%s, partition=%s" % (offset, self.group, self.topic, partition)) @@ -227,9 +230,8 @@ class SimpleConsumer(object): next_offset = None for message in resp.messages: next_offset = message.offset - yield message - # update the internal state _after_ we yield the message self.offsets[partition] = message.offset + yield message if next_offset is None: break else: |