diff options
-rw-r--r-- | kafka/consumer/kafka.py | 11 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 43 |
2 files changed, 48 insertions, 6 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 43e8c55..705c70d 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -485,12 +485,11 @@ class KafkaConsumer(object): offset, prev_done) # Warn on smaller offsets than previous commit - # "commit" offsets are actually the offset of the next # message to fetch. - # so task_done should be compared with (commit - 1) - prev_done = (self._offsets.commit[topic_partition] - 1) - if prev_done is not None and (offset <= prev_done): - logger.warning('Marking task_done on a previously committed offset?: %d <= %d', - offset, prev_done) + # "commit" offsets are actually the offset of the next message to fetch. + prev_commit = self._offsets.commit[topic_partition] + if prev_commit is not None and ((offset + 1) <= prev_commit): + logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d', + offset, prev_commit) self._offsets.task_done[topic_partition] = offset diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index c97081b..1d28f8e 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -353,3 +353,46 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages.add((msg.partition, msg.offset)) self.assertEqual(len(messages), 5) self.assertGreaterEqual(t.interval, 1) + + @kafka_versions("0.8.1", "0.8.1.1") + def test_kafka_consumer__offset_commit_resume(self): + GROUP_ID = random_string(10) + + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Start a consumer + consumer1 = self.kafka_consumer( + group_id = GROUP_ID, + auto_commit_enable = True, + auto_commit_interval_ms = None, + auto_commit_interval_messages = 20, + auto_offset_reset='smallest', + ) + + # Grab the first 195 messages + output_msgs1 = [] + for _ in xrange(195): + m = consumer1.next() + output_msgs1.append(m) + consumer1.task_done(m) + self.assert_message_count(output_msgs1, 195) + + # The total offset across both partitions should be at 180 + consumer2 = self.kafka_consumer( + group_id = GROUP_ID, + auto_commit_enable = True, + auto_commit_interval_ms = None, + auto_commit_interval_messages = 20, + consumer_timeout_ms = 100, + auto_offset_reset='smallest', + ) + + # 181-200 + output_msgs2 = [] + with self.assertRaises(ConsumerTimeout): + while True: + m = consumer2.next() + output_msgs2.append(m) + self.assert_message_count(output_msgs2, 20) + self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15) |