diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/010ExcludeList | 1 | ||||
| -rw-r--r-- | java/08ExcludeList | 2 | ||||
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java | 142 |
3 files changed, 72 insertions, 73 deletions
diff --git a/java/010ExcludeList b/java/010ExcludeList index 98d6593d0d..d7287abc68 100644 --- a/java/010ExcludeList +++ b/java/010ExcludeList @@ -32,6 +32,7 @@ org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverWithQueueBrowser org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser org.apache.qpid.test.testcases.FailoverTest#* +org.apache.qpid.test.client.failover.FailoverTest#* // Those tests are testing 0.8 specific semantics org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P diff --git a/java/08ExcludeList b/java/08ExcludeList index 1ca9279271..a3caba4df3 100644 --- a/java/08ExcludeList +++ b/java/08ExcludeList @@ -5,3 +5,5 @@ org.apache.qpid.test.unit.ct.DurableSubscriberTests#* // Those tests are not finished org.apache.qpid.test.testcases.TTLTest#* org.apache.qpid.test.testcases.FailoverTest#* +// This is a long running test so should exclude from normal runs +org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index e7d7c7eba6..cab6a3a736 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -21,14 +21,15 @@ package org.apache.qpid.test.client.failover; -import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.test.utils.FailoverBaseCase; import org.apache.log4j.Logger; import javax.jms.Connection; @@ -38,78 +39,62 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Queue; +import javax.naming.NamingException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -public class FailoverTest extends TestCase implements ConnectionListener +public class FailoverTest extends FailoverBaseCase implements ConnectionListener { private static final Logger _logger = Logger.getLogger(FailoverTest.class); - private static final int NUM_BROKERS = 2; - private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'"; private static final String QUEUE = "queue"; private static final int NUM_MESSAGES = 10; - private Connection con; - private AMQConnectionFactory conFactory; - private Session prodSess; - private AMQQueue q; - private MessageProducer prod; - private Session conSess; + private Connection connnection; + private Session producerSession; + private Queue queue; + private MessageProducer producer; + private Session consumerSession; private MessageConsumer consumer; private static int usedBrokers = 0; private CountDownLatch failoverComplete; + private static final long DEFAULT_FAILOVER_TIME = 10000L; @Override protected void setUp() throws Exception { super.setUp(); - // Create two VM brokers - for (int i = 0; i < NUM_BROKERS; i++) - { - usedBrokers++; - - TransportConnection.createVMBroker(usedBrokers); - } - - conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers)); - _logger.info("Connecting on:" + conFactory.getConnectionURL()); - con = conFactory.createConnection(); - ((AMQConnection) con).setConnectionListener(this); - con.start(); + connnection = getConnection(); + ((AMQConnection) connnection).setConnectionListener(this); + connnection.start(); failoverComplete = new CountDownLatch(1); } - private void init(boolean transacted, int mode) throws JMSException + private void init(boolean transacted, int mode) throws JMSException, NamingException { - prodSess = con.createSession(transacted, mode); - q = new AMQQueue("amq.direct", QUEUE); - prod = prodSess.createProducer(q); - conSess = con.createSession(transacted, mode); - consumer = conSess.createConsumer(q); + queue = (Queue) getInitialContext().lookup(QUEUE); + + consumerSession = connnection.createSession(transacted, mode); + consumer = consumerSession.createConsumer(queue); + + producerSession = connnection.createSession(transacted, mode); + producer = producerSession.createProducer(queue); } @Override - protected void tearDown() throws Exception + public void tearDown() throws Exception { try { - con.close(); + connnection.close(); } catch (Exception e) { } - try - { - TransportConnection.killAllVMBrokers(); - ApplicationRegistry.removeAll(); - } - catch (Exception e) - { - fail("Unable to clean up"); - } super.tearDown(); } @@ -128,17 +113,8 @@ public class FailoverTest extends TestCase implements ConnectionListener { for (int i = 0; i < totalMessages; i++) { - prod.send(prodSess.createTextMessage("message " + i)); + producer.send(producerSession.createTextMessage("message " + i)); } - -// try -// { -// Thread.sleep(100 * totalMessages); -// } -// catch (InterruptedException e) -// { -// //evil ignoring of IE -// } } public void testP2PFailover() throws Exception @@ -151,7 +127,7 @@ public class FailoverTest extends TestCase implements ConnectionListener testP2PFailover(NUM_MESSAGES, false); } - private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException + private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException, NamingException { Message msg = null; init(false, Session.AUTO_ACKNOWLEDGE); @@ -168,27 +144,25 @@ public class FailoverTest extends TestCase implements ConnectionListener _logger.info("Failing over"); - causeFailure(); + causeFailure(DEFAULT_FAILOVER_TIME); msg = consumer.receive(500); - //todo: reinstate + assertNull("Should not have received message from new broker!", msg); // Check that messages still sent / received sendMessages(totalMessages); consumeMessages(totalMessages); } - private void causeFailure() + private void causeFailure(long delay) { - _logger.info("Failover"); - TransportConnection.killVMBroker(usedBrokers - 1); - ApplicationRegistry.remove(usedBrokers - 1); + failBroker(); _logger.info("Awaiting Failover completion"); try { - failoverComplete.await(); + failoverComplete.await(delay, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -203,8 +177,7 @@ public class FailoverTest extends TestCase implements ConnectionListener Message msg = consumer.receive(); assertNotNull("Expected msgs not received", msg); - - causeFailure(); + causeFailure(DEFAULT_FAILOVER_TIME); Exception failure = null; try @@ -218,18 +191,41 @@ public class FailoverTest extends TestCase implements ConnectionListener assertNotNull("Exception should be thrown", failure); } - // This test disabled so that it doesn't add 4 minnutes to the length of time it takes to run, which would be lame - public void txest4MinuteFailover() throws Exception + /** + * The client used to have a fixed timeout of 4 minutes after which failover would no longer work. + * Check that this code has not regressed + * + * @throws Exception if something unexpected occurs in the test. + */ + public void test4MinuteFailover() throws Exception { - conFactory = new AMQConnectionFactory("amqp://guest:guest@/test?brokerlist='vm://:"+(usedBrokers-1)+"?connectdelay='60000'&retries='2''"); - _logger.info("Connecting on:" + conFactory.getConnectionURL()); - con = conFactory.createConnection(); - ((AMQConnection) con).setConnectionListener(this); - con.start(); - - long failTime = System.currentTimeMillis() + 60000; - causeFailure(); - assertTrue("Failover did not take long enough", System.currentTimeMillis() > failTime); + ConnectionURL connectionURL = getConnectionFactory().getConnectionURL(); + + int RETRIES = 4; + int DELAY = 60000; + + //Set up a long delay on and large number of retries + BrokerDetails details = connectionURL.getBrokerDetails(1); + details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES)); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY)); + + connnection = new AMQConnection(connectionURL, null); + + ((AMQConnection) connnection).setConnectionListener(this); + + //Start the connection + connnection.start(); + + long FAILOVER_DELAY = (RETRIES * DELAY); + + // Use Nano seconds as it is more accurate for comparision. + long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000; + + //Fail the first broker + causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME); + + //Reconnection should occur + assertTrue("Failover did not take long enough", System.nanoTime() > failTime); } public void bytesSent(long count) |
