diff options
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r-- | test/test_failover_integration.py | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 3c74a44..167a231 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -7,6 +7,7 @@ from . import unittest from kafka import KafkaClient, SimpleConsumer from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError from kafka.producer.base import Producer +from kafka.producer import KeyedProducer from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( @@ -116,6 +117,45 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 10 before + 1 recovery + 10 after self.assert_message_count(topic, 21, partitions=(partition,)) + @kafka_versions("all") + def test_switch_leader_keyed_producer(self): + topic = self.topic + + producer = KeyedProducer(self.client, async=False) + + # Send 10 random messages + for _ in range(10): + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + + # kill leader for partition 0 + self._kill_leader(topic, 0) + + recovered = False + started = time.time() + timeout = 60 + while not recovered and (time.time() - started) < timeout: + try: + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + if producer.partitioners[topic].partition(key) == 0: + recovered = True + except (FailedPayloadsError, ConnectionError): + logging.debug("caught exception sending message -- will retry") + continue + + # Verify we successfully sent the message + self.assertTrue(recovered) + + # send some more messages just to make sure no more exceptions + for _ in range(10): + key = random_string(3) + msg = random_string(10) + producer.send_messages(topic, key, msg) + + def _send_random_messages(self, producer, topic, partition, n): for j in range(n): logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) |