diff options
author | Dana Powers <dana.powers@gmail.com> | 2014-09-07 12:17:54 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2014-09-07 12:17:54 -0700 |
commit | a99384f4c601d127ab1c4fe5b272ea5c07fd695d (patch) | |
tree | d559e3c3f650dab1ce9247aa7a89f41bdd410e46 /test/test_failover_integration.py | |
parent | 9856cc36d7742922133af0aa53767c8ed4731957 (diff) | |
parent | 1b282d21522d101f4129d5fc3e70e2b904d3b171 (diff) | |
download | kafka-python-a99384f4c601d127ab1c4fe5b272ea5c07fd695d.tar.gz |
Merge pull request #221 from dpkp/minor_cleanups
Minor cleanups
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 53 |
1 files changed, 30 insertions, 23 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 6c0e662..5e737b0 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -3,11 +3,14 @@ import os import time import unittest2 -from kafka import * # noqa -from kafka.common import * # noqa +from kafka import KafkaClient, SimpleConsumer +from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError from kafka.producer import Producer -from fixtures import ZookeeperFixture, KafkaFixture -from testutil import * + +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import ( + KafkaIntegrationTestCase, kafka_versions, random_string +) class TestFailover(KafkaIntegrationTestCase): @@ -42,16 +45,18 @@ class TestFailover(KafkaIntegrationTestCase): @kafka_versions("all") def test_switch_leader(self): - key, topic, partition = random_string(5), self.topic, 0 + topic = self.topic + partition = 0 # Test the base class Producer -- send_messages to a specific partition - producer = Producer(self.client, async=False) + producer = Producer(self.client, async=False, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) # Send 10 random messages - self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition, 100) # kill leader for partition - broker = self._kill_leader(topic, partition) + self._kill_leader(topic, partition) # expect failure, but dont wait more than 60 secs to recover recovered = False @@ -71,20 +76,18 @@ class TestFailover(KafkaIntegrationTestCase): self.assertTrue(recovered) # send some more messages to new leader - self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition, 100) # count number of messages - count = self._count_messages('test_switch_leader group', topic, - partitions=(partition,)) - # Should be equal to 10 before + 1 recovery + 10 after - self.assertEquals(count, 21) + self.assert_message_count(topic, 201, partitions=(partition,)) #@kafka_versions("all") @unittest2.skip("async producer does not support reliable failover yet") def test_switch_leader_async(self): - key, topic, partition = random_string(5), self.topic, 0 + topic = self.topic + partition = 0 # Test the base class Producer -- send_messages to a specific partition producer = Producer(self.client, async=True) @@ -93,7 +96,7 @@ class TestFailover(KafkaIntegrationTestCase): self._send_random_messages(producer, topic, partition, 10) # kill leader for partition - broker = self._kill_leader(topic, partition) + self._kill_leader(topic, partition) logging.debug("attempting to send 'success' message after leader killed") @@ -109,12 +112,8 @@ class TestFailover(KafkaIntegrationTestCase): producer.stop() # count number of messages - count = self._count_messages('test_switch_leader_async group', topic, - partitions=(partition,)) - # Should be equal to 10 before + 1 recovery + 10 after - self.assertEquals(count, 21) - + self.assert_message_count(topic, 21, partitions=(partition,)) def _send_random_messages(self, producer, topic, partition, n): for j in range(n): @@ -130,17 +129,25 @@ class TestFailover(KafkaIntegrationTestCase): broker.close() return broker - def _count_messages(self, group, topic, timeout=1, partitions=None): + def assert_message_count(self, topic, check_count, timeout=10, partitions=None): hosts = ','.join(['%s:%d' % (broker.host, broker.port) for broker in self.brokers]) client = KafkaClient(hosts) + group = random_string(10) consumer = SimpleConsumer(client, group, topic, partitions=partitions, auto_commit=False, iter_timeout=timeout) - count = consumer.pending(partitions) + started_at = time.time() + pending = consumer.pending(partitions) + + # Keep checking if it isn't immediately correct, subject to timeout + while pending != check_count and (time.time() - started_at < timeout): + pending = consumer.pending(partitions) + consumer.stop() client.close() - return count + + self.assertEqual(pending, check_count) |