summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- kafka/consumer/kafka.py 11
-rw-r--r-- test/test_consumer_integration.py 43
2 files changed, 48 insertions, 6 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 43e8c55..705c70d 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -485,12 +485,11 @@ class KafkaConsumer(object):
offset, prev_done)
# Warn on smaller offsets than previous commit
- # "commit" offsets are actually the offset of the next # message to fetch.
- # so task_done should be compared with (commit - 1)
- prev_done = (self._offsets.commit[topic_partition] - 1)
- if prev_done is not None and (offset <= prev_done):
- logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
- offset, prev_done)
+ # "commit" offsets are actually the offset of the next message to fetch.
+ prev_commit = self._offsets.commit[topic_partition]
+ if prev_commit is not None and ((offset + 1) <= prev_commit):
+ logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
+ offset, prev_commit)
self._offsets.task_done[topic_partition] = offset
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c97081b..1d28f8e 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -353,3 +353,46 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, 1)
+
+ @kafka_versions("0.8.1", "0.8.1.1")
+ def test_kafka_consumer__offset_commit_resume(self):
+ GROUP_ID = random_string(10)
+
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ auto_offset_reset='smallest',
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ for _ in xrange(195):
+ m = consumer1.next()
+ output_msgs1.append(m)
+ consumer1.task_done(m)
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ consumer_timeout_ms = 100,
+ auto_offset_reset='smallest',
+ )
+
+ # 181-200
+ output_msgs2 = []
+ with self.assertRaises(ConsumerTimeout):
+ while True:
+ m = consumer2.next()
+ output_msgs2.append(m)
+ self.assert_message_count(output_msgs2, 20)
+ self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)