summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/010ExcludeList1
-rw-r--r--java/08ExcludeList2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java142
3 files changed, 72 insertions, 73 deletions
diff --git a/java/010ExcludeList b/java/010ExcludeList
index 98d6593d0d..d7287abc68 100644
--- a/java/010ExcludeList
+++ b/java/010ExcludeList
@@ -32,6 +32,7 @@ org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverWithQueueBrowser
org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated
org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser
org.apache.qpid.test.testcases.FailoverTest#*
+org.apache.qpid.test.client.failover.FailoverTest#*
// Those tests are testing 0.8 specific semantics
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P
diff --git a/java/08ExcludeList b/java/08ExcludeList
index 1ca9279271..a3caba4df3 100644
--- a/java/08ExcludeList
+++ b/java/08ExcludeList
@@ -5,3 +5,5 @@ org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
// Those tests are not finished
org.apache.qpid.test.testcases.TTLTest#*
org.apache.qpid.test.testcases.FailoverTest#*
+// This is a long running test so should exclude from normal runs
+org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index e7d7c7eba6..cab6a3a736 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -21,14 +21,15 @@
package org.apache.qpid.test.client.failover;
-import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.utils.FailoverBaseCase;
import org.apache.log4j.Logger;
import javax.jms.Connection;
@@ -38,78 +39,62 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Queue;
+import javax.naming.NamingException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-public class FailoverTest extends TestCase implements ConnectionListener
+public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
private static final Logger _logger = Logger.getLogger(FailoverTest.class);
- private static final int NUM_BROKERS = 2;
- private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
private static final String QUEUE = "queue";
private static final int NUM_MESSAGES = 10;
- private Connection con;
- private AMQConnectionFactory conFactory;
- private Session prodSess;
- private AMQQueue q;
- private MessageProducer prod;
- private Session conSess;
+ private Connection connnection;
+ private Session producerSession;
+ private Queue queue;
+ private MessageProducer producer;
+ private Session consumerSession;
private MessageConsumer consumer;
private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
+ private static final long DEFAULT_FAILOVER_TIME = 10000L;
@Override
protected void setUp() throws Exception
{
super.setUp();
- // Create two VM brokers
- for (int i = 0; i < NUM_BROKERS; i++)
- {
- usedBrokers++;
-
- TransportConnection.createVMBroker(usedBrokers);
- }
-
- conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers));
- _logger.info("Connecting on:" + conFactory.getConnectionURL());
- con = conFactory.createConnection();
- ((AMQConnection) con).setConnectionListener(this);
- con.start();
+ connnection = getConnection();
+ ((AMQConnection) connnection).setConnectionListener(this);
+ connnection.start();
failoverComplete = new CountDownLatch(1);
}
- private void init(boolean transacted, int mode) throws JMSException
+ private void init(boolean transacted, int mode) throws JMSException, NamingException
{
- prodSess = con.createSession(transacted, mode);
- q = new AMQQueue("amq.direct", QUEUE);
- prod = prodSess.createProducer(q);
- conSess = con.createSession(transacted, mode);
- consumer = conSess.createConsumer(q);
+ queue = (Queue) getInitialContext().lookup(QUEUE);
+
+ consumerSession = connnection.createSession(transacted, mode);
+ consumer = consumerSession.createConsumer(queue);
+
+ producerSession = connnection.createSession(transacted, mode);
+ producer = producerSession.createProducer(queue);
}
@Override
- protected void tearDown() throws Exception
+ public void tearDown() throws Exception
{
try
{
- con.close();
+ connnection.close();
}
catch (Exception e)
{
}
- try
- {
- TransportConnection.killAllVMBrokers();
- ApplicationRegistry.removeAll();
- }
- catch (Exception e)
- {
- fail("Unable to clean up");
- }
super.tearDown();
}
@@ -128,17 +113,8 @@ public class FailoverTest extends TestCase implements ConnectionListener
{
for (int i = 0; i < totalMessages; i++)
{
- prod.send(prodSess.createTextMessage("message " + i));
+ producer.send(producerSession.createTextMessage("message " + i));
}
-
-// try
-// {
-// Thread.sleep(100 * totalMessages);
-// }
-// catch (InterruptedException e)
-// {
-// //evil ignoring of IE
-// }
}
public void testP2PFailover() throws Exception
@@ -151,7 +127,7 @@ public class FailoverTest extends TestCase implements ConnectionListener
testP2PFailover(NUM_MESSAGES, false);
}
- private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException
+ private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException, NamingException
{
Message msg = null;
init(false, Session.AUTO_ACKNOWLEDGE);
@@ -168,27 +144,25 @@ public class FailoverTest extends TestCase implements ConnectionListener
_logger.info("Failing over");
- causeFailure();
+ causeFailure(DEFAULT_FAILOVER_TIME);
msg = consumer.receive(500);
- //todo: reinstate
+
assertNull("Should not have received message from new broker!", msg);
// Check that messages still sent / received
sendMessages(totalMessages);
consumeMessages(totalMessages);
}
- private void causeFailure()
+ private void causeFailure(long delay)
{
- _logger.info("Failover");
- TransportConnection.killVMBroker(usedBrokers - 1);
- ApplicationRegistry.remove(usedBrokers - 1);
+ failBroker();
_logger.info("Awaiting Failover completion");
try
{
- failoverComplete.await();
+ failoverComplete.await(delay, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
@@ -203,8 +177,7 @@ public class FailoverTest extends TestCase implements ConnectionListener
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
-
- causeFailure();
+ causeFailure(DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -218,18 +191,41 @@ public class FailoverTest extends TestCase implements ConnectionListener
assertNotNull("Exception should be thrown", failure);
}
- // This test disabled so that it doesn't add 4 minnutes to the length of time it takes to run, which would be lame
- public void txest4MinuteFailover() throws Exception
+ /**
+ * The client used to have a fixed timeout of 4 minutes after which failover would no longer work.
+ * Check that this code has not regressed
+ *
+ * @throws Exception if something unexpected occurs in the test.
+ */
+ public void test4MinuteFailover() throws Exception
{
- conFactory = new AMQConnectionFactory("amqp://guest:guest@/test?brokerlist='vm://:"+(usedBrokers-1)+"?connectdelay='60000'&retries='2''");
- _logger.info("Connecting on:" + conFactory.getConnectionURL());
- con = conFactory.createConnection();
- ((AMQConnection) con).setConnectionListener(this);
- con.start();
-
- long failTime = System.currentTimeMillis() + 60000;
- causeFailure();
- assertTrue("Failover did not take long enough", System.currentTimeMillis() > failTime);
+ ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
+
+ int RETRIES = 4;
+ int DELAY = 60000;
+
+ //Set up a long delay on and large number of retries
+ BrokerDetails details = connectionURL.getBrokerDetails(1);
+ details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
+ details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
+
+ connnection = new AMQConnection(connectionURL, null);
+
+ ((AMQConnection) connnection).setConnectionListener(this);
+
+ //Start the connection
+ connnection.start();
+
+ long FAILOVER_DELAY = (RETRIES * DELAY);
+
+ // Use Nano seconds as it is more accurate for comparision.
+ long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
+
+ //Fail the first broker
+ causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+
+ //Reconnection should occur
+ assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
}
public void bytesSent(long count)