summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-08-26 17:10:41 -0700
committerDana Powers <dana.powers@rd.io>2014-08-26 19:24:33 -0700
commit26042ae7f53fbcfd4ccdd6c4d5b11c54ba04f312 (patch)
tree9d3237c59674a7451a958180f0fa2a3bf2095a66 /test
parentd107420bedc5c4ebdd8f53fd39883b88b7789ed4 (diff)
downloadkafka-python-26042ae7f53fbcfd4ccdd6c4d5b11c54ba04f312.tar.gz
Improve leader failover tests; note that async produce failover is broken -- so skip that test for now
Diffstat (limited to 'test')
-rw-r--r--test/test_failover_integration.py112
1 files changed, 67 insertions, 45 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 6298f62..20a7f28 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -1,11 +1,15 @@
+import logging
import os
import time
+import unittest2
from kafka import * # noqa
from kafka.common import * # noqa
+from kafka.producer import Producer
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *
+
class TestFailover(KafkaIntegrationTestCase):
create_client = False
@@ -39,82 +43,100 @@ class TestFailover(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
- producer = SimpleProducer(self.client)
- for i in range(1, 4):
+ # Test the base class Producer -- send_messages to a specific partition
+ producer = Producer(self.client, async=False)
- # XXX unfortunately, the conns dict needs to be warmed for this to work
- # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
- self._send_random_messages(producer, self.topic, 10)
+ # Send 10 random messages
+ self._send_random_messages(producer, topic, partition, 10)
- # kil leader for partition 0
- broker = self._kill_leader(topic, partition)
+ # kill leader for partition
+ broker = self._kill_leader(topic, partition)
- # expect failure, reload meta data
- with self.assertRaises(FailedPayloadsError):
- producer.send_messages(self.topic, 'part 1')
- producer.send_messages(self.topic, 'part 2')
- time.sleep(1)
+ # expect failure, but dont wait more than 60 secs to recover
+ recovered = False
+ started = time.time()
+ timeout = 60
+ while not recovered and (time.time() - started) < timeout:
+ try:
+ logging.debug("attempting to send 'success' message after leader killed")
+ producer.send_messages(topic, partition, 'success')
+ logging.debug("success!")
+ recovered = True
+ except FailedPayloadsError, ConnectionError:
+ logging.debug("caught exception sending message -- will retry")
+ continue
- # send to new leader
- self._send_random_messages(producer, self.topic, 10)
+ # Verify we successfully sent the message
+ self.assertTrue(recovered)
- broker.open()
- time.sleep(3)
+ # send some more messages to new leader
+ self._send_random_messages(producer, topic, partition, 10)
- # count number of messages
- count = self._count_messages('test_switch_leader group %s' % i, topic)
- self.assertIn(count, range(20 * i, 22 * i + 1))
+ # count number of messages
+ count = self._count_messages('test_switch_leader group', topic, partition)
- producer.stop()
+ # Should be equal to 10 before + 1 recovery + 10 after
+ self.assertEquals(count, 21)
- @kafka_versions("all")
+
+ #@kafka_versions("all")
+ @unittest2.skip("async producer does not support reliable failover yet")
def test_switch_leader_async(self):
key, topic, partition = random_string(5), self.topic, 0
- producer = SimpleProducer(self.client, async=True)
-
- for i in range(1, 4):
- self._send_random_messages(producer, self.topic, 10)
+ # Test the base class Producer -- send_messages to a specific partition
+ producer = Producer(self.client, async=True)
- # kil leader for partition 0
- broker = self._kill_leader(topic, partition)
+ # Send 10 random messages
+ self._send_random_messages(producer, topic, partition, 10)
- # expect failure, reload meta data
- producer.send_messages(self.topic, 'part 1')
- producer.send_messages(self.topic, 'part 2')
- time.sleep(1)
+ # kill leader for partition
+ broker = self._kill_leader(topic, partition)
- # send to new leader
- self._send_random_messages(producer, self.topic, 10)
+ logging.debug("attempting to send 'success' message after leader killed")
- broker.open()
- time.sleep(3)
+ # in async mode, this should return immediately
+ producer.send_messages(topic, partition, 'success')
- # count number of messages
- count = self._count_messages('test_switch_leader_async group %s' % i, topic)
- self.assertIn(count, range(20 * i, 22 * i + 1))
+ # send to new leader
+ self._send_random_messages(producer, topic, partition, 10)
+ # wait until producer queue is empty
+ while not producer.queue.empty():
+ time.sleep(0.1)
producer.stop()
- def _send_random_messages(self, producer, topic, n):
+ # count number of messages
+ count = self._count_messages('test_switch_leader_async group', topic, partition)
+
+ # Should be equal to 10 before + 1 recovery + 10 after
+ self.assertEquals(count, 21)
+
+
+ def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
- resp = producer.send_messages(topic, random_string(10))
+ logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
+ resp = producer.send_messages(topic, partition, random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
- time.sleep(1) # give it some time
+ logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
def _kill_leader(self, topic, partition):
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
broker = self.brokers[leader.nodeId]
broker.close()
- time.sleep(1) # give it some time
return broker
- def _count_messages(self, group, topic):
- hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+ def _count_messages(self, group, topic, timeout=1):
+ hosts = ','.join(['%s:%d' % (broker.host, broker.port)
+ for broker in self.brokers])
+
client = KafkaClient(hosts)
- consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
+ consumer = SimpleConsumer(client, group, topic,
+ auto_commit=False,
+ iter_timeout=timeout)
+
all_messages = []
for message in consumer:
all_messages.append(message)