summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java253
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java68
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java39
-rw-r--r--qpid/java/test-profiles/test-provider.properties5
7 files changed, 192 insertions, 204 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
index dfb5cde247..83f0f87bc5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
@@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit;
*/
public class DeepQueueConsumeWithSelector extends QpidTestCase implements MessageListener
{
- private static final String INDEX = "index";
private static final int MESSAGE_COUNT = 10000;
private static final int BATCH_SIZE = MESSAGE_COUNT / 10;
@@ -129,9 +128,7 @@ public class DeepQueueConsumeWithSelector extends QpidTestCase implements Messag
@Override
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- Message message = session.createTextMessage("Message :" + msgCount);
-
- message.setIntProperty(INDEX, msgCount);
+ Message message = super.createNextMessage(session,msgCount);
if ((msgCount % BATCH_SIZE) == 0 )
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 2e625f95c0..f9cf48a2b1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -497,7 +497,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
if (msgCount == failPoint)
{
- failBroker();
+ failBroker(getFailingPort());
}
}
@@ -529,7 +529,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
sendMessages("connection2", messages);
}
- failBroker();
+ failBroker(getFailingPort());
checkQueueDepth(messages);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index dfc3bb7b42..c307176f3f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -37,7 +37,6 @@ import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -58,13 +57,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
private Session consumerSession;
private MessageConsumer consumer;
- private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
- private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
private int seed;
private Random rand;
-
+ private int _currentPort = getFailingPort();
+
@Override
protected void setUp() throws Exception
{
@@ -227,7 +225,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.info("Failing over");
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
// Check that you produce and consume the rest of messages.
_logger.debug("==================");
@@ -242,10 +240,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("==================");
}
- private void causeFailure(long delay)
+ private void causeFailure(int port, long delay)
{
- failBroker();
+ failBroker(port);
_logger.info("Awaiting Failover completion");
try
@@ -268,7 +266,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -314,7 +312,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
- causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
@@ -344,15 +342,15 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("===================================================================");
runP2PFailover(numMessages, false,false, false);
- startBroker(getFailingPort());
+ startBroker(_currentPort);
if (useAltPort)
{
- setFailingPort(altPort);
+ _currentPort = altPort;
useAltPort = false;
}
else
{
- setFailingPort(stdPort);
+ _currentPort = stdPort;
useAltPort = true;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 7434fcbb30..4a123cb1dc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -23,8 +23,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.test.utils.QpidTestCase;
-
+import org.apache.qpid.test.utils.FailoverBaseCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,16 +34,21 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
-
import java.util.concurrent.atomic.AtomicInteger;
-public class RecoverTest extends QpidTestCase
+public class RecoverTest extends FailoverBaseCase
{
- private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
+ static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
private Exception _error;
private AtomicInteger count;
+ protected AMQConnection _connection;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ static final int SENT_COUNT = 4;
+
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -52,134 +56,110 @@ public class RecoverTest extends QpidTestCase
count = new AtomicInteger();
}
- protected void tearDown() throws Exception
+ protected void initTest() throws Exception
{
- super.tearDown();
- count = null;
- }
+ _connection = (AMQConnection) getConnection("guest", "guest");
- public void testRecoverResendsMsgs() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer = _consumerSession.createConsumer(queue);
_logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
+ sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
_logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. Calling recover with three outstanding messages");
- // no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg2", tm.getText());
+ _connection.start();
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm.getText());
+ protected Message validateNextMessages(int nextCount, int startIndex) throws JMSException
+ {
+ Message message = null;
+ for (int index = 0; index < nextCount; index++)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(startIndex + index, message.getIntProperty(INDEX));
+ }
+ return message;
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm.getText());
+ protected void validateRemainingMessages(int remaining) throws JMSException
+ {
+ int index = SENT_COUNT - remaining;
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
+ Message message = null;
+ while (index != SENT_COUNT)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(index++, message.getIntProperty(INDEX));
+ }
+
+ if (message != null)
+ {
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ message.acknowledge();
+ }
_logger.info("Calling acknowledge with no outstanding messages");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
+ message = _consumer.receiveNoWait();
+ assertNull(message);
_logger.info("No messages redelivered as is expected");
-
- con.close();
}
- public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ public void testRecoverResendsMsgs() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ initTest();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ Message message = validateNextMessages(1, 0);
+ message.acknowledge();
+ _logger.info("Received and acknowledged first message");
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
+ _logger.info("Received all four messages. Calling recover with three outstanding messages");
+ // no ack for last three messages so when I call recover I expect to get three messages back
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _consumerSession.recover();
- con2.close();
+ validateRemainingMessages(3);
+ }
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- consumer.receive();
- tm.acknowledge();
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ initTest();
+
+ Message message = validateNextMessages(2, 0);
+ message.acknowledge();
_logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
- consumer.receive();
- consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
_logger.info("Received all four messages. Calling recover with two outstanding messages");
// no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- TextMessage tm3 = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm3.getText());
+ _consumerSession.recover();
+
+ Message message2 = _consumer.receive(3000);
+ assertEquals(2, message2.getIntProperty(INDEX));
- TextMessage tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
+ Message message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) message2).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
+ message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
+ ((org.apache.qpid.jms.Message) message3).acknowledgeThis();
- _logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
-
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
-
- con.close();
+ validateRemainingMessages(0);
}
public void testAcknowledgePerConsumer() throws Exception
@@ -188,11 +168,11 @@ public class RecoverTest extends QpidTestCase
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
+ false, true);
Queue queue2 =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
@@ -231,8 +211,8 @@ public class RecoverTest extends QpidTestCase
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
@@ -240,50 +220,50 @@ public class RecoverTest extends QpidTestCase
final Object lock = new Object();
consumer.setMessageListener(new MessageListener()
- {
+ {
- public void onMessage(Message message)
+ public void onMessage(Message message)
+ {
+ try
{
- try
+ count.incrementAndGet();
+ if (count.get() == 1)
{
- count.incrementAndGet();
- if (count.get() == 1)
+ if (message.getJMSRedelivered())
{
- if (message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception("Message marked as redilvered on what should be first delivery attempt"));
- }
-
- consumerSession.recover();
}
- else if (count.get() == 2)
+
+ consumerSession.recover();
+ }
+ else if (count.get() == 2)
+ {
+ if (!message.getJMSRedelivered())
{
- if (!message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception(
- "Message not marked as redilvered on what should be second delivery attempt"));
- }
- }
- else
- {
- System.err.println(message);
- fail("Message delivered too many times!: " + count);
+ "Message not marked as redilvered on what should be second delivery attempt"));
}
}
- catch (JMSException e)
+ else
{
- _logger.error("Error recovering session: " + e, e);
- setError(e);
+ System.err.println(message);
+ fail("Message delivered too many times!: " + count);
}
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ setError(e);
+ }
- synchronized (lock)
- {
- lock.notify();
- }
+ synchronized (lock)
+ {
+ lock.notify();
}
- });
+ }
+ });
con.start();
@@ -323,9 +303,4 @@ public class RecoverTest extends QpidTestCase
{
_error = e;
}
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(RecoverTest.class);
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 8ca65988b5..5b5bb4a6a2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -22,45 +22,41 @@ package org.apache.qpid.test.utils;
import org.apache.qpid.util.FileUtils;
-import javax.jms.Connection;
+import javax.naming.NamingException;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FailoverBaseCase extends QpidTestCase
{
-
-<<<<<<< HEAD:qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
-=======
protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class);
->>>>>>> be4ef1c... Update to FBC to ensure second broker is shutdown in the event of an exception during super.tearDown. This may have been the cause of CI stuck brokers:qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
public static int FAILING_VM_PORT = 2;
public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
+ public static final long DEFAULT_FAILOVER_TIME = 10000L;
protected int failingPort;
-
- private boolean failedOver = false;
- public FailoverBaseCase()
+ protected int getFailingPort()
{
if (_broker.equals(VM))
{
- failingPort = FAILING_VM_PORT;
+ return FAILING_VM_PORT;
}
else
{
- failingPort = FAILING_PORT;
+ return FAILING_PORT;
}
}
-
- protected int getFailingPort()
- {
- return failingPort;
- }
protected void setUp() throws java.lang.Exception
{
super.setUp();
- setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK")+"/"+getFailingPort());
- startBroker(failingPort);
+ // Set QPID_WORK to $QPID_WORK/<getFailingPort()>
+ // or /tmp/<getFailingPort()> if QPID_WORK not set.
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ startBroker(getFailingPort());
}
/**
@@ -69,16 +65,25 @@ public class FailoverBaseCase extends QpidTestCase
* @return a connection
* @throws Exception
*/
- public Connection getConnection() throws Exception
+ @Override
+ public AMQConnectionFactory getConnectionFactory() throws NamingException
{
- Connection conn =
- (Boolean.getBoolean("profile.use_ssl"))?
- getConnectionFactory("failover.ssl").createConnection("guest", "guest"):
- getConnectionFactory("failover").createConnection("guest", "guest");
- _connections.add(conn);
- return conn;
+ _logger.info("get ConnectionFactory");
+ if (_connectionFactory == null)
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ _connectionFactory = getConnectionFactory("failover.ssl");
+ }
+ else
+ {
+ _connectionFactory = getConnectionFactory("failover");
+ }
+ }
+ return _connectionFactory;
}
+
public void tearDown() throws Exception
{
try
@@ -95,24 +100,17 @@ public class FailoverBaseCase extends QpidTestCase
}
- /**
- * Only used of VM borker.
- */
- public void failBroker()
+ public void failBroker(int port)
{
- failedOver = true;
try
{
- stopBroker(getFailingPort());
+ stopBroker(port);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
-
- protected void setFailingPort(int p)
- {
- failingPort = p;
- }
+
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 95ce3a06b5..a908286fc9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -22,8 +22,10 @@ import junit.framework.TestResult;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -81,6 +83,8 @@ public class QpidTestCase extends TestCase
private XMLConfiguration _testConfiguration = new XMLConfiguration();
+ protected static final String INDEX = "index";
+
/**
* Some tests are excluded when the property test.excludes is set to true.
* An exclusion list is either a file (prop test.excludesfile) which contains one test name
@@ -183,7 +187,7 @@ public class QpidTestCase extends TestCase
private Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
private InitialContext _initialContext;
- private AMQConnectionFactory _connectionFactory;
+ protected AMQConnectionFactory _connectionFactory;
private String _testName;
@@ -193,8 +197,6 @@ public class QpidTestCase extends TestCase
public static final String TOPIC = "topic";
/** Map to hold test defined environment properties */
private Map<String, String> _env;
- protected static final String INDEX = "index";
- ;
public QpidTestCase(String name)
{
@@ -929,7 +931,7 @@ public class QpidTestCase extends TestCase
{
if (Boolean.getBoolean("profile.use_ssl"))
{
- _connectionFactory = getConnectionFactory("ssl");
+ _connectionFactory = getConnectionFactory("default.ssl");
}
else
{
@@ -1019,6 +1021,17 @@ public class QpidTestCase extends TestCase
return getClass().getSimpleName() + "-" + getName();
}
+ /**
+ * Return a Queue specific for this test.
+ * Uses getTestQueueName() as the name of the queue
+ * @return
+ */
+ public Queue getTestQueue()
+ {
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName());
+ }
+
+
protected void tearDown() throws java.lang.Exception
{
try
@@ -1070,17 +1083,23 @@ public class QpidTestCase extends TestCase
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
- return sendMessage(session, destination, count, 0);
+ return sendMessage(session, destination, count, 0, 0);
}
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
+ return sendMessage(session, destination, count, 0, batchSize);
+ }
+
+ public List<Message> sendMessage(Session session, Destination destination,
+ int count, int offset, int batchSize) throws Exception
+ {
List<Message> messages = new ArrayList<Message>(count);
MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++)
+ for (int i = offset; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
@@ -1099,8 +1118,11 @@ public class QpidTestCase extends TestCase
}
// Ensure we commit the last messages
- if (session.getTransacted() && (batchSize > 0) &&
- (count / batchSize != 0))
+ // Commit the session if we are transacted and
+ // we have no batchSize or
+ // our count is not divible by batchSize.
+ if (session.getTransacted() &&
+ ( batchSize == 0 || count % batchSize != 0))
{
session.commit();
}
@@ -1111,7 +1133,6 @@ public class QpidTestCase extends TestCase
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
Message message = session.createMessage();
-
message.setIntProperty(INDEX, msgCount);
return message;
diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties
index a349b0fcbf..70a2672263 100644
--- a/qpid/java/test-profiles/test-provider.properties
+++ b/qpid/java/test-profiles/test-provider.properties
@@ -29,14 +29,13 @@ test.port.alt.ssl=25671
connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
+connectionfactory.default.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
-connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
+
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'
connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'