diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-10-05 15:02:24 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-10-05 15:02:24 +0000 |
| commit | 41628c7e879480eeb59cf29b235976e092cf1fee (patch) | |
| tree | ade09daa682d76ee903228e5e23f5ccf6b91a7c9 /java | |
| parent | 74ebccda962ff1ae8998bcc317b1b0f79a7eb523 (diff) | |
| download | qpid-python-41628c7e879480eeb59cf29b235976e092cf1fee.tar.gz | |
QPID-1816 : Updated AcknowledgeTests to cover all ack modes and the synchronous receive and asynchronous onMessage approaches. Also added tests that failover between acking. Currently these tests are setup to repoulate the broker between failover. However, in a clustered environment that would not be requried. Investigation is need on how best to setup failover tests in general so that clustered and non-clustered environments can be easily tested.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@821822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 601 insertions, 88 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java new file mode 100644 index 0000000000..4b45a96c20 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.FailoverBaseCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +public class Acknowledge2ConsumersTest extends FailoverBaseCase +{ + protected static int NUM_MESSAGES = 100; + protected Connection _con; + protected Queue _queue; + private Session _producerSession; + private Session _consumerSession; + private MessageConsumer _consumerA; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _queue = (Queue) getInitialContext().lookup("queue"); + + //Create Producer put some messages on the queue + _con = getConnection(); + } + + private void init(boolean transacted, int mode) throws JMSException + { + _producerSession = _con.createSession(true, Session.SESSION_TRANSACTED); + _consumerSession = _con.createSession(transacted, mode); + _consumerA = _consumerSession.createConsumer(_queue); + _con.start(); + } + + /** + * Produces Messages that + * + * @param transacted + * @param mode + * + * @throws Exception + */ + private void test2ConsumersAcking(boolean transacted, int mode) throws Exception + { + init(transacted, mode); + + // These should all end up being prefetched by sessionA + sendMessage(_producerSession, _queue, NUM_MESSAGES / 2); + + //Create a second consumer (consumerB) to consume some of the messages + MessageConsumer consumerB = _consumerSession.createConsumer(_queue); + + // These messages should be roundrobined between A and B + sendMessage(_producerSession, _queue, NUM_MESSAGES / 2); + + int count = 0; + //Use consumerB to receive messages it has + Message msg = consumerB.receive(1500); + while (msg != null) + { + if (mode == Session.CLIENT_ACKNOWLEDGE) + { + msg.acknowledge(); + } + count++; + msg = consumerB.receive(1500); + } + if (transacted) + { + _consumerSession.commit(); + } + + // Close the consumers + _consumerA.close(); + consumerB.close(); + + // and close the session to release any prefetched messages. + _consumerSession.close(); + assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, + ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue)); + + // Clean up messages that may be left on the queue + _consumerSession = _con.createSession(transacted, mode); + _consumerA = _consumerSession.createConsumer(_queue); + msg = _consumerA.receive(1500); + while (msg != null) + { + if (mode == Session.CLIENT_ACKNOWLEDGE) + { + msg.acknowledge(); + } + msg = _consumerA.receive(1500); + } + _consumerA.close(); + if (transacted) + { + _consumerSession.commit(); + } + _consumerSession.close(); + } + + public void test2ConsumersAutoAck() throws Exception + { + test2ConsumersAcking(false, Session.AUTO_ACKNOWLEDGE); + } + + public void test2ConsumersClientAck() throws Exception + { + test2ConsumersAcking(false, Session.CLIENT_ACKNOWLEDGE); + } + + public void test2ConsumersTx() throws Exception + { + test2ConsumersAcking(true, Session.SESSION_TRANSACTED); + } + + + +// +// /** +// * Check that session level acknowledge does correctly ack all previous +// * values. Send 3 messages(0,1,2) then ack 1 and 2. If session ack is +// * working correctly then acking 1 will also ack 0. Acking 2 will not +// * attempt to re-ack 0 and 1. +// * +// * @throws Exception +// */ +// public void testSessionAck() throws Exception +// { +// init(false, Session.CLIENT_ACKNOWLEDGE); +// +// sendMessage(_producerSession, _queue, 3); +// Message msg; +// +// // Drop msg 0 +// _consumerA.receive(RECEIVE_TIMEOUT); +// +// // Take msg 1 +// msg = _consumerA.receive(RECEIVE_TIMEOUT); +// +// assertNotNull("Message 1 not correctly received.", msg); +// assertEquals("Incorrect message received", 1, msg.getIntProperty(INDEX)); +// +// // This should also ack msg 0 +// msg.acknowledge(); +// +// // Take msg 2 +// msg = _consumerA.receive(RECEIVE_TIMEOUT); +// +// assertNotNull("Message 2 not correctly received.", msg); +// assertEquals("Incorrect message received", 2, msg.getIntProperty(INDEX)); +// +// // This should just ack msg 2 +// msg.acknowledge(); +// +// _consumerA.close(); +// _consumerSession.close(); +// +// assertEquals("Queue not empty.", 0, +// ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue)); +// _con.close(); +// +// +// } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java new file mode 100644 index 0000000000..66ebec76e7 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQDestination; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.Message; +import javax.jms.JMSException; + +public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest +{ + + @Override + public void setUp() throws Exception + { + super.setUp(); + NUM_MESSAGES = 10; + } + + protected void prepBroker(int count) throws Exception + { + //Stop the connection whilst we repopulate the broker, or the no_ack + // test will drain the msgs before we can check we put the right number + // back on again. +// _connection.stop(); + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, count, NUM_MESSAGES - count, 0); + + if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) + { + assertEquals("Wrong number of messages on queue", count, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + } + + connection.close(); + +// _connection.start(); + } + + @Override + public void doAcknowlegement(Message msg) throws JMSException + { + //Acknowledge current message + super.doAcknowlegement(msg); + + int msgCount = msg.getIntProperty(INDEX); + + if (msgCount % 2 == 0) + { + failBroker(getFailingPort()); + } + else + { + failBroker(getPort()); + } + + try + { + prepBroker(NUM_MESSAGES - msgCount - 1); + } + catch (Exception e) + { + fail("Unable to prep new broker," + e.getMessage()); + } + + try + { + + if (msgCount % 2 == 0) + { + startBroker(getFailingPort()); + } + else + { + startBroker(getPort()); + } + } + catch (Exception e) + { + fail("Unable to start failover broker," + e.getMessage()); + } + + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java new file mode 100644 index 0000000000..cad54131f7 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +public class AcknowledgeAfterFailoverTest extends AcknowledgeTest +{ + + @Override + public void setUp() throws Exception + { + super.setUp(); + NUM_MESSAGES = 10; + } + + protected void prepBroker(int count) throws Exception + { + //Stop the connection whilst we repopulate the broker, or the no_ack + // test will drain the msgs before we can check we put the right number + // back on again. +// _connection.stop(); + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, count, NUM_MESSAGES - count, 0); + + if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) + { + assertEquals("Wrong number of messages on queue", count, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + } + + connection.close(); + +// _connection.start(); + } + + @Override + public void doAcknowlegement(Message msg) throws JMSException + { + //Acknowledge current message + super.doAcknowlegement(msg); + + int msgCount = msg.getIntProperty(INDEX); + + if (msgCount % 2 == 0) + { + failBroker(getFailingPort()); + } + else + { + failBroker(getPort()); + } + + try + { + prepBroker(NUM_MESSAGES - msgCount - 1); + } + catch (Exception e) + { + fail("Unable to prep new broker," + e.getMessage()); + } + + try + { + if (msgCount % 2 == 0) + { + startBroker(getFailingPort()); + } + else + { + startBroker(getPort()); + } + } + catch (Exception e) + { + fail("Unable to start failover broker," + e.getMessage()); + } + + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java new file mode 100644 index 0000000000..6558fb4d09 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; + +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener +{ + private CountDownLatch _receviedAll; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _receviedAll = new CountDownLatch(NUM_MESSAGES); + } + + /** + * @param transacted + * @param mode + * + * @throws Exception + */ + protected void testAcking(boolean transacted, int mode) throws Exception + { + init(transacted, mode); + + _consumer.setMessageListener(this); + + _connection.start(); + + if (!_receviedAll.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + { + fail("failover did not complete"); + } + + _consumer.close(); + _consumerSession.close(); + + assertEquals("Wrong number of messages on queue", 0, + ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue)); + } + + public void onMessage(Message message) + { + try + { + int count = NUM_MESSAGES - (int) _receviedAll.getCount(); + + assertEquals("Incorrect message received", count, message.getIntProperty(INDEX)); + + count++; + if (count < NUM_MESSAGES) + { + //Send the next message + _producer.send(createNextMessage(_consumerSession, count)); + } + + doAcknowlegement(message); + + _receviedAll.countDown(); + } + catch (Exception e) + { + fail(e); + } + } + + protected void fail(Exception e) + { + + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java index c367a0856c..f67efa668a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -1,7 +1,5 @@ -package org.apache.qpid.test.unit.ack; - /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,133 +19,138 @@ package org.apache.qpid.test.unit.ack; * */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.FailoverBaseCase; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.MessageProducer; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AcknowledgeTest extends QpidTestCase +public class AcknowledgeTest extends FailoverBaseCase { - protected static int NUM_MESSAGES = 100; - protected Connection _con; + protected int NUM_MESSAGES; + protected Connection _connection; protected Queue _queue; - private MessageProducer _producer; - private Session _producerSession; - private Session _consumerSession; - private MessageConsumer _consumerA; + protected Session _consumerSession; + protected MessageConsumer _consumer; + protected MessageProducer _producer; @Override protected void setUp() throws Exception { super.setUp(); - _queue = (Queue) getInitialContext().lookup("queue"); + NUM_MESSAGES = 10; + + _queue = getTestQueue(); //Create Producer put some messages on the queue - _con = getConnection(); - _con.start(); + _connection = getConnection(); } - private void init(boolean transacted, int mode) throws JMSException { - _producerSession = _con.createSession(true, Session.AUTO_ACKNOWLEDGE); - _consumerSession = _con.createSession(transacted, mode); - _producer = _producerSession.createProducer(_queue); - _consumerA = _consumerSession.createConsumer(_queue); - } + protected void init(boolean transacted, int mode) throws Exception + { + _consumerSession = _connection.createSession(transacted, mode); + _consumer = _consumerSession.createConsumer(_queue); + _producer = _consumerSession.createProducer(_queue); + + // These should all end up being prefetched by session + sendMessage(_consumerSession, _queue, 1); + + assertEquals("Wrong number of messages on queue", 1, + ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + } /** - * Produces and consumes messages an either ack or commit the receipt of those messages - * * @param transacted * @param mode + * * @throws Exception */ - private void testMessageAck(boolean transacted, int mode) throws Exception + protected void testAcking(boolean transacted, int mode) throws Exception { - init(transacted, mode); - sendMessage(_producerSession, _queue, NUM_MESSAGES/2); - _producerSession.commit(); - MessageConsumer consumerB = _consumerSession.createConsumer(_queue); - sendMessage(_producerSession, _queue, NUM_MESSAGES/2); - _producerSession.commit(); + init(transacted, mode); + + _connection.start(); + + Message msg = _consumer.receive(1500); + int count = 0; - Message msg = consumerB.receive(1500); - while (msg != null) + while (count < NUM_MESSAGES) { - if (mode == Session.CLIENT_ACKNOWLEDGE) + assertNotNull("Message " + count + " not correctly received.", msg); + assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX)); + count++; + + if (count < NUM_MESSAGES) { - msg.acknowledge(); + //Send the next message + _producer.send(createNextMessage(_consumerSession, count)); } - count++; - msg = consumerB.receive(1500); + + doAcknowlegement(msg); + + msg = _consumer.receive(1500); } - if (transacted) - { - _consumerSession.commit(); - } - _consumerA.close(); - consumerB.close(); - _consumerSession.close(); - assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, - ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue)); - - // Clean up messages that may be left on the queue - _consumerSession = _con.createSession(transacted, mode); - _consumerA = _consumerSession.createConsumer(_queue); - msg = _consumerA.receive(1500); - while (msg != null) + + assertEquals("Wrong number of messages on queue", 0, + ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + } + + /** + * Perform the acknowledgement of messages if additionally required. + * + * @param msg + * + * @throws JMSException + */ + protected void doAcknowlegement(Message msg) throws JMSException + { + if (_consumerSession.getTransacted()) { - if (mode == Session.CLIENT_ACKNOWLEDGE) - { - msg.acknowledge(); - } - msg = _consumerA.receive(1500); + _consumerSession.commit(); } - _consumerA.close(); - if (transacted) + + if (_consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - _consumerSession.commit(); + msg.acknowledge(); } - _consumerSession.close(); } - - public void test2ConsumersAutoAck() throws Exception + + public void testClientAck() throws Exception { - testMessageAck(false, Session.AUTO_ACKNOWLEDGE); + testAcking(false, Session.CLIENT_ACKNOWLEDGE); } - public void test2ConsumersClientAck() throws Exception + public void testAutoAck() throws Exception { - testMessageAck(true, Session.CLIENT_ACKNOWLEDGE); + testAcking(false, Session.AUTO_ACKNOWLEDGE); } - - public void test2ConsumersTx() throws Exception + + public void testTransacted() throws Exception { - testMessageAck(true, Session.AUTO_ACKNOWLEDGE); + testAcking(true, Session.SESSION_TRANSACTED); } - - public void testIndividualAck() throws Exception + + public void testDupsOk() throws Exception { - init(false, Session.CLIENT_ACKNOWLEDGE); - sendMessage(_producerSession, _queue, 3); - _producerSession.commit(); - Message msg = null; - for (int i = 0; i < 2; i++) - { - msg = _consumerA.receive(RECEIVE_TIMEOUT); - ((AbstractJMSMessage)msg).acknowledgeThis(); - } - msg = _consumerA.receive(RECEIVE_TIMEOUT); - msg.acknowledge(); - _con.close(); + testAcking(false, Session.DUPS_OK_ACKNOWLEDGE); } - + + public void testNoAck() throws Exception + { + testAcking(false, AMQSession.NO_ACKNOWLEDGE); + } + + public void testPreAck() throws Exception + { + testAcking(false, AMQSession.PRE_ACKNOWLEDGE); + } + } |
