diff options
-rw-r--r-- | kafka/consumer.py | 2 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 4 |
2 files changed, 3 insertions, 3 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index d855874..3f8d8c2 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -164,7 +164,7 @@ class Consumer(object): if not self.auto_commit or self.auto_commit_every_n is None: return - if self.count_since_commit > self.auto_commit_every_n: + if self.count_since_commit >= self.auto_commit_every_n: self.commit() def stop(self): diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 63d2dda..9300021 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -228,13 +228,13 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ] self.assert_message_count(output_msgs1, 195) - # The offset should be at 180 + # The total offset across both partitions should be at 180 consumer2 = self.consumer( auto_commit_every_t = 600, auto_commit_every_n = 20, ) - # 180-200 + # 181-200 self.assert_message_count([ message for message in consumer2 ], 20) consumer1.stop() |