diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-04-12 15:42:04 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-04-12 15:42:04 +0000 |
| commit | cee13443091a714e837429beab62a2c09e0911d9 (patch) | |
| tree | 703e47f611a97b537b791bd959dbc292cf5d4ad1 /qpid/java | |
| parent | c5e59c94f2a675b1fd5695590b585cb7ed05b99a (diff) | |
| download | qpid-python-cee13443091a714e837429beab62a2c09e0911d9.tar.gz | |
QPID-2346 : Addressed the problems with AcknowledgeAfterFailoverOnMessageTest, The issues were the same as AckAfterFailoverTest. So used same prepBroker approach. Test also need timeout increased as broke restarts took to long for a message to be sent/received. Finally the last change was thatthe queue needed to be re-declared so that the final queue depth check would have a queue to query.
Merged from 0.5.x-dev @ r907004
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@933281 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
4 files changed, 188 insertions, 52 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 49f94edef7..b5ad42d8e1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -486,6 +486,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null); } + + // Add creation logging to tie in with the existing close logging + if (_logger.isInfoEnabled()) + { + _logger.info("Created session:" + this); + } } /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java index 7c5db290c4..d73d012250 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java @@ -24,6 +24,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.util.FileUtils; import javax.jms.Connection; import javax.jms.JMSException; @@ -33,7 +34,25 @@ import javax.jms.Session; import javax.jms.TransactionRolledBackException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.io.File; +/** + * The AcknowlegeAfterFailoverOnMessageTests + * + * Extends the OnMessage AcknowledgeTests to validate that after the client has + * failed over that the client can still receive and ack messages. + * + * All the AcknowledgeTest ack modes are exercised here though some are disabled + * due to know issues (e.g. DupsOk, AutoAck : QPID-143 and the clientAck + * and dirtyClientAck due to QPID-1816) + * + * This class has two main test structures, overrides of AcknowledgeOnMessageTest + * to perform the clean acking based on session ack mode and a series of dirty + * ack tests that test what happends if you receive a message then try and ack + * AFTER you have failed over. + * + * + */ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener { @@ -68,61 +87,96 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT } } - protected void prepBroker(int count) throws Exception - { - //Stop the connection whilst we repopulate the broker, or the no_ack - // test will drain the msgs before we can check we put the right number - // back on again. -// _connection.stop(); - - Connection connection = getConnection(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - // ensure destination is created. - session.createConsumer(_queue).close(); - - sendMessage(session, _queue, count, NUM_MESSAGES - count, 0); - - if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) - { - assertEquals("Wrong number of messages on queue", count, - ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); - } - - connection.close(); - -// _connection.start(); - } - - @Override - public void doAcknowlegement(Message msg) throws JMSException + /** + * Prepare the broker for the next round. + * + * Called after acknowledging the messsage this method shuts the current + * broker down connnects to the new broker and send a new message for the + * client to failover to and receive. + * + * It ends by restarting the orignal broker so that the cycle can repeat. + * + * When we are able to cluster the java broker then will not need to do the + * message repopulation or QPID_WORK clearing. All that we will need to do + * is send the initial NUM_MESSAGES during startup and then bring the + * brokers down at the right time to cause the client to fail between them. + * + * @param index + * @throws Exception + */ + protected void prepBroker(int index) throws Exception { - //Acknowledge current message - super.doAcknowlegement(msg); + // Alternate killing the broker based on the message index we are at. - int msgCount = msg.getIntProperty(INDEX); - - if (msgCount % 2 == 0) + if (index % 2 == 0) { failBroker(getFailingPort()); + // Clean up the failed broker + FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getFailingPort()), true); } else { failBroker(getPort()); + // Clean up the failed broker + FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getPort()), true); } + _failoverCompleted = new CountDownLatch(1); + + _logger.info("AAFOMT: prepNewBroker for message send"); + Connection connection = getConnection(); + try { - prepBroker(NUM_MESSAGES - msgCount - 1); + + //Stop the connection whilst we repopulate the broker, or the no_ack + // test will drain the msgs before we can check we put the right number + // back on again. + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + + // If this is the last message then we can skip the send. + // But we MUST ensure that we have created the queue with the + // above createConsumer(_queue).close() as the test will end by + // testing the queue depth which will fail if we don't ensure we + // declare the queue. + // index is 0 based so we need to check +1 against NUM_MESSAGES + if ((index + 1) == NUM_MESSAGES) + { + return; + } + + + sendMessage(session, _queue, 1, index + 1, 0); + + // Validate that we have the message on the queue + // In NoAck mode though the messasge may already have been sent to + // the client so we have to skip the vaildation. + if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) + { + assertEquals("Wrong number of messages on queue", 1, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + } + + } catch (Exception e) { fail("Unable to prep new broker," + e.getMessage()); } + finally + { + connection.close(); + } try { - if (msgCount % 2 == 0) + //Restart the broker + if (index % 2 == 0) { startBroker(getFailingPort()); } @@ -138,8 +192,27 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT } - int msgCount = 0; - boolean cleaned = false; + @Override + public void doAcknowlegement(Message msg) throws JMSException + { + //Acknowledge current message + super.doAcknowlegement(msg); + + try + { + prepBroker(msg.getIntProperty(INDEX)); + } + catch (Exception e) + { + // Provide details of what went wrong with the stack trace + e.printStackTrace(); + fail("Unable to prep new broker," + e); + } + } + + // Instance varilable for DirtyAcking test + int _msgCount = 0; + boolean _cleaned = false; class DirtyAckingHandler implements MessageListener { @@ -164,10 +237,10 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT try { // Check we have the next message as expected - assertNotNull("Message " + msgCount + " not correctly received.", message); - assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX)); + assertNotNull("Message " + _msgCount + " not correctly received.", message); + assertEquals("Incorrect message received", _msgCount, message.getIntProperty(INDEX)); - if (msgCount == 0 && _failoverCompleted.getCount() != 0) + if (_msgCount == 0 && _failoverCompleted.getCount() != 0) { // This is the first message we've received so lets fail the broker @@ -180,16 +253,16 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT return; } - msgCount++; + _msgCount++; // Don't acknowlege the first message after failover so we can commit // them together - if (msgCount == 1) + if (_msgCount == 1) { - _logger.error("Received first msg after failover ignoring:" + msgCount); + _logger.error("Received first msg after failover ignoring:" + _msgCount); // Acknowledge the first message if we are now on the cleaned pass - if (cleaned) + if (_cleaned) { _receivedAll.countDown(); } @@ -202,7 +275,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT try { _consumerSession.commit(); - if (!cleaned) + if (!_cleaned) { fail("Session is dirty we should get an TransactionRolledBackException"); } @@ -217,7 +290,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT try { message.acknowledge(); - if (!cleaned) + if (!_cleaned) { fail("Session is dirty we should get an IllegalStateException"); } @@ -232,14 +305,14 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT // Acknowledge the last message if we are in a clean state // this will then trigger test teardown. - if (cleaned) + if (_cleaned) { _receivedAll.countDown(); } //Reset message count so we can try again. - msgCount = 0; - cleaned = true; + _msgCount = 0; + _cleaned = true; } catch (Exception e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java index 8de1053fc0..1b4407f255 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java @@ -32,6 +32,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +/** + * This test extends the synchronous AcknowledgeTest to use a MessageListener + * and receive messages asynchronously. + */ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener { protected CountDownLatch _receivedAll; @@ -43,6 +47,13 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message super.setUp(); } + /** + * Override the synchronous AcknowledgeTest init to provide the _receivedAll + * CountDownLatch init and ensure that we set the MessageListener. + * @param transacted + * @param mode + * @throws Exception + */ @Override public void init(boolean transacted, int mode) throws Exception { @@ -53,11 +64,25 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message } /** + * This test overrides the testAcking from the simple recieve() model to all + * for asynchronous receiving of messages. + * + * Again the transaction/ack mode is provided to this main test run + * + * The init method is called which will setup the listener so that we can + * then sit and await using the _receivedAll CountDownLatch. We wait for up + * to 10s if no messages have been received in the last 10s then test will + * fail. + * + * If the test fails then it will attempt to retrieve any exception that the + * asynchronous delivery thread may have recorded. + * * @param transacted * @param mode * * @throws Exception */ + @Override protected void testAcking(boolean transacted, int mode) throws Exception { init(transacted, mode); @@ -69,7 +94,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message int lastCount = NUM_MESSAGES; // Wait for messages to arrive - boolean complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS); + boolean complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS); // If the messasges haven't arrived while (!complete) @@ -90,7 +115,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message lastCount = currentCount; // Wait again for messages to arrive. - complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS); + complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS); } // If we failed to receive all the messages then fail the test. @@ -105,6 +130,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message } else { + _logger.info("AOMT: Check QueueDepth:" + _queue); long onQueue=((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue); fail("All messages not received missing:" + _receivedAll.getCount() + "/" + NUM_MESSAGES+" On Queue:"+onQueue); @@ -136,14 +162,27 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message _consumerSession.close(); + _logger.info("AOMT: check number of message at end of test."); assertEquals("Wrong number of messages on queue", 0, ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue)); } + /** + * The MessageListener interface that recieves the message and counts down + * the _receivedAll CountDownLatch. + * + * Again like AcknowledgeTest acknowledgement is actually handled in + * doAcknowlegement. + * + * The message INDEX is validated to ensure the correct message order is + * preserved. + * + * @param message + */ public void onMessage(Message message) { // Log received Message for debugging - System.out.println("RECEIVED MESSAGE:" + message); + _logger.info("RECEIVED MESSAGE:" + message); try { @@ -164,7 +203,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message } catch (Exception e) { - // This will end the test run by counting down _receviedAll + // This will end the test run by counting down _receivedAll fail(e); } } @@ -176,6 +215,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message */ protected void fail(Exception e) { + //record the failure _causeOfFailure.set(e); // End the test. while (_receivedAll.getCount() != 0) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java index 36731107c5..efea57e5d2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -33,6 +33,12 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.MessageProducer; +/** + * Test the various JMS Acknowledge Modes the single testAcking method does all + * the work of receiving and validation of acking. + * + * The ack mode is provided from the various test methods. + */ public class AcknowledgeTest extends FailoverBaseCase { protected int NUM_MESSAGES; @@ -50,6 +56,7 @@ public class AcknowledgeTest extends FailoverBaseCase _queue = getTestQueue(); + _logger.info("AT: setup"); //Create Producer put some messages on the queue _connection = getConnection(); } @@ -68,6 +75,16 @@ public class AcknowledgeTest extends FailoverBaseCase } /** + * The main test method. + * + * Receive the initial message and then proceed to send and ack messages + * until we have processed NUM_MESSAGES worth of messages. + * + * Each message is tagged with an INDEX value and these are used to check + * that the messages are received in the correct order. + * + * The test concludes by validating that the queue depth is 0 as expected. + * * @param transacted * @param mode * |
