summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-08-27 13:16:09 -0700
committerDana Powers <dana.powers@rd.io>2014-08-27 13:26:23 -0700
commit9cac8eacb2c3776d048500c4bfbcd2ee33ff9c02 (patch)
treec2086622d834c08ccf76e4188de7433f78e7e5ee /test/test_failover_integration.py
parent7c830590bf891913d309c45be53db08e7220b4a0 (diff)
downloadkafka-python-9cac8eacb2c3776d048500c4bfbcd2ee33ff9c02.tar.gz
Fixup producer failover tests to avoid consumer message count flap
check message counts via consumer.pending() (OffsetRequest) rather than relying on consumer message iterator (FetchRequest)
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--test/test_failover_integration.py15
1 files changed, 8 insertions, 7 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 20a7f28..45b249d 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -74,7 +74,8 @@ class TestFailover(KafkaIntegrationTestCase):
self._send_random_messages(producer, topic, partition, 10)
# count number of messages
- count = self._count_messages('test_switch_leader group', topic, partition)
+ count = self._count_messages('test_switch_leader group', topic,
+ partitions=(partition,))
# Should be equal to 10 before + 1 recovery + 10 after
self.assertEquals(count, 21)
@@ -108,7 +109,8 @@ class TestFailover(KafkaIntegrationTestCase):
producer.stop()
# count number of messages
- count = self._count_messages('test_switch_leader_async group', topic, partition)
+ count = self._count_messages('test_switch_leader_async group', topic,
+ partitions=(partition,))
# Should be equal to 10 before + 1 recovery + 10 after
self.assertEquals(count, 21)
@@ -128,18 +130,17 @@ class TestFailover(KafkaIntegrationTestCase):
broker.close()
return broker
- def _count_messages(self, group, topic, timeout=1):
+ def _count_messages(self, group, topic, timeout=1, partitions=None):
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
for broker in self.brokers])
client = KafkaClient(hosts)
consumer = SimpleConsumer(client, group, topic,
+ partitions=partitions,
auto_commit=False,
iter_timeout=timeout)
- all_messages = []
- for message in consumer:
- all_messages.append(message)
+ count = consumer.pending(partitions)
consumer.stop()
client.close()
- return len(all_messages)
+ return count