summaryrefslogtreecommitdiff
path: root/test/test_failover_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_failover_integration.py')
-rw-r--r--test/test_failover_integration.py53
1 files changed, 30 insertions, 23 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 6c0e662..5e737b0 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -3,11 +3,14 @@ import os
import time
import unittest2
-from kafka import * # noqa
-from kafka.common import * # noqa
+from kafka import KafkaClient, SimpleConsumer
+from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer import Producer
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+ KafkaIntegrationTestCase, kafka_versions, random_string
+)
class TestFailover(KafkaIntegrationTestCase):
@@ -42,16 +45,18 @@ class TestFailover(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_switch_leader(self):
- key, topic, partition = random_string(5), self.topic, 0
+ topic = self.topic
+ partition = 0
# Test the base class Producer -- send_messages to a specific partition
- producer = Producer(self.client, async=False)
+ producer = Producer(self.client, async=False,
+ req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
# Send 10 random messages
- self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition, 100)
# kill leader for partition
- broker = self._kill_leader(topic, partition)
+ self._kill_leader(topic, partition)
# expect failure, but dont wait more than 60 secs to recover
recovered = False
@@ -71,20 +76,18 @@ class TestFailover(KafkaIntegrationTestCase):
self.assertTrue(recovered)
# send some more messages to new leader
- self._send_random_messages(producer, topic, partition, 10)
+ self._send_random_messages(producer, topic, partition, 100)
# count number of messages
- count = self._count_messages('test_switch_leader group', topic,
- partitions=(partition,))
-
# Should be equal to 10 before + 1 recovery + 10 after
- self.assertEquals(count, 21)
+ self.assert_message_count(topic, 201, partitions=(partition,))
#@kafka_versions("all")
@unittest2.skip("async producer does not support reliable failover yet")
def test_switch_leader_async(self):
- key, topic, partition = random_string(5), self.topic, 0
+ topic = self.topic
+ partition = 0
# Test the base class Producer -- send_messages to a specific partition
producer = Producer(self.client, async=True)
@@ -93,7 +96,7 @@ class TestFailover(KafkaIntegrationTestCase):
self._send_random_messages(producer, topic, partition, 10)
# kill leader for partition
- broker = self._kill_leader(topic, partition)
+ self._kill_leader(topic, partition)
logging.debug("attempting to send 'success' message after leader killed")
@@ -109,12 +112,8 @@ class TestFailover(KafkaIntegrationTestCase):
producer.stop()
# count number of messages
- count = self._count_messages('test_switch_leader_async group', topic,
- partitions=(partition,))
-
# Should be equal to 10 before + 1 recovery + 10 after
- self.assertEquals(count, 21)
-
+ self.assert_message_count(topic, 21, partitions=(partition,))
def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
@@ -130,17 +129,25 @@ class TestFailover(KafkaIntegrationTestCase):
broker.close()
return broker
- def _count_messages(self, group, topic, timeout=1, partitions=None):
+ def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
hosts = ','.join(['%s:%d' % (broker.host, broker.port)
for broker in self.brokers])
client = KafkaClient(hosts)
+ group = random_string(10)
consumer = SimpleConsumer(client, group, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)
- count = consumer.pending(partitions)
+ started_at = time.time()
+ pending = consumer.pending(partitions)
+
+ # Keep checking if it isn't immediately correct, subject to timeout
+ while pending != check_count and (time.time() - started_at < timeout):
+ pending = consumer.pending(partitions)
+
consumer.stop()
client.close()
- return count
+
+ self.assertEqual(pending, check_count)