diff options
-rw-r--r-- | test/test_failover_integration.py | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 49a98a8..51b791f 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -45,13 +45,16 @@ class TestFailover(KafkaIntegrationTestCase): @kafka_versions("all") def test_switch_leader(self): - key, topic, partition = random_string(5), self.topic, 0 + key = random_string(5) + topic = self.topic + partition = 0 # Test the base class Producer -- send_messages to a specific partition - producer = Producer(self.client, async=False) + producer = Producer(self.client, async=False, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) # Send 10 random messages - self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition, 100) # kill leader for partition broker = self._kill_leader(topic, partition) @@ -74,20 +77,19 @@ class TestFailover(KafkaIntegrationTestCase): self.assertTrue(recovered) # send some more messages to new leader - self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition, 100) # count number of messages - count = self._count_messages('test_switch_leader group', topic, - partitions=(partition,)) - # Should be equal to 10 before + 1 recovery + 10 after - self.assertEquals(count, 21) + self.assert_message_count(topic, 201, partitions=(partition,)) #@kafka_versions("all") @unittest2.skip("async producer does not support reliable failover yet") def test_switch_leader_async(self): - key, topic, partition = random_string(5), self.topic, 0 + key = random_string(5) + topic = self.topic + partition = 0 # Test the base class Producer -- send_messages to a specific partition producer = Producer(self.client, async=True) @@ -112,11 +114,8 @@ class TestFailover(KafkaIntegrationTestCase): producer.stop() # count number of messages - count = self._count_messages('test_switch_leader_async group', topic, - partitions=(partition,)) - # Should be equal to 10 before + 1 recovery + 10 after - self.assertEquals(count, 21) + count = self.assert_message_count(topic, 21, partitions=(partition,)) def _send_random_messages(self, producer, topic, partition, n): @@ -133,17 +132,25 @@ class TestFailover(KafkaIntegrationTestCase): broker.close() return broker - def _count_messages(self, group, topic, timeout=1, partitions=None): + def assert_message_count(self, topic, check_count, timeout=10, partitions=None): hosts = ','.join(['%s:%d' % (broker.host, broker.port) for broker in self.brokers]) client = KafkaClient(hosts) + group = random_string(10) consumer = SimpleConsumer(client, group, topic, partitions=partitions, auto_commit=False, iter_timeout=timeout) - count = consumer.pending(partitions) + started_at = time.time() + pending = consumer.pending(partitions) + + # Keep checking if it isn't immediately correct, subject to timeout + while pending != check_count and (time.time() - started_at < timeout): + pending = consumer.pending(partitions) + consumer.stop() client.close() - return count + + self.assertEqual(pending, check_count) |