summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py16
1 files changed, 14 insertions, 2 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 717eac5..2762008 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -104,16 +104,28 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_simple_consumer_pending(self):
+ # make sure that we start with no pending messages
+ consumer = self.consumer()
+ self.assertEquals(consumer.pending(), 0)
+ self.assertEquals(consumer.pending(partitions=[0]), 0)
+ self.assertEquals(consumer.pending(partitions=[1]), 0)
+
# Produce 10 messages to partitions 0 and 1
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = self.consumer()
-
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
+ # move to last message, so one partition should have 1 pending
+ # message and other 0
+ consumer.seek(-1, 2)
+ self.assertEqual(consumer.pending(), 1)
+
+ pending_part1 = consumer.pending(partitions=[0])
+ pending_part2 = consumer.pending(partitions=[1])
+ self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
consumer.stop()
@kafka_versions("all")