diff options
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 77 |
1 files changed, 62 insertions, 15 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index ca71f2d..7d27526 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -7,6 +7,7 @@ from . import unittest from kafka import KafkaClient, SimpleConsumer from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError from kafka.producer.base import Producer +from kafka.producer import KeyedProducer from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( @@ -17,8 +18,7 @@ from test.testutil import ( class TestFailover(KafkaIntegrationTestCase): create_client = False - @classmethod - def setUpClass(cls): # noqa + def setUp(self): if not os.environ.get('KAFKA_VERSION'): return @@ -27,33 +27,41 @@ class TestFailover(KafkaIntegrationTestCase): partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] - cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers] - cls.client = KafkaClient(hosts) + hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers] + self.client = KafkaClient(hosts) + super(TestFailover, self).setUp() - @classmethod - def tearDownClass(cls): + def tearDown(self): + super(TestFailover, self).tearDown() if not os.environ.get('KAFKA_VERSION'): return - cls.client.close() - for broker in cls.brokers: + self.client.close() + for broker in self.brokers: broker.close() - cls.zk.close() + self.zk.close() @kafka_versions("all") def test_switch_leader(self): topic = self.topic partition = 0 - # Test the base class Producer -- send_messages to a specific partition + # Testing the base Producer class here so that we can easily send + # messages to a specific partition, kill the leader for that partition + # and check that after another broker takes leadership the producer + # is able to resume sending messages + + # require that the server commit messages to all in-sync replicas + # so that failover doesn't lose any messages on server-side + # and we can assert that server-side message count equals client-side producer = Producer(self.client, async=False, req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) - # Send 10 random messages + # Send 100 random messages to a specific partition self._send_random_messages(producer, topic, partition, 100) # kill leader for partition @@ -80,7 +88,7 @@ class TestFailover(KafkaIntegrationTestCase): self._send_random_messages(producer, topic, partition, 100) # count number of messages - # Should be equal to 10 before + 1 recovery + 10 after + # Should be equal to 100 before + 1 recovery + 100 after self.assert_message_count(topic, 201, partitions=(partition,)) @@ -116,6 +124,45 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 10 before + 1 recovery + 10 after self.assert_message_count(topic, 21, partitions=(partition,)) + @kafka_versions("all") + def test_switch_leader_keyed_producer(self): + topic = self.topic + + producer = KeyedProducer(self.client, async=False) + + # Send 10 random messages + for _ in range(10): + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + + # kill leader for partition 0 + self._kill_leader(topic, 0) + + recovered = False + started = time.time() + timeout = 60 + while not recovered and (time.time() - started) < timeout: + try: + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + if producer.partitioners[topic].partition(key) == 0: + recovered = True + except (FailedPayloadsError, ConnectionError): + logging.debug("caught exception sending message -- will retry") + continue + + # Verify we successfully sent the message + self.assertTrue(recovered) + + # send some more messages just to make sure no more exceptions + for _ in range(10): + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + + def _send_random_messages(self, producer, topic, partition, n): for j in range(n): logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) |