diff options
Diffstat (limited to 'qpid/java')
7 files changed, 192 insertions, 204 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java index dfb5cde247..83f0f87bc5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java @@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit; */ public class DeepQueueConsumeWithSelector extends QpidTestCase implements MessageListener { - private static final String INDEX = "index"; private static final int MESSAGE_COUNT = 10000; private static final int BATCH_SIZE = MESSAGE_COUNT / 10; @@ -129,9 +128,7 @@ public class DeepQueueConsumeWithSelector extends QpidTestCase implements Messag @Override public Message createNextMessage(Session session, int msgCount) throws JMSException { - Message message = session.createTextMessage("Message :" + msgCount); - - message.setIntProperty(INDEX, msgCount); + Message message = super.createNextMessage(session,msgCount); if ((msgCount % BATCH_SIZE) == 0 ) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index 2e625f95c0..f9cf48a2b1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -497,7 +497,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase if (msgCount == failPoint) { - failBroker(); + failBroker(getFailingPort()); } } @@ -529,7 +529,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase sendMessages("connection2", messages); } - failBroker(); + failBroker(getFailingPort()); checkQueueDepth(messages); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index dfc3bb7b42..c307176f3f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -37,7 +37,6 @@ import javax.naming.NamingException; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; @@ -58,13 +57,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener private Session consumerSession; private MessageConsumer consumer; - private static int usedBrokers = 0; private CountDownLatch failoverComplete; - private static final long DEFAULT_FAILOVER_TIME = 10000L; private boolean CLUSTERED = Boolean.getBoolean("profile.clustered"); private int seed; private Random rand; - + private int _currentPort = getFailingPort(); + @Override protected void setUp() throws Exception { @@ -227,7 +225,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener _logger.info("Failing over"); - causeFailure(DEFAULT_FAILOVER_TIME); + causeFailure(_currentPort, DEFAULT_FAILOVER_TIME); // Check that you produce and consume the rest of messages. _logger.debug("=================="); @@ -242,10 +240,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener _logger.debug("=================="); } - private void causeFailure(long delay) + private void causeFailure(int port, long delay) { - failBroker(); + failBroker(port); _logger.info("Awaiting Failover completion"); try @@ -268,7 +266,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener Message msg = consumer.receive(); assertNotNull("Expected msgs not received", msg); - causeFailure(DEFAULT_FAILOVER_TIME); + causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME); Exception failure = null; try @@ -314,7 +312,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000; //Fail the first broker - causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME); + causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME); //Reconnection should occur assertTrue("Failover did not take long enough", System.nanoTime() > failTime); @@ -344,15 +342,15 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener _logger.debug("==================================================================="); runP2PFailover(numMessages, false,false, false); - startBroker(getFailingPort()); + startBroker(_currentPort); if (useAltPort) { - setFailingPort(altPort); + _currentPort = altPort; useAltPort = false; } else { - setFailingPort(stdPort); + _currentPort = stdPort; useAltPort = true; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 7434fcbb30..4a123cb1dc 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -23,8 +23,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; -import org.apache.qpid.test.utils.QpidTestCase; - +import org.apache.qpid.test.utils.FailoverBaseCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +34,21 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.TextMessage; - import java.util.concurrent.atomic.AtomicInteger; -public class RecoverTest extends QpidTestCase +public class RecoverTest extends FailoverBaseCase { - private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); + static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); private Exception _error; private AtomicInteger count; + protected AMQConnection _connection; + protected Session _consumerSession; + protected MessageConsumer _consumer; + static final int SENT_COUNT = 4; + + @Override protected void setUp() throws Exception { super.setUp(); @@ -52,134 +56,110 @@ public class RecoverTest extends QpidTestCase count = new AtomicInteger(); } - protected void tearDown() throws Exception + protected void initTest() throws Exception { - super.tearDown(); - count = null; - } + _connection = (AMQConnection) getConnection("guest", "guest"); - public void testRecoverResendsMsgs() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), - new AMQShortString("someQ"), false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - // force synch to ensure the consumer has resulted in a bound queue - // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - // This is the default now + _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = _consumerSession.createQueue(getTestQueueName()); - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); + _consumer = _consumerSession.createConsumer(queue); _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - + sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT); _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. Calling recover with three outstanding messages"); - // no ack for last three messages so when I call recover I expect to get three messages back - consumerSession.recover(); - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg2", tm.getText()); + _connection.start(); + } - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm.getText()); + protected Message validateNextMessages(int nextCount, int startIndex) throws JMSException + { + Message message = null; + for (int index = 0; index < nextCount; index++) + { + message = _consumer.receive(3000); + assertEquals(startIndex + index, message.getIntProperty(INDEX)); + } + return message; + } - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm.getText()); + protected void validateRemainingMessages(int remaining) throws JMSException + { + int index = SENT_COUNT - remaining; - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); + Message message = null; + while (index != SENT_COUNT) + { + message = _consumer.receive(3000); + assertEquals(index++, message.getIntProperty(INDEX)); + } + + if (message != null) + { + _logger.info("Received redelivery of three messages. Acknowledging last message"); + message.acknowledge(); + } _logger.info("Calling acknowledge with no outstanding messages"); // all acked so no messages to be delivered - consumerSession.recover(); + _consumerSession.recover(); - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); + message = _consumer.receiveNoWait(); + assertNull(message); _logger.info("No messages redelivered as is expected"); - - con.close(); } - public void testRecoverResendsMsgsAckOnEarlier() throws Exception + public void testRecoverResendsMsgs() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + initTest(); - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), - new AMQShortString("someQ"), false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - // force synch to ensure the consumer has resulted in a bound queue - // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - // This is the default now + Message message = validateNextMessages(1, 0); + message.acknowledge(); + _logger.info("Received and acknowledged first message"); - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); + _consumer.receive(); + _consumer.receive(); + _consumer.receive(); + _logger.info("Received all four messages. Calling recover with three outstanding messages"); + // no ack for last three messages so when I call recover I expect to get three messages back - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); + _consumerSession.recover(); - con2.close(); + validateRemainingMessages(3); + } - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - consumer.receive(); - tm.acknowledge(); + public void testRecoverResendsMsgsAckOnEarlier() throws Exception + { + initTest(); + + Message message = validateNextMessages(2, 0); + message.acknowledge(); _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); - consumer.receive(); - consumer.receive(); + _consumer.receive(); + _consumer.receive(); _logger.info("Received all four messages. Calling recover with two outstanding messages"); // no ack for last three messages so when I call recover I expect to get three messages back - consumerSession.recover(); - TextMessage tm3 = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm3.getText()); + _consumerSession.recover(); + + Message message2 = _consumer.receive(3000); + assertEquals(2, message2.getIntProperty(INDEX)); - TextMessage tm4 = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm4.getText()); + Message message3 = _consumer.receive(3000); + assertEquals(3, message3.getIntProperty(INDEX)); _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); - ((org.apache.qpid.jms.Message) tm3).acknowledgeThis(); + ((org.apache.qpid.jms.Message) message2).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered - consumerSession.recover(); + _consumerSession.recover(); - tm4 = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm4.getText()); - ((org.apache.qpid.jms.Message) tm4).acknowledgeThis(); + message3 = _consumer.receive(3000); + assertEquals(3, message3.getIntProperty(INDEX)); + ((org.apache.qpid.jms.Message) message3).acknowledgeThis(); - _logger.info("Calling recover"); // all acked so no messages to be delivered - consumerSession.recover(); - - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - con.close(); + validateRemainingMessages(0); } public void testAcknowledgePerConsumer() throws Exception @@ -188,11 +168,11 @@ public class RecoverTest extends QpidTestCase Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), - false, true); + new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), + false, true); Queue queue2 = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), - false, true); + new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), + false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); @@ -231,8 +211,8 @@ public class RecoverTest extends QpidTestCase final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), - false, true); + new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), + false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = consumerSession.createProducer(queue); producer.send(consumerSession.createTextMessage("hello")); @@ -240,50 +220,50 @@ public class RecoverTest extends QpidTestCase final Object lock = new Object(); consumer.setMessageListener(new MessageListener() - { + { - public void onMessage(Message message) + public void onMessage(Message message) + { + try { - try + count.incrementAndGet(); + if (count.get() == 1) { - count.incrementAndGet(); - if (count.get() == 1) + if (message.getJMSRedelivered()) { - if (message.getJMSRedelivered()) - { - setError( + setError( new Exception("Message marked as redilvered on what should be first delivery attempt")); - } - - consumerSession.recover(); } - else if (count.get() == 2) + + consumerSession.recover(); + } + else if (count.get() == 2) + { + if (!message.getJMSRedelivered()) { - if (!message.getJMSRedelivered()) - { - setError( + setError( new Exception( - "Message not marked as redilvered on what should be second delivery attempt")); - } - } - else - { - System.err.println(message); - fail("Message delivered too many times!: " + count); + "Message not marked as redilvered on what should be second delivery attempt")); } } - catch (JMSException e) + else { - _logger.error("Error recovering session: " + e, e); - setError(e); + System.err.println(message); + fail("Message delivered too many times!: " + count); } + } + catch (JMSException e) + { + _logger.error("Error recovering session: " + e, e); + setError(e); + } - synchronized (lock) - { - lock.notify(); - } + synchronized (lock) + { + lock.notify(); } - }); + } + }); con.start(); @@ -323,9 +303,4 @@ public class RecoverTest extends QpidTestCase { _error = e; } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(RecoverTest.class); - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 8ca65988b5..5b5bb4a6a2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -22,45 +22,41 @@ package org.apache.qpid.test.utils; import org.apache.qpid.util.FileUtils; -import javax.jms.Connection; +import javax.naming.NamingException; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FailoverBaseCase extends QpidTestCase { - -<<<<<<< HEAD:qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java -======= protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class); ->>>>>>> be4ef1c... Update to FBC to ensure second broker is shutdown in the event of an exception during super.tearDown. This may have been the cause of CI stuck brokers:qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java public static int FAILING_VM_PORT = 2; public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt")); + public static final long DEFAULT_FAILOVER_TIME = 10000L; protected int failingPort; - - private boolean failedOver = false; - public FailoverBaseCase() + protected int getFailingPort() { if (_broker.equals(VM)) { - failingPort = FAILING_VM_PORT; + return FAILING_VM_PORT; } else { - failingPort = FAILING_PORT; + return FAILING_PORT; } } - - protected int getFailingPort() - { - return failingPort; - } protected void setUp() throws java.lang.Exception { super.setUp(); - setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK")+"/"+getFailingPort()); - startBroker(failingPort); + // Set QPID_WORK to $QPID_WORK/<getFailingPort()> + // or /tmp/<getFailingPort()> if QPID_WORK not set. + setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort()); + startBroker(getFailingPort()); } /** @@ -69,16 +65,25 @@ public class FailoverBaseCase extends QpidTestCase * @return a connection * @throws Exception */ - public Connection getConnection() throws Exception + @Override + public AMQConnectionFactory getConnectionFactory() throws NamingException { - Connection conn = - (Boolean.getBoolean("profile.use_ssl"))? - getConnectionFactory("failover.ssl").createConnection("guest", "guest"): - getConnectionFactory("failover").createConnection("guest", "guest"); - _connections.add(conn); - return conn; + _logger.info("get ConnectionFactory"); + if (_connectionFactory == null) + { + if (Boolean.getBoolean("profile.use_ssl")) + { + _connectionFactory = getConnectionFactory("failover.ssl"); + } + else + { + _connectionFactory = getConnectionFactory("failover"); + } + } + return _connectionFactory; } + public void tearDown() throws Exception { try @@ -95,24 +100,17 @@ public class FailoverBaseCase extends QpidTestCase } - /** - * Only used of VM borker. - */ - public void failBroker() + public void failBroker(int port) { - failedOver = true; try { - stopBroker(getFailingPort()); + stopBroker(port); } catch (Exception e) { throw new RuntimeException(e); } } - - protected void setFailingPort(int p) - { - failingPort = p; - } + + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 95ce3a06b5..a908286fc9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -22,8 +22,10 @@ import junit.framework.TestResult; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; @@ -81,6 +83,8 @@ public class QpidTestCase extends TestCase private XMLConfiguration _testConfiguration = new XMLConfiguration(); + protected static final String INDEX = "index"; + /** * Some tests are excluded when the property test.excludes is set to true. * An exclusion list is either a file (prop test.excludesfile) which contains one test name @@ -183,7 +187,7 @@ public class QpidTestCase extends TestCase private Map<Integer, Process> _brokers = new HashMap<Integer, Process>(); private InitialContext _initialContext; - private AMQConnectionFactory _connectionFactory; + protected AMQConnectionFactory _connectionFactory; private String _testName; @@ -193,8 +197,6 @@ public class QpidTestCase extends TestCase public static final String TOPIC = "topic"; /** Map to hold test defined environment properties */ private Map<String, String> _env; - protected static final String INDEX = "index"; - ; public QpidTestCase(String name) { @@ -929,7 +931,7 @@ public class QpidTestCase extends TestCase { if (Boolean.getBoolean("profile.use_ssl")) { - _connectionFactory = getConnectionFactory("ssl"); + _connectionFactory = getConnectionFactory("default.ssl"); } else { @@ -1019,6 +1021,17 @@ public class QpidTestCase extends TestCase return getClass().getSimpleName() + "-" + getName(); } + /** + * Return a Queue specific for this test. + * Uses getTestQueueName() as the name of the queue + * @return + */ + public Queue getTestQueue() + { + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName()); + } + + protected void tearDown() throws java.lang.Exception { try @@ -1070,17 +1083,23 @@ public class QpidTestCase extends TestCase public List<Message> sendMessage(Session session, Destination destination, int count) throws Exception { - return sendMessage(session, destination, count, 0); + return sendMessage(session, destination, count, 0, 0); } public List<Message> sendMessage(Session session, Destination destination, int count, int batchSize) throws Exception { + return sendMessage(session, destination, count, 0, batchSize); + } + + public List<Message> sendMessage(Session session, Destination destination, + int count, int offset, int batchSize) throws Exception + { List<Message> messages = new ArrayList<Message>(count); MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < count; i++) + for (int i = offset; i < (count + offset); i++) { Message next = createNextMessage(session, i); @@ -1099,8 +1118,11 @@ public class QpidTestCase extends TestCase } // Ensure we commit the last messages - if (session.getTransacted() && (batchSize > 0) && - (count / batchSize != 0)) + // Commit the session if we are transacted and + // we have no batchSize or + // our count is not divible by batchSize. + if (session.getTransacted() && + ( batchSize == 0 || count % batchSize != 0)) { session.commit(); } @@ -1111,7 +1133,6 @@ public class QpidTestCase extends TestCase public Message createNextMessage(Session session, int msgCount) throws JMSException { Message message = session.createMessage(); - message.setIntProperty(INDEX, msgCount); return message; diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties index a349b0fcbf..70a2672263 100644 --- a/qpid/java/test-profiles/test-provider.properties +++ b/qpid/java/test-profiles/test-provider.properties @@ -29,14 +29,13 @@ test.port.alt.ssl=25671 connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}' +connectionfactory.default.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true'' connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' -connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true'' connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' - connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' - connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1' + connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}' connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}' connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1' |
