summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorNickolai Novik <is.infinity@yahoo.com>2014-12-15 01:16:50 +0200
committerNickolai Novik <is.infinity@yahoo.com>2014-12-15 01:16:50 +0200
commitae79eff49bba38611048a46f0ec380eb664c2983 (patch)
treec457fc91ee4413447f64d9cdaad5facc9f51ddc3 /test/test_consumer_integration.py
parent3fc4dca1dba39c1416d6cc9624cbc9595276b366 (diff)
downloadkafka-python-ae79eff49bba38611048a46f0ec380eb664c2983.tar.gz
fix pending method
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py16
1 files changed, 14 insertions, 2 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 717eac5..2762008 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -104,16 +104,28 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_simple_consumer_pending(self):
+ # make sure that we start with no pending messages
+ consumer = self.consumer()
+ self.assertEquals(consumer.pending(), 0)
+ self.assertEquals(consumer.pending(partitions=[0]), 0)
+ self.assertEquals(consumer.pending(partitions=[1]), 0)
+
# Produce 10 messages to partitions 0 and 1
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = self.consumer()
-
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
+ # move to last message, so one partition should have 1 pending
+ # message and other 0
+ consumer.seek(-1, 2)
+ self.assertEqual(consumer.pending(), 1)
+
+ pending_part1 = consumer.pending(partitions=[0])
+ pending_part2 = consumer.pending(partitions=[1])
+ self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
consumer.stop()
@kafka_versions("all")