diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-02-25 23:18:54 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-02-25 23:18:54 +0000 |
| commit | 58336d355bea1c1001356bbab67d4b758ef6b079 (patch) | |
| tree | e07229bcab424476b78c3ecf134d1081f6c5617b /java | |
| parent | d626716ccce8244e6b565c3acc176eb69a472cf6 (diff) | |
| download | qpid-python-58336d355bea1c1001356bbab67d4b758ef6b079.tar.gz | |
This is related to QPID-1640
Please read the JIRA for more details.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747961 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 138 insertions, 46 deletions
diff --git a/java/cpp.cluster.testprofile b/java/cpp.cluster.testprofile index 1807ae098b..765eb714f3 100644 --- a/java/cpp.cluster.testprofile +++ b/java/cpp.cluster.testprofile @@ -3,3 +3,6 @@ broker=${project.root}/../cpp/src/qpidd --load-module ${project.root}/../cpp/src test.excludesfile=${project.root}/ExcludeList ${project.root}/XAExcludeList ${project.root}/010ExcludeList profile.clustered=true +profile.failoverMsgCount=10 +profile.failoverIterations=10 +profile.failoverRandomSeed=20080921 diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index 3a1fb50725..cd921f0971 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -21,35 +21,36 @@ package org.apache.qpid.test.client.failover; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.test.utils.FailoverBaseCase; -import org.apache.log4j.Logger; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import javax.jms.Queue; import javax.naming.NamingException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.test.utils.FailoverBaseCase; public class FailoverTest extends FailoverBaseCase implements ConnectionListener { private static final Logger _logger = Logger.getLogger(FailoverTest.class); private static final String QUEUE = "queue"; - private static final int NUM_MESSAGES = 10; + private static final int DEFAULT_NUM_MESSAGES = 10; + private static final int DEFAULT_SEED = 20080921; + private int numMessages = 0; private Connection connnection; private Session producerSession; private Queue queue; @@ -61,12 +62,18 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener private CountDownLatch failoverComplete; private static final long DEFAULT_FAILOVER_TIME = 10000L; private boolean CLUSTERED = Boolean.getBoolean("profile.clustered"); - + private int seed; + private Random rand; + @Override protected void setUp() throws Exception { super.setUp(); - + + numMessages = Integer.getInteger("profile.failoverMsgCount",DEFAULT_NUM_MESSAGES); + seed = Integer.getInteger("profile.failoverRandomSeed",DEFAULT_SEED); + rand = new Random(seed); + connnection = getConnection(); ((AMQConnection) connnection).setConnectionListener(this); connnection.start(); @@ -99,26 +106,46 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener super.tearDown(); } - private void consumeMessages(int toConsume, boolean transacted) throws JMSException + private void consumeMessages(int startIndex,int endIndex, boolean transacted) throws JMSException { Message msg; - for (int i = 0; i < toConsume; i++) + _logger.debug("**************** Receive (Start: " + startIndex + ", End:" + endIndex + ")***********************"); + + for (int i = startIndex; i < endIndex; i++) { - msg = consumer.receive(1000); + msg = consumer.receive(1000); assertNotNull("Message " + i + " was null!", msg); - assertEquals("message " + i, ((TextMessage) msg).getText()); + + _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"); + _logger.debug("Received : " + ((TextMessage) msg).getText()); + _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"); + + assertEquals("Invalid message order","message " + i, ((TextMessage) msg).getText()); + } - if (transacted) { + _logger.debug("***********************************************************"); + + if (transacted) + { consumerSession.commit(); } } - private void sendMessages(int totalMessages, boolean transacted) throws JMSException + private void sendMessages(int startIndex,int endIndex, boolean transacted) throws JMSException { - for (int i = 0; i < totalMessages; i++) - { + _logger.debug("**************** Send (Start: " + startIndex + ", End:" + endIndex + ")***********************"); + + for (int i = startIndex; i < endIndex; i++) + { producer.send(producerSession.createTextMessage("message " + i)); + + _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"); + _logger.debug("Sending message"+i); + _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"); } + + _logger.debug("***********************************************************"); + if (transacted) { producerSession.commit(); @@ -127,34 +154,78 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener public void testP2PFailover() throws Exception { - testP2PFailover(NUM_MESSAGES, true, false); + testP2PFailover(numMessages, true,true, false); } - public void testP2PFailoverWithMessagesLeft() throws Exception + public void testP2PFailoverWithMessagesLeftToConsumeAndProduce() throws Exception { - testP2PFailover(NUM_MESSAGES, false, false); + if (CLUSTERED) + { + testP2PFailover(numMessages, false,false, false); + } } - + + public void testP2PFailoverWithMessagesLeftToConsume() throws Exception + { + if (CLUSTERED) + { + testP2PFailover(numMessages, false,true, false); + } + } + public void testP2PFailoverTransacted() throws Exception { - testP2PFailover(NUM_MESSAGES, true, false); + testP2PFailover(numMessages, true,true, false); } - private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException + public void testP2PFailoverTransactedWithMessagesLeftToConsumeAndProduce() throws Exception { - Message msg = null; + // Currently the cluster does not support transactions that span a failover + if (CLUSTERED) + { + testP2PFailover(numMessages, false,false, false); + } + } + + private void testP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException + { init(transacted, Session.AUTO_ACKNOWLEDGE); - sendMessages(totalMessages, transacted); + runP2PFailover(totalMessages,consumeAll, produceAll , transacted); + } + + private void runP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException + { + Message msg = null; + int toProduce = totalMessages; + + _logger.debug("==================================================================="); + _logger.debug("Total messages used for the test " + totalMessages + " messages"); + _logger.debug("==================================================================="); + + if (!produceAll) + { + toProduce = totalMessages - rand.nextInt(totalMessages); + } + + _logger.debug("=================="); + _logger.debug("Sending " + toProduce + " messages"); + _logger.debug("=================="); + + sendMessages(0,toProduce, transacted); // Consume some messages - int toConsume = totalMessages; + int toConsume = toProduce; if (!consumeAll) { - toConsume = totalMessages / 2; + toConsume = toProduce - rand.nextInt(toProduce); } + + consumeMessages(0,toConsume, transacted); - consumeMessages(toConsume, transacted); - + _logger.debug("=================="); + _logger.debug("Consuming " + toConsume + " messages"); + _logger.debug("=================="); + _logger.info("Failing over"); causeFailure(DEFAULT_FAILOVER_TIME); @@ -165,9 +236,17 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener assertNull("Should not have received message from new broker!", msg); } - // Check that messages still sent / received - sendMessages(totalMessages, transacted); - consumeMessages(totalMessages, transacted); + // Check that you produce and consume the rest of messages. + _logger.debug("=================="); + _logger.debug("Sending " + (totalMessages-toProduce) + " messages"); + _logger.debug("=================="); + + sendMessages(toProduce,totalMessages, transacted); + consumeMessages(toConsume,totalMessages, transacted); + + _logger.debug("=================="); + _logger.debug("Consuming " + (totalMessages-toConsume) + " messages"); + _logger.debug("=================="); } private void causeFailure(long delay) @@ -188,11 +267,11 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener //evil ignore IE. } } - + public void testClientAckFailover() throws Exception { init(false, Session.CLIENT_ACKNOWLEDGE); - sendMessages(1, false); + sendMessages(0,1, false); Message msg = consumer.receive(); assertNotNull("Expected msgs not received", msg); @@ -216,6 +295,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener * * @throws Exception if something unexpected occurs in the test. */ + public void test4MinuteFailover() throws Exception { ConnectionURL connectionURL = getConnectionFactory().getConnectionURL(); diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 2a44c444e0..159bc04502 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.test.utils; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.server.registry.ApplicationRegistry; - import javax.jms.Connection; public class FailoverBaseCase extends QpidTestCase @@ -33,7 +30,7 @@ public class FailoverBaseCase extends QpidTestCase private boolean failedOver = false; - private int getFailingPort() + protected int getFailingPort() { if (_broker.equals(VM)) { @@ -90,4 +87,16 @@ public class FailoverBaseCase extends QpidTestCase throw new RuntimeException(e); } } + + protected void setFailingPort(int p) + { + if (_broker.equals(VM)) + { + FAILING_VM_PORT = p; + } + else + { + FAILING_PORT = p; + } + } } diff --git a/java/test-provider.properties b/java/test-provider.properties index 8066256f4f..5e2ab9c9cf 100644 --- a/java/test-provider.properties +++ b/java/test-provider.properties @@ -23,7 +23,7 @@ connectionfactory.default = amqp://username:password@clientid/test?brokerlist='t connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5671?ssl='true'' -connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673;tcp://localhost:5672' +connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673;tcp://localhost:5672'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1' connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672' connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673' |
