diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 717eac5..2762008 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -104,16 +104,28 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_simple_consumer_pending(self): + # make sure that we start with no pending messages + consumer = self.consumer() + self.assertEquals(consumer.pending(), 0) + self.assertEquals(consumer.pending(partitions=[0]), 0) + self.assertEquals(consumer.pending(partitions=[1]), 0) + # Produce 10 messages to partitions 0 and 1 self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) - consumer = self.consumer() - self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) + # move to last message, so one partition should have 1 pending + # message and other 0 + consumer.seek(-1, 2) + self.assertEqual(consumer.pending(), 1) + + pending_part1 = consumer.pending(partitions=[0]) + pending_part2 = consumer.pending(partitions=[1]) + self.assertEquals(set([0, 1]), set([pending_part1, pending_part2])) consumer.stop() @kafka_versions("all") |