diff options
author | Dana Powers <dana.powers@rd.io> | 2014-08-27 13:16:09 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-08-27 13:26:23 -0700 |
commit | 9cac8eacb2c3776d048500c4bfbcd2ee33ff9c02 (patch) | |
tree | c2086622d834c08ccf76e4188de7433f78e7e5ee /test/test_failover_integration.py | |
parent | 7c830590bf891913d309c45be53db08e7220b4a0 (diff) | |
download | kafka-python-9cac8eacb2c3776d048500c4bfbcd2ee33ff9c02.tar.gz |
Fixup producer failover tests to avoid consumer message count flap
check message counts via consumer.pending() (OffsetRequest)
rather than relying on consumer message iterator (FetchRequest)
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 20a7f28..45b249d 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -74,7 +74,8 @@ class TestFailover(KafkaIntegrationTestCase): self._send_random_messages(producer, topic, partition, 10) # count number of messages - count = self._count_messages('test_switch_leader group', topic, partition) + count = self._count_messages('test_switch_leader group', topic, + partitions=(partition,)) # Should be equal to 10 before + 1 recovery + 10 after self.assertEquals(count, 21) @@ -108,7 +109,8 @@ class TestFailover(KafkaIntegrationTestCase): producer.stop() # count number of messages - count = self._count_messages('test_switch_leader_async group', topic, partition) + count = self._count_messages('test_switch_leader_async group', topic, + partitions=(partition,)) # Should be equal to 10 before + 1 recovery + 10 after self.assertEquals(count, 21) @@ -128,18 +130,17 @@ class TestFailover(KafkaIntegrationTestCase): broker.close() return broker - def _count_messages(self, group, topic, timeout=1): + def _count_messages(self, group, topic, timeout=1, partitions=None): hosts = ','.join(['%s:%d' % (broker.host, broker.port) for broker in self.brokers]) client = KafkaClient(hosts) consumer = SimpleConsumer(client, group, topic, + partitions=partitions, auto_commit=False, iter_timeout=timeout) - all_messages = [] - for message in consumer: - all_messages.append(message) + count = consumer.pending(partitions) consumer.stop() client.close() - return len(all_messages) + return count |