summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2009-02-25 23:18:54 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2009-02-25 23:18:54 +0000
commit58336d355bea1c1001356bbab67d4b758ef6b079 (patch)
treee07229bcab424476b78c3ecf134d1081f6c5617b /java
parentd626716ccce8244e6b565c3acc176eb69a472cf6 (diff)
downloadqpid-python-58336d355bea1c1001356bbab67d4b758ef6b079.tar.gz
This is related to QPID-1640
Please read the JIRA for more details. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747961 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/cpp.cluster.testprofile3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java162
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java17
-rw-r--r--java/test-provider.properties2
4 files changed, 138 insertions, 46 deletions
diff --git a/java/cpp.cluster.testprofile b/java/cpp.cluster.testprofile
index 1807ae098b..765eb714f3 100644
--- a/java/cpp.cluster.testprofile
+++ b/java/cpp.cluster.testprofile
@@ -3,3 +3,6 @@ broker=${project.root}/../cpp/src/qpidd --load-module ${project.root}/../cpp/src
test.excludesfile=${project.root}/ExcludeList ${project.root}/XAExcludeList ${project.root}/010ExcludeList
profile.clustered=true
+profile.failoverMsgCount=10
+profile.failoverIterations=10
+profile.failoverRandomSeed=20080921
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index 3a1fb50725..cd921f0971 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -21,35 +21,36 @@
package org.apache.qpid.test.client.failover;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.log4j.Logger;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Queue;
import javax.naming.NamingException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.FailoverBaseCase;
public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
private static final Logger _logger = Logger.getLogger(FailoverTest.class);
private static final String QUEUE = "queue";
- private static final int NUM_MESSAGES = 10;
+ private static final int DEFAULT_NUM_MESSAGES = 10;
+ private static final int DEFAULT_SEED = 20080921;
+ private int numMessages = 0;
private Connection connnection;
private Session producerSession;
private Queue queue;
@@ -61,12 +62,18 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
private CountDownLatch failoverComplete;
private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
-
+ private int seed;
+ private Random rand;
+
@Override
protected void setUp() throws Exception
{
super.setUp();
-
+
+ numMessages = Integer.getInteger("profile.failoverMsgCount",DEFAULT_NUM_MESSAGES);
+ seed = Integer.getInteger("profile.failoverRandomSeed",DEFAULT_SEED);
+ rand = new Random(seed);
+
connnection = getConnection();
((AMQConnection) connnection).setConnectionListener(this);
connnection.start();
@@ -99,26 +106,46 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
super.tearDown();
}
- private void consumeMessages(int toConsume, boolean transacted) throws JMSException
+ private void consumeMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
{
Message msg;
- for (int i = 0; i < toConsume; i++)
+ _logger.debug("**************** Receive (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
{
- msg = consumer.receive(1000);
+ msg = consumer.receive(1000);
assertNotNull("Message " + i + " was null!", msg);
- assertEquals("message " + i, ((TextMessage) msg).getText());
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Received : " + ((TextMessage) msg).getText());
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+
+ assertEquals("Invalid message order","message " + i, ((TextMessage) msg).getText());
+
}
- if (transacted) {
+ _logger.debug("***********************************************************");
+
+ if (transacted)
+ {
consumerSession.commit();
}
}
- private void sendMessages(int totalMessages, boolean transacted) throws JMSException
+ private void sendMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
{
- for (int i = 0; i < totalMessages; i++)
- {
+ _logger.debug("**************** Send (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
+ {
producer.send(producerSession.createTextMessage("message " + i));
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Sending message"+i);
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
}
+
+ _logger.debug("***********************************************************");
+
if (transacted)
{
producerSession.commit();
@@ -127,34 +154,78 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
public void testP2PFailover() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true, false);
+ testP2PFailover(numMessages, true,true, false);
}
- public void testP2PFailoverWithMessagesLeft() throws Exception
+ public void testP2PFailoverWithMessagesLeftToConsumeAndProduce() throws Exception
{
- testP2PFailover(NUM_MESSAGES, false, false);
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
}
-
+
+ public void testP2PFailoverWithMessagesLeftToConsume() throws Exception
+ {
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,true, false);
+ }
+ }
+
public void testP2PFailoverTransacted() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true, false);
+ testP2PFailover(numMessages, true,true, false);
}
- private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException
+ public void testP2PFailoverTransactedWithMessagesLeftToConsumeAndProduce() throws Exception
{
- Message msg = null;
+ // Currently the cluster does not support transactions that span a failover
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
+ }
+
+ private void testP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
init(transacted, Session.AUTO_ACKNOWLEDGE);
- sendMessages(totalMessages, transacted);
+ runP2PFailover(totalMessages,consumeAll, produceAll , transacted);
+ }
+
+ private void runP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
+ Message msg = null;
+ int toProduce = totalMessages;
+
+ _logger.debug("===================================================================");
+ _logger.debug("Total messages used for the test " + totalMessages + " messages");
+ _logger.debug("===================================================================");
+
+ if (!produceAll)
+ {
+ toProduce = totalMessages - rand.nextInt(totalMessages);
+ }
+
+ _logger.debug("==================");
+ _logger.debug("Sending " + toProduce + " messages");
+ _logger.debug("==================");
+
+ sendMessages(0,toProduce, transacted);
// Consume some messages
- int toConsume = totalMessages;
+ int toConsume = toProduce;
if (!consumeAll)
{
- toConsume = totalMessages / 2;
+ toConsume = toProduce - rand.nextInt(toProduce);
}
+
+ consumeMessages(0,toConsume, transacted);
- consumeMessages(toConsume, transacted);
-
+ _logger.debug("==================");
+ _logger.debug("Consuming " + toConsume + " messages");
+ _logger.debug("==================");
+
_logger.info("Failing over");
causeFailure(DEFAULT_FAILOVER_TIME);
@@ -165,9 +236,17 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
assertNull("Should not have received message from new broker!", msg);
}
- // Check that messages still sent / received
- sendMessages(totalMessages, transacted);
- consumeMessages(totalMessages, transacted);
+ // Check that you produce and consume the rest of messages.
+ _logger.debug("==================");
+ _logger.debug("Sending " + (totalMessages-toProduce) + " messages");
+ _logger.debug("==================");
+
+ sendMessages(toProduce,totalMessages, transacted);
+ consumeMessages(toConsume,totalMessages, transacted);
+
+ _logger.debug("==================");
+ _logger.debug("Consuming " + (totalMessages-toConsume) + " messages");
+ _logger.debug("==================");
}
private void causeFailure(long delay)
@@ -188,11 +267,11 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
//evil ignore IE.
}
}
-
+
public void testClientAckFailover() throws Exception
{
init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessages(1, false);
+ sendMessages(0,1, false);
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
@@ -216,6 +295,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
*
* @throws Exception if something unexpected occurs in the test.
*/
+
public void test4MinuteFailover() throws Exception
{
ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 2a44c444e0..159bc04502 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.test.utils;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
import javax.jms.Connection;
public class FailoverBaseCase extends QpidTestCase
@@ -33,7 +30,7 @@ public class FailoverBaseCase extends QpidTestCase
private boolean failedOver = false;
- private int getFailingPort()
+ protected int getFailingPort()
{
if (_broker.equals(VM))
{
@@ -90,4 +87,16 @@ public class FailoverBaseCase extends QpidTestCase
throw new RuntimeException(e);
}
}
+
+ protected void setFailingPort(int p)
+ {
+ if (_broker.equals(VM))
+ {
+ FAILING_VM_PORT = p;
+ }
+ else
+ {
+ FAILING_PORT = p;
+ }
+ }
}
diff --git a/java/test-provider.properties b/java/test-provider.properties
index 8066256f4f..5e2ab9c9cf 100644
--- a/java/test-provider.properties
+++ b/java/test-provider.properties
@@ -23,7 +23,7 @@ connectionfactory.default = amqp://username:password@clientid/test?brokerlist='t
connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5671?ssl='true''
-connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673;tcp://localhost:5672'
+connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673;tcp://localhost:5672'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:5673'