From 430b2744ac7cc37cef55215423fc87db9943744a Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 8 Feb 2008 18:30:04 +0000 Subject: made xa tests run, and made QpidTestCase more robust git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@619974 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/test/unit/xa/AbstractXATest.java | 131 -- .../qpid/test/unit/xa/AbstractXATestCase.java | 131 ++ .../org/apache/qpid/test/unit/xa/QueueTest.java | 643 ++++++++ .../org/apache/qpid/test/unit/xa/QueueTests.java | 643 -------- .../org/apache/qpid/test/unit/xa/TopicTest.java | 1708 ++++++++++++++++++++ .../org/apache/qpid/test/unit/xa/TopicTests.java | 1708 -------------------- .../org/apache/qpid/testutil/QpidTestCase.java | 30 +- 7 files changed, 2503 insertions(+), 2491 deletions(-) delete mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java create mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java create mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java delete mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java create mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java delete mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java (limited to 'java/client/src/test') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java deleted file mode 100644 index ba4ebae258..0000000000 --- a/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.test.unit.xa; - -import org.apache.qpidity.dtx.XidImpl; -import org.apache.qpid.testutil.QpidTestCase; - -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAResource; -import javax.jms.*; - -/** - * - * - */ -public abstract class AbstractXATest extends QpidTestCase -{ - protected static final String _sequenceNumberPropertyName = "seqNumber"; - - /** - * the xaResource associated with the standard session - */ - protected static XAResource _xaResource = null; - - /** - * producer registered with the standard session - */ - protected static MessageProducer _producer = null; - - /** - * consumer registered with the standard session - */ - protected static MessageConsumer _consumer = null; - - /** - * a standard message - */ - protected static TextMessage _message = null; - - /** - * xid counter - */ - private static int _xidCounter = 0; - - - protected void setUp() throws Exception - { - super.setUp(); - init(); - } - - public abstract void init(); - - - - /** - * construct a new Xid - * - * @return a new Xid - */ - protected Xid getNewXid() - { - byte[] branchQualifier; - byte[] globalTransactionID; - int format = _xidCounter; - String branchQualifierSt = "branchQualifier" + _xidCounter; - String globalTransactionIDSt = "globalTransactionID" + _xidCounter; - branchQualifier = branchQualifierSt.getBytes(); - globalTransactionID = globalTransactionIDSt.getBytes(); - _xidCounter++; - return new XidImpl(branchQualifier, format, globalTransactionID); - } - - public void init(XASession session, Destination destination) - { - // get the xaResource - try - { - _xaResource = session.getXAResource(); - } - catch (Exception e) - { - fail("cannot access the xa resource: " + e.getMessage()); - } - // create standard producer - try - { - _producer = session.createProducer(destination); - _producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("cannot create message producer: " + e.getMessage()); - } - // create standard consumer - try - { - _consumer = session.createConsumer(destination); - } - catch (JMSException e) - { - fail("cannot create message consumer: " + e.getMessage()); - } - // create a standard message - try - { - _message = session.createTextMessage(); - _message.setText("test XA"); - } - catch (JMSException e) - { - fail("cannot create standard message: " + e.getMessage()); - } - } -} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java new file mode 100644 index 0000000000..7c03e16258 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java @@ -0,0 +1,131 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.xa; + +import org.apache.qpidity.dtx.XidImpl; +import org.apache.qpid.testutil.QpidTestCase; + +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAResource; +import javax.jms.*; + +/** + * + * + */ +public abstract class AbstractXATestCase extends QpidTestCase +{ + protected static final String _sequenceNumberPropertyName = "seqNumber"; + + /** + * the xaResource associated with the standard session + */ + protected static XAResource _xaResource = null; + + /** + * producer registered with the standard session + */ + protected static MessageProducer _producer = null; + + /** + * consumer registered with the standard session + */ + protected static MessageConsumer _consumer = null; + + /** + * a standard message + */ + protected static TextMessage _message = null; + + /** + * xid counter + */ + private static int _xidCounter = 0; + + + protected void setUp() throws Exception + { + super.setUp(); + init(); + } + + public abstract void init(); + + + + /** + * construct a new Xid + * + * @return a new Xid + */ + protected Xid getNewXid() + { + byte[] branchQualifier; + byte[] globalTransactionID; + int format = _xidCounter; + String branchQualifierSt = "branchQualifier" + _xidCounter; + String globalTransactionIDSt = "globalTransactionID" + _xidCounter; + branchQualifier = branchQualifierSt.getBytes(); + globalTransactionID = globalTransactionIDSt.getBytes(); + _xidCounter++; + return new XidImpl(branchQualifier, format, globalTransactionID); + } + + public void init(XASession session, Destination destination) + { + // get the xaResource + try + { + _xaResource = session.getXAResource(); + } + catch (Exception e) + { + fail("cannot access the xa resource: " + e.getMessage()); + } + // create standard producer + try + { + _producer = session.createProducer(destination); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("cannot create message producer: " + e.getMessage()); + } + // create standard consumer + try + { + _consumer = session.createConsumer(destination); + } + catch (JMSException e) + { + fail("cannot create message consumer: " + e.getMessage()); + } + // create a standard message + try + { + _message = session.createTextMessage(); + _message.setText("test XA"); + } + catch (JMSException e) + { + fail("cannot create standard message: " + e.getMessage()); + } + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java new file mode 100644 index 0000000000..a703432efb --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java @@ -0,0 +1,643 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.xa; + +import javax.jms.*; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAException; + +import junit.framework.TestSuite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueueTest extends AbstractXATestCase +{ + /* this clas logger */ + private static final Logger _logger = LoggerFactory.getLogger(QueueTest.class); + + /** + * the queue use by all the tests + */ + private static Queue _queue = null; + /** + * the queue connection factory used by all tests + */ + private static XAQueueConnectionFactory _queueFactory = null; + /** + * standard queue connection + */ + private static XAQueueConnection _queueConnection = null; + + /** + * standard queue session created from the standard connection + */ + private static QueueSession _nonXASession = null; + + /** + * the queue name + */ + private static final String QUEUENAME = "xaQueue"; + + /** ----------------------------------------------------------------------------------- **/ + /** + * ----------------------------- JUnit support ----------------------------------------- * + */ + + /** + * Gets the test suite tests + * + * @return the test suite tests + */ + public static TestSuite getSuite() + { + return new TestSuite(QueueTest.class); + } + + /** + * Run the test suite. + * + * @param args Any command line arguments specified to this class. + */ + public static void main(String args[]) + { + junit.textui.TestRunner.run(getSuite()); + } + + public void tearDown() throws Exception + { + if (!isBroker08()) + { + try + { + _queueConnection.stop(); + _queueConnection.close(); + } + catch (Exception e) + { + fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); + } + } + super.tearDown(); + } + + /** + * Initialize standard actors + */ + public void init() + { + if (!isBroker08()) + { + // lookup test queue + try + { + _queue = (Queue) getInitialContext().lookup(QUEUENAME); + } + catch (Exception e) + { + fail("cannot lookup test queue " + e.getMessage()); + } + + // lookup connection factory + try + { + _queueFactory = getConnectionFactory(); + } + catch (Exception e) + { + fail("enable to lookup connection factory "); + } + // create standard connection + try + { + _queueConnection = getNewQueueXAConnection(); + } + catch (JMSException e) + { + fail("cannot create queue connection: " + e.getMessage()); + } + // create xa session + XAQueueSession session = null; + try + { + session = _queueConnection.createXAQueueSession(); + } + catch (JMSException e) + { + fail("cannot create queue session: " + e.getMessage()); + } + // create a standard session + try + { + _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); + } + catch (JMSException e) + { + fail("cannot create queue session: " + e.getMessage()); + } + init(session, _queue); + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Test Suite -------------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * Uses two transactions respectively with xid1 and xid2 that are used to send a message + * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. + * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. + */ + public void testProducer() + { + if (!isBroker08()) + { + _logger.debug("running testProducer"); + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + e.printStackTrace(); + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _queueConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUSPEND); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // start the xaResource for xid2 + try + { + _xaResource.start(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid2: " + e.getMessage()); + } + try + { + // produce a message + _message.setLongProperty(_sequenceNumberPropertyName, 2); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send second persistent message: " + e.getMessage()); + } + // end xid2 and start xid1 + try + { + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.start(xid1, XAResource.TMRESUME); + } + catch (XAException e) + { + fail("Exception when ending and starting transactions: " + e.getMessage()); + } + // two phases commit transaction with xid2 + try + { + int resPrepare = _xaResource.prepare(xid2); + if (resPrepare != XAResource.XA_OK) + { + fail("prepare returned: " + resPrepare); + } + _xaResource.commit(xid2, false); + } + catch (XAException e) + { + fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); + } + // receive a message from queue test we expect it to be the second one + try + { + TextMessage message = (TextMessage) _consumer.receiveNoWait(); + if (message == null) + { + fail("did not receive second message as expected "); + } + else + { + if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("receive wrong message its sequence number is: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + } + catch (JMSException e) + { + fail("Exception when receiving second message: " + e.getMessage()); + } + // end and one phase commit the first transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + _xaResource.commit(xid1, true); + } + catch (XAException e) + { + fail("Exception thrown when commiting transaction with xid1"); + } + // We should now be able to receive the first message + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("did not receive first message as expected "); + } + else + { + if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("receive wrong message its sequence number is: " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + } + // commit that transacted session + nonXASession.commit(); + // the queue should be now empty + message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 != null) + { + fail("receive an unexpected message "); + } + } + catch (JMSException e) + { + fail("Exception thrown when emptying the queue: " + e.getMessage()); + } + } + } + + /** + * strategy: Produce a message within Tx1 and prepare tx1. crash the server then commit tx1 and consume the message + */ + public void testSendAndRecover() + { + if (!isBroker08()) + { + _logger.debug("running testSendAndRecover"); + Xid xid1 = getNewXid(); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _queueConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid1); + } + catch (XAException e) + { + fail("Exception when preparing xid1: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + _logger.debug("stopping broker"); + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + // get the list of in doubt transactions + try + { + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 1) + { + fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); + } + + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid1)) + { + System.out.println("commit xid1 "); + try + { + _xaResource.commit(anInDoubt, false); + } + catch (Exception e) + { + System.out.println("PB when aborted xid1"); + } + } + else + { + fail("did not receive right xid "); + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + // the queue should contain the first message! + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); + _queueConnection.start(); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + + if (message1 == null) + { + fail("queue does not contain any message!"); + } + if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("Wrong message returned! Sequence number is " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + nonXASession.commit(); + } + catch (JMSException e) + { + fail("Exception thrown when testin that queue test is not empty: " + e.getMessage()); + } + } + } + + /** + * strategy: Produce a message within Tx1 and prepare tx1. Produce a standard message and consume + * it within tx2 and prepare tx2. Shutdown the server and get the list of in doubt transactions: + * we expect tx1 and tx2! Then, Tx1 is aborted and tx2 is committed so we expect the test's queue to be empty! + */ + public void testRecover() + { + if (!isBroker08()) + { + _logger.debug("running testRecover"); + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _queueConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid1); + } + catch (XAException e) + { + fail("Exception when preparing xid1: " + e.getMessage()); + } + + // send a message using the standard session + try + { + Session nonXASession = _nonXASession; + MessageProducer nonXAProducer = nonXASession.createProducer(_queue); + TextMessage message2 = nonXASession.createTextMessage(); + message2.setText("non XA "); + message2.setLongProperty(_sequenceNumberPropertyName, 2); + nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + nonXAProducer.send(message2); + // commit that transacted session + nonXASession.commit(); + } + catch (Exception e) + { + fail("Exception thrown when emptying the queue: " + e.getMessage()); + } + // start the xaResource for xid2 + try + { + _xaResource.start(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + // receive a message from queue test we expect it to be the second one + try + { + TextMessage message = (TextMessage) _consumer.receiveNoWait(); + if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("did not receive second message as expected "); + } + } + catch (JMSException e) + { + fail("Exception when receiving second message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid2: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid2); + } + catch (XAException e) + { + fail("Exception when preparing xid2: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + _logger.debug("stopping broker"); + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + // get the list of in doubt transactions + try + { + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 2) + { + fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); + } + + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid1)) + { + _logger.debug("rollback xid1 "); + try + { + _xaResource.rollback(anInDoubt); + } + catch (Exception e) + { + System.out.println("PB when aborted xid1"); + } + } + else if (anInDoubt.equals(xid2)) + { + _logger.debug("commit xid2 "); + try + { + _xaResource.commit(anInDoubt, false); + } + catch (Exception e) + { + System.out.println("PB when commiting xid2"); + } + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + // the queue should be empty + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); + _queueConnection.start(); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 != null) + { + fail("The queue is not empty! "); + } + } + catch (JMSException e) + { + fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); + } + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Utility methods --------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * get a new queue connection + * + * @return a new queue connection + * @throws JMSException If the JMS provider fails to create the queue connection + * due to some internal error or in case of authentication failure + */ + private XAQueueConnection getNewQueueXAConnection() throws JMSException + { + return _queueFactory.createXAQueueConnection("guest", "guest"); + } + + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java deleted file mode 100644 index cd5b228f76..0000000000 --- a/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTests.java +++ /dev/null @@ -1,643 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.test.unit.xa; - -import javax.jms.*; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAException; - -import junit.framework.TestSuite; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class QueueTests extends AbstractXATest -{ - /* this clas logger */ - private static final Logger _logger = LoggerFactory.getLogger(QueueTests.class); - - /** - * the queue use by all the tests - */ - private static Queue _queue = null; - /** - * the queue connection factory used by all tests - */ - private static XAQueueConnectionFactory _queueFactory = null; - /** - * standard queue connection - */ - private static XAQueueConnection _queueConnection = null; - - /** - * standard queue session created from the standard connection - */ - private static QueueSession _nonXASession = null; - - /** - * the queue name - */ - private static final String QUEUENAME = "xaQueue"; - - /** ----------------------------------------------------------------------------------- **/ - /** - * ----------------------------- JUnit support ----------------------------------------- * - */ - - /** - * Gets the test suite tests - * - * @return the test suite tests - */ - public static TestSuite getSuite() - { - return new TestSuite(QueueTests.class); - } - - /** - * Run the test suite. - * - * @param args Any command line arguments specified to this class. - */ - public static void main(String args[]) - { - junit.textui.TestRunner.run(getSuite()); - } - - public void tearDown() throws Exception - { - if (!isBroker08()) - { - try - { - _queueConnection.stop(); - _queueConnection.close(); - } - catch (Exception e) - { - fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); - } - } - super.tearDown(); - } - - /** - * Initialize standard actors - */ - public void init() - { - if (!isBroker08()) - { - // lookup test queue - try - { - _queue = (Queue) getInitialContext().lookup(QUEUENAME); - } - catch (Exception e) - { - fail("cannot lookup test queue " + e.getMessage()); - } - - // lookup connection factory - try - { - _queueFactory = getConnectionFactory(); - } - catch (Exception e) - { - fail("enable to lookup connection factory "); - } - // create standard connection - try - { - _queueConnection = getNewQueueXAConnection(); - } - catch (JMSException e) - { - fail("cannot create queue connection: " + e.getMessage()); - } - // create xa session - XAQueueSession session = null; - try - { - session = _queueConnection.createXAQueueSession(); - } - catch (JMSException e) - { - fail("cannot create queue session: " + e.getMessage()); - } - // create a standard session - try - { - _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); - } - catch (JMSException e) - { - fail("cannot create queue session: " + e.getMessage()); - } - init(session, _queue); - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Test Suite -------------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * Uses two transactions respectively with xid1 and xid2 that are used to send a message - * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. - * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. - */ - public void testProducer() - { - if (!isBroker08()) - { - _logger.debug("running testProducer"); - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - e.printStackTrace(); - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _queueConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUSPEND); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // start the xaResource for xid2 - try - { - _xaResource.start(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid2: " + e.getMessage()); - } - try - { - // produce a message - _message.setLongProperty(_sequenceNumberPropertyName, 2); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send second persistent message: " + e.getMessage()); - } - // end xid2 and start xid1 - try - { - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.start(xid1, XAResource.TMRESUME); - } - catch (XAException e) - { - fail("Exception when ending and starting transactions: " + e.getMessage()); - } - // two phases commit transaction with xid2 - try - { - int resPrepare = _xaResource.prepare(xid2); - if (resPrepare != XAResource.XA_OK) - { - fail("prepare returned: " + resPrepare); - } - _xaResource.commit(xid2, false); - } - catch (XAException e) - { - fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); - } - // receive a message from queue test we expect it to be the second one - try - { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); - if (message == null) - { - fail("did not receive second message as expected "); - } - else - { - if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("receive wrong message its sequence number is: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - } - catch (JMSException e) - { - fail("Exception when receiving second message: " + e.getMessage()); - } - // end and one phase commit the first transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - _xaResource.commit(xid1, true); - } - catch (XAException e) - { - fail("Exception thrown when commiting transaction with xid1"); - } - // We should now be able to receive the first message - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("did not receive first message as expected "); - } - else - { - if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("receive wrong message its sequence number is: " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - } - // commit that transacted session - nonXASession.commit(); - // the queue should be now empty - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 != null) - { - fail("receive an unexpected message "); - } - } - catch (JMSException e) - { - fail("Exception thrown when emptying the queue: " + e.getMessage()); - } - } - } - - /** - * strategy: Produce a message within Tx1 and prepare tx1. crash the server then commit tx1 and consume the message - */ - public void testSendAndRecover() - { - if (!isBroker08()) - { - _logger.debug("running testSendAndRecover"); - Xid xid1 = getNewXid(); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _queueConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid1); - } - catch (XAException e) - { - fail("Exception when preparing xid1: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - _logger.debug("stopping broker"); - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - // get the list of in doubt transactions - try - { - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 1) - { - fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); - } - - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid1)) - { - System.out.println("commit xid1 "); - try - { - _xaResource.commit(anInDoubt, false); - } - catch (Exception e) - { - System.out.println("PB when aborted xid1"); - } - } - else - { - fail("did not receive right xid "); - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - // the queue should contain the first message! - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); - _queueConnection.start(); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - - if (message1 == null) - { - fail("queue does not contain any message!"); - } - if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("Wrong message returned! Sequence number is " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - nonXASession.commit(); - } - catch (JMSException e) - { - fail("Exception thrown when testin that queue test is not empty: " + e.getMessage()); - } - } - } - - /** - * strategy: Produce a message within Tx1 and prepare tx1. Produce a standard message and consume - * it within tx2 and prepare tx2. Shutdown the server and get the list of in doubt transactions: - * we expect tx1 and tx2! Then, Tx1 is aborted and tx2 is committed so we expect the test's queue to be empty! - */ - public void testRecover() - { - if (!isBroker08()) - { - _logger.debug("running testRecover"); - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _queueConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid1); - } - catch (XAException e) - { - fail("Exception when preparing xid1: " + e.getMessage()); - } - - // send a message using the standard session - try - { - Session nonXASession = _nonXASession; - MessageProducer nonXAProducer = nonXASession.createProducer(_queue); - TextMessage message2 = nonXASession.createTextMessage(); - message2.setText("non XA "); - message2.setLongProperty(_sequenceNumberPropertyName, 2); - nonXAProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - nonXAProducer.send(message2); - // commit that transacted session - nonXASession.commit(); - } - catch (Exception e) - { - fail("Exception thrown when emptying the queue: " + e.getMessage()); - } - // start the xaResource for xid2 - try - { - _xaResource.start(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - // receive a message from queue test we expect it to be the second one - try - { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); - if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("did not receive second message as expected "); - } - } - catch (JMSException e) - { - fail("Exception when receiving second message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid2: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid2); - } - catch (XAException e) - { - fail("Exception when preparing xid2: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - _logger.debug("stopping broker"); - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - // get the list of in doubt transactions - try - { - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 2) - { - fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); - } - - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid1)) - { - _logger.debug("rollback xid1 "); - try - { - _xaResource.rollback(anInDoubt); - } - catch (Exception e) - { - System.out.println("PB when aborted xid1"); - } - } - else if (anInDoubt.equals(xid2)) - { - _logger.debug("commit xid2 "); - try - { - _xaResource.commit(anInDoubt, false); - } - catch (Exception e) - { - System.out.println("PB when commiting xid2"); - } - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - // the queue should be empty - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); - _queueConnection.start(); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 != null) - { - fail("The queue is not empty! "); - } - } - catch (JMSException e) - { - fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); - } - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Utility methods --------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * get a new queue connection - * - * @return a new queue connection - * @throws JMSException If the JMS provider fails to create the queue connection - * due to some internal error or in case of authentication failure - */ - private XAQueueConnection getNewQueueXAConnection() throws JMSException - { - return _queueFactory.createXAQueueConnection("guest", "guest"); - } - - -} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java new file mode 100644 index 0000000000..5ea059b166 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java @@ -0,0 +1,1708 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.unit.xa; + +import javax.jms.*; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAException; + +import junit.framework.TestSuite; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * + */ +public class TopicTest extends AbstractXATestCase +{ + /* this clas logger */ + private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class); + + /** + * the topic use by all the tests + */ + private static Topic _topic = null; + + /** + * the topic connection factory used by all tests + */ + private static XATopicConnectionFactory _topicFactory = null; + + /** + * standard topic connection + */ + private static XATopicConnection _topicConnection = null; + + /** + * standard topic session created from the standard connection + */ + private static XATopicSession _session = null; + + private static TopicSession _nonXASession = null; + + /** + * the topic name + */ + private static final String TOPICNAME = "xaTopic"; + + /** + * Indicate that a listenere has failed + */ + private static boolean _failure = false; + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- JUnit support ----------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * Gets the test suite tests + * + * @return the test suite tests + */ + public static TestSuite getSuite() + { + return new TestSuite(TopicTest.class); + } + + /** + * Run the test suite. + * + * @param args Any command line arguments specified to this class. + */ + public static void main(String args[]) + { + junit.textui.TestRunner.run(getSuite()); + } + + public void tearDown() throws Exception + { + if (!isBroker08()) + { + try + { + _topicConnection.stop(); + _topicConnection.close(); + } + catch (Exception e) + { + fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); + } + } + super.tearDown(); + } + + /** + * Initialize standard actors + */ + public void init() + { + if (!isBroker08()) + { + // lookup test queue + try + { + _topic = (Topic) getInitialContext().lookup(TOPICNAME); + } + catch (Exception e) + { + fail("cannot lookup test topic " + e.getMessage()); + } + // lookup connection factory + try + { + _topicFactory = getConnectionFactory(); + } + catch (Exception e) + { + fail("enable to lookup connection factory "); + } + // create standard connection + try + { + _topicConnection = getNewTopicXAConnection(); + } + catch (JMSException e) + { + fail("cannot create queue connection: " + e.getMessage()); + } + // create standard session + try + { + _session = _topicConnection.createXATopicSession(); + } + catch (JMSException e) + { + fail("cannot create queue session: " + e.getMessage()); + } + // create a standard session + try + { + _nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use Options | File Templates. + } + init(_session, _topic); + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Test Suite -------------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + + /** + * Uses two transactions respectively with xid1 and xid2 that are use to send a message + * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. + * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. + */ + public void testProducer() + { + if (!isBroker08()) + { + _logger.debug("testProducer"); + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + try + { + Session nonXASession = _nonXASession; + MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + // start the xaResource for xid1 + try + { + _logger.debug("starting tx branch xid1"); + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + e.printStackTrace(); + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _topicConnection.start(); + _logger.debug("produce a message with sequence number 1"); + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + _logger.debug("suspend the transaction branch xid1"); + try + { + _xaResource.end(xid1, XAResource.TMSUSPEND); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + _logger.debug("start the xaResource for xid2"); + try + { + _xaResource.start(xid2, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid2: " + e.getMessage()); + } + try + { + _logger.debug("produce a message"); + _message.setLongProperty(_sequenceNumberPropertyName, 2); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send second persistent message: " + e.getMessage()); + } + _logger.debug("end xid2 and start xid1"); + try + { + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.start(xid1, XAResource.TMRESUME); + } + catch (XAException e) + { + fail("Exception when ending and starting transactions: " + e.getMessage()); + } + _logger.debug("two phases commit transaction with xid2"); + try + { + int resPrepare = _xaResource.prepare(xid2); + if (resPrepare != XAResource.XA_OK) + { + fail("prepare returned: " + resPrepare); + } + _xaResource.commit(xid2, false); + } + catch (XAException e) + { + fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); + } + _logger.debug("receiving a message from topic test we expect it to be the second one"); + try + { + TextMessage message = (TextMessage) _consumer.receiveNoWait(); + if (message == null) + { + fail("did not receive second message as expected "); + } + else + { + if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("receive wrong message its sequence number is: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + } + catch (JMSException e) + { + fail("Exception when receiving second message: " + e.getMessage()); + } + _logger.debug("end and one phase commit the first transaction"); + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + _xaResource.commit(xid1, true); + } + catch (XAException e) + { + fail("Exception thrown when commiting transaction with xid1"); + } + _logger.debug("We should now be able to receive the first and second message"); + try + { + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("did not receive first message as expected "); + } + else + { + if (message1.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("receive wrong message its sequence number is: " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + } + message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("did not receive first message as expected "); + } + else + { + if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("receive wrong message its sequence number is: " + message1 + .getLongProperty(_sequenceNumberPropertyName)); + } + } + _logger.debug("commit transacted session"); + nonXASession.commit(); + _logger.debug("Test that the topic is now empty"); + message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 != null) + { + fail("receive an unexpected message "); + } + } + catch (JMSException e) + { + fail("Exception thrown when emptying the queue: " + e.getMessage()); + } + } + catch (JMSException e) + { + fail("cannot create standard consumer: " + e.getMessage()); + } + } + } + + + /** + * strategy: Produce a message within Tx1 and commit tx1. consume this message within tx2 and abort tx2. + * Consume the same message within tx3 and commit it. Check that no more message is available. + */ + public void testDurSub() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + String durSubName = "xaSubDurable"; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + _topicConnection.start(); + _logger.debug("start xid1"); + _xaResource.start(xid1, XAResource.TMSUCCESS); + // start the connection + _topicConnection.start(); + _logger.debug("produce a message with sequence number 1"); + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + _logger.debug("2 phases commit xid1"); + _xaResource.end(xid1, XAResource.TMSUCCESS); + if (_xaResource.prepare(xid1) != XAResource.XA_OK) + { + fail("Problem when preparing tx1 "); + } + _xaResource.commit(xid1, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid1: " + e.getMessage()); + } + try + { + _logger.debug("start xid2"); + _xaResource.start(xid2, XAResource.TMSUCCESS); + _logger.debug("receive the previously produced message"); + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + _logger.debug("rollback xid2"); + boolean rollbackOnFailure = false; + try + { + _xaResource.end(xid2, XAResource.TMFAIL); + } + catch (XAException e) + { + if (e.errorCode != XAException.XA_RBROLLBACK) + { + fail("Exception when working with xid2: " + e.getMessage()); + } + rollbackOnFailure = true; + } + if (!rollbackOnFailure) + { + if (_xaResource.prepare(xid2) != XAResource.XA_OK) + { + fail("Problem when preparing tx2 "); + } + _xaResource.rollback(xid2); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid2: " + e.getMessage()); + } + try + { + _logger.debug("start xid3"); + _xaResource.start(xid3, XAResource.TMSUCCESS); + _logger.debug(" receive the previously aborted consumed message"); + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + _logger.debug("commit xid3"); + _xaResource.end(xid3, XAResource.TMSUCCESS); + if (_xaResource.prepare(xid3) != XAResource.XA_OK) + { + fail("Problem when preparing tx3 "); + } + _xaResource.commit(xid3, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid3: " + e.getMessage()); + } + try + { + _logger.debug("start xid4"); + _xaResource.start(xid4, XAResource.TMSUCCESS); + _logger.debug("check that topic is empty"); + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received "); + } + _logger.debug("commit xid4"); + _xaResource.end(xid4, XAResource.TMSUCCESS); + _xaResource.commit(xid4, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid4: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + /** + * strategy: create a XA durable subscriber dusSub, produce 7 messages with the standard session, + * consume 2 messages respectively with tx1, tx2 and tx3 + * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! + * commit tx3 + * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! + * start tx4 and consume messages 1 - 4 and 7 + * commit tx4 + * Now the topic should be empty! + */ + public void testMultiMessagesDurSub() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + Xid xid6 = getNewXid(); + String durSubName = "xaSubDurable"; + TextMessage message; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + Session txSession = _nonXASession; + MessageProducer txProducer = txSession.createProducer(_topic); + _logger.debug("produce 10 persistent messages"); + txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + _topicConnection.start(); + for (int i = 1; i <= 7; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + txProducer.send(_message); + } + // commit txSession + txSession.commit(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Exception thrown when producing messages: " + e.getMessage()); + } + + try + { + _logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3"); + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 1; i <= 2; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid1, XAResource.TMSUSPEND); + //----- start xid2 + _xaResource.start(xid2, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 3; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid2, XAResource.TMSUSPEND); + //----- start xid3 + _xaResource.start(xid3, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 5; i <= 6; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid3, XAResource.TMSUCCESS); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); + } + try + { + _logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7"); + _xaResource.start(xid2, XAResource.TMRESUME); + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.prepare(xid2); + _xaResource.rollback(xid2); + // receive 3 message within tx1: 3, 4 and 7 + _xaResource.start(xid1, XAResource.TMRESUME); + _logger.debug(" 3, 4 and 7"); + for (int i = 1; i <= 3; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 3); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message + .getLongProperty(_sequenceNumberPropertyName) || message + .getLongProperty(_sequenceNumberPropertyName) == 6) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); + } + + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + _logger.debug(" commit tx3"); + _xaResource.commit(xid3, true); + _logger.debug("abort tx1"); + _xaResource.prepare(xid1); + _xaResource.rollback(xid1); + } + catch (XAException e) + { + e.printStackTrace(); + fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); + } + + try + { + // consume messages 1 - 4 + 7 + //----- start xid1 + _xaResource.start(xid4, XAResource.TMSUCCESS); + for (int i = 1; i <= 5; i++) + { + + message = (TextMessage) xaDurSub.receiveNoWait(); + _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message + .getLongProperty(_sequenceNumberPropertyName) == 6) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid4, XAResource.TMSUCCESS); + _xaResource.prepare(xid4); + _xaResource.commit(xid4, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown in last phase: " + e.getMessage()); + } + // now the topic should be empty!! + try + { + // start xid6 + _xaResource.start(xid6, XAResource.TMSUCCESS); + // should now be empty + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid6 + _xaResource.end(xid6, XAResource.TMSUCCESS); + _xaResource.commit(xid6, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid6: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + /** + * strategy: create a XA durable subscriber dusSub, produce 10 messages with the standard session, + * consume 2 messages respectively with tx1, tx2 and tx3 + * prepare xid2 and xid3 + * crash the server + * Redo the job for xid1 that has been aborted by server crash + * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! + * commit tx3 + * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! + * start tx4 and consume messages 1 - 4 + * start tx5 and consume messages 7 - 10 + * abort tx4 + * consume messages 1-4 with tx5 + * commit tx5 + * Now the topic should be empty! + */ + public void testMultiMessagesDurSubCrash() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + Xid xid5 = getNewXid(); + Xid xid6 = getNewXid(); + String durSubName = "xaSubDurable"; + TextMessage message; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + Session txSession = _nonXASession; + MessageProducer txProducer = txSession.createProducer(_topic); + // produce 10 persistent messages + txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + _topicConnection.start(); + for (int i = 1; i <= 10; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + txProducer.send(_message); + } + // commit txSession + txSession.commit(); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("Exception thrown when producing messages: " + e.getMessage()); + } + try + { + // consume 2 messages respectively with tx1, tx2 and tx3 + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 1; i <= 2; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid1, XAResource.TMSUCCESS); + //----- start xid2 + _xaResource.start(xid2, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 3; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid2, XAResource.TMSUCCESS); + //----- start xid3 + _xaResource.start(xid3, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 5; i <= 6; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid3, XAResource.TMSUCCESS); + // prepare tx2 and tx3 + + _xaResource.prepare(xid2); + _xaResource.prepare(xid3); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); + } + /////// stop the broker now !! + try + { + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + // get the list of in doubt transactions + try + { + _topicConnection.start(); + // reconnect to dursub! + xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 2) + { + fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + try + { + // xid1 has been aborted redo the job! + // consume 2 messages with tx1 + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // receive the 2 first messages + for (int i = 1; i <= 2; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid1, XAResource.TMSUSPEND); + // abort tx2, we now expect to receive messages 3 and 4 first! + _xaResource.rollback(xid2); + + // receive 3 message within tx1: 3, 4 and 7 + _xaResource.start(xid1, XAResource.TMRESUME); + // receive messages 3, 4 and 7 + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 3); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected"); + } + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 4); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 4) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected"); + } + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + 7); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 7) + { + fail("wrong sequence number: " + message + .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected"); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); + } + + try + { + _xaResource.end(xid1, XAResource.TMSUSPEND); + // commit tx3 + _xaResource.commit(xid3, false); + // abort tx1 + _xaResource.prepare(xid1); + _xaResource.rollback(xid1); + } + catch (XAException e) + { + e.printStackTrace(); + fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); + } + + try + { + // consume messages 1 - 4 + //----- start xid1 + _xaResource.start(xid4, XAResource.TMSUCCESS); + for (int i = 1; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid4, XAResource.TMSUSPEND); + // consume messages 8 - 10 + _xaResource.start(xid5, XAResource.TMSUCCESS); + for (int i = 7; i <= 10; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid5, XAResource.TMSUSPEND); + // abort tx4 + _xaResource.prepare(xid4); + _xaResource.rollback(xid4); + // consume messages 1-4 with tx5 + _xaResource.start(xid5, XAResource.TMRESUME); + for (int i = 1; i <= 4; i++) + { + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received! expected: " + i); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + } + _xaResource.end(xid5, XAResource.TMSUSPEND); + // commit tx5 + _xaResource.prepare(xid5); + _xaResource.commit(xid5, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown in last phase: " + e.getMessage()); + } + // now the topic should be empty!! + try + { + // start xid6 + _xaResource.start(xid6, XAResource.TMSUCCESS); + // should now be empty + message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid6 + _xaResource.end(xid6, XAResource.TMSUSPEND); + _xaResource.commit(xid6, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid6: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + + /** + * strategy: Produce a message within Tx1 and commit tx1. a durable subscriber then receives that message within tx2 + * that is then prepared. + * Shutdown the server and get the list of in doubt transactions: + * we expect tx2, Tx2 is aborted and the message consumed within tx3 that is committed we then check that the topic is empty. + */ + public void testDurSubCrash() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + Xid xid3 = getNewXid(); + Xid xid4 = getNewXid(); + String durSubName = "xaSubDurable"; + try + { + TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + try + { + _topicConnection.start(); + //----- start xid1 + _xaResource.start(xid1, XAResource.TMSUCCESS); + // start the connection + _topicConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + // commit + _xaResource.end(xid1, XAResource.TMSUSPEND); + if (_xaResource.prepare(xid1) != XAResource.XA_OK) + { + fail("Problem when preparing tx1 "); + } + _xaResource.commit(xid1, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid1: " + e.getMessage()); + } + try + { + // start xid2 + _xaResource.start(xid2, XAResource.TMSUCCESS); + // receive the previously produced message + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + // prepare xid2 + _xaResource.end(xid2, XAResource.TMSUSPEND); + if (_xaResource.prepare(xid2) != XAResource.XA_OK) + { + fail("Problem when preparing tx2 "); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid2: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + // get the list of in doubt transactions + try + { + _topicConnection.start(); + // reconnect to dursub! + xaDurSub = _session.createDurableSubscriber(_topic, durSubName); + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 1) + { + fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); + } + + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid2)) + { + System.out.println("aborting xid2 "); + try + { + _xaResource.rollback(anInDoubt); + } + catch (Exception e) + { + e.printStackTrace(); + fail("exception when aborting xid2 "); + } + } + else + { + System.out.println("XID2 is not in doubt "); + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + + try + { + // start xid3 + _xaResource.start(xid3, XAResource.TMSUCCESS); + // receive the previously produced message and aborted + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid3 + _xaResource.end(xid3, XAResource.TMSUSPEND); + if (_xaResource.prepare(xid3) != XAResource.XA_OK) + { + fail("Problem when preparing tx3 "); + } + _xaResource.commit(xid3, false); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid3: " + e.getMessage()); + } + try + { + // start xid4 + _xaResource.start(xid4, XAResource.TMSUCCESS); + // should now be empty + TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + if (message != null) + { + fail("An unexpected message was received " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // commit xid4 + _xaResource.end(xid4, XAResource.TMSUSPEND); + _xaResource.commit(xid4, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception when working with xid4: " + e.getMessage()); + } + } + catch (Exception e) + { + e.printStackTrace(); + fail("problem when creating dur sub: " + e.getMessage()); + } + finally + { + try + { + _session.unsubscribe(durSubName); + } + catch (JMSException e) + { + e.printStackTrace(); + fail("problem when unsubscribing dur sub: " + e.getMessage()); + } + } + } + } + + /** + * strategy: Produce a message within Tx1 and prepare tx1. Shutdown the server and get the list of indoubt transactions: + * we expect tx1, Tx1 is committed so we expect the test topic not to be empty! + */ + public void testRecover() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + String durSubName = "test1"; + TopicSession nonXASession1; + try + { + // create a dummy durable subscriber to be sure that messages are persisted! + nonXASession1 = _nonXASession; + nonXASession1.createDurableSubscriber(_topic, durSubName); + // start the xaResource for xid1 + try + { + _xaResource.start(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("cannot start the transaction with xid1: " + e.getMessage()); + } + try + { + // start the connection + _topicConnection.start(); + // produce a message with sequence number 1 + _message.setLongProperty(_sequenceNumberPropertyName, 1); + _producer.send(_message); + } + catch (JMSException e) + { + fail(" cannot send persistent message: " + e.getMessage()); + } + // suspend the transaction + try + { + _xaResource.end(xid1, XAResource.TMSUCCESS); + } + catch (XAException e) + { + fail("Cannot end the transaction with xid1: " + e.getMessage()); + } + // prepare the transaction with xid1 + try + { + _xaResource.prepare(xid1); + } + catch (XAException e) + { + fail("Exception when preparing xid1: " + e.getMessage()); + } + + /////// stop the server now !! + try + { + shutdownServer(); + } + catch (Exception e) + { + fail("Exception when stopping and restarting the server"); + } + + try + { + MessageConsumer nonXAConsumer = nonXASession1.createDurableSubscriber(_topic, durSubName); + _topicConnection.start(); + // get the list of in doubt transactions + try + { + Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); + if (inDoubt == null) + { + fail("the array of in doubt transactions should not be null "); + } + // At that point we expect only two indoubt transactions: + if (inDoubt.length != 1) + { + fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); + } + // commit them + for (Xid anInDoubt : inDoubt) + { + if (anInDoubt.equals(xid1)) + { + _logger.debug("committing xid1 "); + try + { + _xaResource.commit(anInDoubt, false); + } + catch (Exception e) + { + _logger.debug("PB when aborted xid1"); + e.printStackTrace(); + fail("exception when committing xid1 "); + } + } + else + { + _logger.debug("XID1 is not in doubt "); + } + } + } + catch (XAException e) + { + e.printStackTrace(); + fail("exception thrown when recovering transactions " + e.getMessage()); + } + _logger.debug("the topic should not be empty"); + TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + if (message1 == null) + { + fail("The topic is empty! "); + } + } + catch (JMSException e) + { + fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); + } + } + catch (JMSException e) + { + fail("cannot create dummy durable subscriber: " + e.getMessage()); + } + finally + { + try + { + // unsubscribe the dummy durable subscriber + TopicSession nonXASession = _nonXASession; + nonXASession.unsubscribe(durSubName); + } + catch (JMSException e) + { + fail("cannot unsubscribe durable subscriber: " + e.getMessage()); + } + } + } + } + + /** + * strategy: + * create a standard durable subscriber + * produce 3 messages + * consume the first message with that durable subscriber + * close the standard session that deactivates the durable subscriber + * migrate the durable subscriber to an xa one + * consume the second message with that xa durable subscriber + * close the xa session that deactivates the durable subscriber + * reconnect to the durable subscriber with a standard session + * consume the two remaining messages and check that the topic is empty! + */ + public void testMigrateDurableSubscriber() + { + if (!isBroker08()) + { + Xid xid1 = getNewXid(); + Xid xid2 = getNewXid(); + String durSubName = "DurableSubscriberMigrate"; + try + { + Session stSession = _nonXASession; + MessageProducer producer = stSession.createProducer(_topic); + _logger.debug("Create a standard durable subscriber!"); + TopicSubscriber durSub = stSession.createDurableSubscriber(_topic, durSubName); + TopicSubscriber durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); + TextMessage message; + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + _topicConnection.start(); + _logger.debug("produce 3 messages"); + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + producer.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + stSession.commit(); + } + _logger.debug("consume the first message with that durable subscriber"); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) + { + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + // commit the standard session + stSession.commit(); + _logger.debug("first message consumed "); + // close the session that deactivates the durable subscriber + stSession.close(); + _logger.debug("migrate the durable subscriber to an xa one"); + _xaResource.start(xid1, XAResource.TMSUCCESS); + durSub = _session.createDurableSubscriber(_topic, durSubName); + _logger.debug(" consume the second message with that xa durable subscriber and abort it"); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("wrong sequence number, 2 expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + _xaResource.end(xid1, XAResource.TMSUCCESS); + _xaResource.prepare(xid1); + _xaResource.rollback(xid1); + _logger.debug("close the session that deactivates the durable subscriber"); + _session.close(); + _logger.debug("create a new standard session"); + stSession = _topicConnection.createTopicSession(true, 1); + _logger.debug("reconnect to the durable subscriber"); + durSub = stSession.createDurableSubscriber(_topic, durSubName); + durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); + _logger.debug("Reconnected to durablse subscribers"); + _logger.debug(" consume the 2 remaining messages"); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) + { + fail("wrong sequence number, 2 expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + // consume the third message with that xa durable subscriber + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) + { + fail("wrong sequence number, 3 expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + stSession.commit(); + _logger.debug("the topic should be empty now"); + message = (TextMessage) durSub.receiveNoWait(); + if (message != null) + { + fail("Received unexpected message "); + } + stSession.commit(); + _logger.debug(" use dursub1 to receive all the 3 messages"); + for (int i = 1; i <= 3; i++) + { + message = (TextMessage) durSub1.receiveNoWait(); + if (message == null) + { + _logger.debug("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + fail("wrong sequence number, " + i + " expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + stSession.commit(); + // send a non persistent message to check that all persistent messages are deleted + producer = stSession.createProducer(_topic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(_message); + stSession.commit(); + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + fail("message not received "); + } + message = (TextMessage) durSub1.receiveNoWait(); + if (message == null) + { + fail("message not received "); + } + stSession.commit(); + stSession.close(); + _logger.debug(" now create a standard non transacted session and reconnect to the durable xubscriber"); + TopicConnection stConnection = + _topicConnection; //_topicFactory.createTopicConnection("guest", "guest"); + TopicSession autoAclSession = stConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicPublisher publisher = autoAclSession.createPublisher(_topic); + durSub = autoAclSession.createDurableSubscriber(_topic, durSubName); + stConnection.start(); + // produce 3 persistent messages + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + } + _logger.debug(" use dursub to receive all the 3 messages"); + for (int i = 1; i <= 3; i++) + { + message = (TextMessage) durSub.receiveNoWait(); + if (message == null) + { + System.out.println("no message received "); + } + else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + { + System.out.println("wrong sequence number, " + i + " expected, received: " + message + .getLongProperty(_sequenceNumberPropertyName)); + } + } + + _logger.debug("now set a message listener"); + AtomicBoolean lock = new AtomicBoolean(true); + reset(); + stConnection.stop(); + durSub.setMessageListener(new TopicListener(1, 3, lock)); + _logger.debug(" produce 3 persistent messages"); + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + } + // start the connection + stConnection.start(); + while (lock.get()) + { + synchronized (lock) + { + lock.wait(); + } + } + if (getFailureStatus()) + { + fail("problem with message listener"); + } + stConnection.stop(); + durSub.setMessageListener(null); + _logger.debug(" do the same with an xa session"); + // produce 3 persistent messages + for (int i = 1; i <= 3; i++) + { + _message.setLongProperty(_sequenceNumberPropertyName, i); + //producer.send( _message ); + publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); + } + //stConnection.close(); + + _logger.debug(" migrate the durable subscriber to an xa one"); + _session = _topicConnection.createXATopicSession(); + _xaResource = _session.getXAResource(); + _xaResource.start(xid2, XAResource.TMSUCCESS); + durSub = _session.createDurableSubscriber(_topic, durSubName); + lock = new AtomicBoolean(); + reset(); + _topicConnection.stop(); + durSub.setMessageListener(new TopicListener(1, 3, lock)); + // start the connection + _topicConnection.start(); + while (lock.get()) + { + synchronized (lock) + { + lock.wait(); + } + } + if (getFailureStatus()) + { + fail("problem with XA message listener"); + } + _xaResource.end(xid2, XAResource.TMSUCCESS); + _xaResource.commit(xid2, true); + } + catch (Exception e) + { + e.printStackTrace(); + fail("Exception thrown: " + e.getMessage()); + } + finally + { + try + { + _topicConnection.createXASession().unsubscribe(durSubName); + _topicConnection.createXASession().unsubscribe(durSubName + "_second"); + } + catch (JMSException e) + { + fail("Exception thrown when unsubscribing durable subscriber " + e.getMessage()); + } + } + } + } + + /** -------------------------------------------------------------------------------------- **/ + /** ----------------------------- Utility methods --------------------------------------- **/ + /** -------------------------------------------------------------------------------------- **/ + + /** + * get a new queue connection + * + * @return a new queue connection + * @throws javax.jms.JMSException If the JMS provider fails to create the queue connection + * due to some internal error or in case of authentication failure + */ + private XATopicConnection getNewTopicXAConnection() throws JMSException + { + return _topicFactory.createXATopicConnection("guest", "guest"); + } + + public static void failure() + { + _failure = true; + } + + public static void reset() + { + _failure = false; + } + + public static boolean getFailureStatus() + { + return _failure; + } + + private class TopicListener implements MessageListener + { + private long _counter; + private long _end; + private final AtomicBoolean _lock; + + public TopicListener(long init, long end, AtomicBoolean lock) + { + _counter = init; + _end = end; + _lock = lock; + } + + public void onMessage(Message message) + { + long seq = 0; + try + { + seq = message.getLongProperty(TopicTest._sequenceNumberPropertyName); + } + catch (JMSException e) + { + e.printStackTrace(); + TopicTest.failure(); + _lock.set(false); + synchronized (_lock) + { + _lock.notifyAll(); + } + } + if (seq != _counter) + { + System.out.println("received message " + seq + " expected " + _counter); + TopicTest.failure(); + _lock.set(false); + synchronized (_lock) + { + _lock.notifyAll(); + } + } + _counter++; + if (_counter > _end) + { + _lock.set(false); + synchronized (_lock) + { + _lock.notifyAll(); + } + } + } + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java deleted file mode 100644 index 30b3b09449..0000000000 --- a/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTests.java +++ /dev/null @@ -1,1708 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.test.unit.xa; - -import javax.jms.*; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; -import javax.transaction.xa.XAException; - -import junit.framework.TestSuite; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * - */ -public class TopicTests extends AbstractXATest -{ - /* this clas logger */ - private static final Logger _logger = LoggerFactory.getLogger(TopicTests.class); - - /** - * the topic use by all the tests - */ - private static Topic _topic = null; - - /** - * the topic connection factory used by all tests - */ - private static XATopicConnectionFactory _topicFactory = null; - - /** - * standard topic connection - */ - private static XATopicConnection _topicConnection = null; - - /** - * standard topic session created from the standard connection - */ - private static XATopicSession _session = null; - - private static TopicSession _nonXASession = null; - - /** - * the topic name - */ - private static final String TOPICNAME = "xaTopic"; - - /** - * Indicate that a listenere has failed - */ - private static boolean _failure = false; - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- JUnit support ----------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * Gets the test suite tests - * - * @return the test suite tests - */ - public static TestSuite getSuite() - { - return new TestSuite(TopicTests.class); - } - - /** - * Run the test suite. - * - * @param args Any command line arguments specified to this class. - */ - public static void main(String args[]) - { - junit.textui.TestRunner.run(getSuite()); - } - - public void tearDown() throws Exception - { - if (!isBroker08()) - { - try - { - _topicConnection.stop(); - _topicConnection.close(); - } - catch (Exception e) - { - fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); - } - } - super.tearDown(); - } - - /** - * Initialize standard actors - */ - public void init() - { - if (!isBroker08()) - { - // lookup test queue - try - { - _topic = (Topic) getInitialContext().lookup(TOPICNAME); - } - catch (Exception e) - { - fail("cannot lookup test topic " + e.getMessage()); - } - // lookup connection factory - try - { - _topicFactory = getConnectionFactory(); - } - catch (Exception e) - { - fail("enable to lookup connection factory "); - } - // create standard connection - try - { - _topicConnection = getNewTopicXAConnection(); - } - catch (JMSException e) - { - fail("cannot create queue connection: " + e.getMessage()); - } - // create standard session - try - { - _session = _topicConnection.createXATopicSession(); - } - catch (JMSException e) - { - fail("cannot create queue session: " + e.getMessage()); - } - // create a standard session - try - { - _nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); - } - catch (JMSException e) - { - e.printStackTrace(); //To change body of catch statement use Options | File Templates. - } - init(_session, _topic); - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Test Suite -------------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - - /** - * Uses two transactions respectively with xid1 and xid2 that are use to send a message - * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2. - * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1. - */ - public void testProducer() - { - if (!isBroker08()) - { - _logger.debug("testProducer"); - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - try - { - Session nonXASession = _nonXASession; - MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic); - _producer.setDeliveryMode(DeliveryMode.PERSISTENT); - // start the xaResource for xid1 - try - { - _logger.debug("starting tx branch xid1"); - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - e.printStackTrace(); - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _topicConnection.start(); - _logger.debug("produce a message with sequence number 1"); - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - _logger.debug("suspend the transaction branch xid1"); - try - { - _xaResource.end(xid1, XAResource.TMSUSPEND); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - _logger.debug("start the xaResource for xid2"); - try - { - _xaResource.start(xid2, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid2: " + e.getMessage()); - } - try - { - _logger.debug("produce a message"); - _message.setLongProperty(_sequenceNumberPropertyName, 2); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send second persistent message: " + e.getMessage()); - } - _logger.debug("end xid2 and start xid1"); - try - { - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.start(xid1, XAResource.TMRESUME); - } - catch (XAException e) - { - fail("Exception when ending and starting transactions: " + e.getMessage()); - } - _logger.debug("two phases commit transaction with xid2"); - try - { - int resPrepare = _xaResource.prepare(xid2); - if (resPrepare != XAResource.XA_OK) - { - fail("prepare returned: " + resPrepare); - } - _xaResource.commit(xid2, false); - } - catch (XAException e) - { - fail("Exception thrown when preparing transaction with xid2: " + e.getMessage()); - } - _logger.debug("receiving a message from topic test we expect it to be the second one"); - try - { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); - if (message == null) - { - fail("did not receive second message as expected "); - } - else - { - if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("receive wrong message its sequence number is: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - } - catch (JMSException e) - { - fail("Exception when receiving second message: " + e.getMessage()); - } - _logger.debug("end and one phase commit the first transaction"); - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - _xaResource.commit(xid1, true); - } - catch (XAException e) - { - fail("Exception thrown when commiting transaction with xid1"); - } - _logger.debug("We should now be able to receive the first and second message"); - try - { - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("did not receive first message as expected "); - } - else - { - if (message1.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("receive wrong message its sequence number is: " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - } - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("did not receive first message as expected "); - } - else - { - if (message1.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("receive wrong message its sequence number is: " + message1 - .getLongProperty(_sequenceNumberPropertyName)); - } - } - _logger.debug("commit transacted session"); - nonXASession.commit(); - _logger.debug("Test that the topic is now empty"); - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 != null) - { - fail("receive an unexpected message "); - } - } - catch (JMSException e) - { - fail("Exception thrown when emptying the queue: " + e.getMessage()); - } - } - catch (JMSException e) - { - fail("cannot create standard consumer: " + e.getMessage()); - } - } - } - - - /** - * strategy: Produce a message within Tx1 and commit tx1. consume this message within tx2 and abort tx2. - * Consume the same message within tx3 and commit it. Check that no more message is available. - */ - public void testDurSub() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - String durSubName = "xaSubDurable"; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - _topicConnection.start(); - _logger.debug("start xid1"); - _xaResource.start(xid1, XAResource.TMSUCCESS); - // start the connection - _topicConnection.start(); - _logger.debug("produce a message with sequence number 1"); - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - _logger.debug("2 phases commit xid1"); - _xaResource.end(xid1, XAResource.TMSUCCESS); - if (_xaResource.prepare(xid1) != XAResource.XA_OK) - { - fail("Problem when preparing tx1 "); - } - _xaResource.commit(xid1, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid1: " + e.getMessage()); - } - try - { - _logger.debug("start xid2"); - _xaResource.start(xid2, XAResource.TMSUCCESS); - _logger.debug("receive the previously produced message"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - _logger.debug("rollback xid2"); - boolean rollbackOnFailure = false; - try - { - _xaResource.end(xid2, XAResource.TMFAIL); - } - catch (XAException e) - { - if (e.errorCode != XAException.XA_RBROLLBACK) - { - fail("Exception when working with xid2: " + e.getMessage()); - } - rollbackOnFailure = true; - } - if (!rollbackOnFailure) - { - if (_xaResource.prepare(xid2) != XAResource.XA_OK) - { - fail("Problem when preparing tx2 "); - } - _xaResource.rollback(xid2); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid2: " + e.getMessage()); - } - try - { - _logger.debug("start xid3"); - _xaResource.start(xid3, XAResource.TMSUCCESS); - _logger.debug(" receive the previously aborted consumed message"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - _logger.debug("commit xid3"); - _xaResource.end(xid3, XAResource.TMSUCCESS); - if (_xaResource.prepare(xid3) != XAResource.XA_OK) - { - fail("Problem when preparing tx3 "); - } - _xaResource.commit(xid3, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid3: " + e.getMessage()); - } - try - { - _logger.debug("start xid4"); - _xaResource.start(xid4, XAResource.TMSUCCESS); - _logger.debug("check that topic is empty"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received "); - } - _logger.debug("commit xid4"); - _xaResource.end(xid4, XAResource.TMSUCCESS); - _xaResource.commit(xid4, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid4: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - /** - * strategy: create a XA durable subscriber dusSub, produce 7 messages with the standard session, - * consume 2 messages respectively with tx1, tx2 and tx3 - * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! - * commit tx3 - * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! - * start tx4 and consume messages 1 - 4 and 7 - * commit tx4 - * Now the topic should be empty! - */ - public void testMultiMessagesDurSub() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - Xid xid6 = getNewXid(); - String durSubName = "xaSubDurable"; - TextMessage message; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - Session txSession = _nonXASession; - MessageProducer txProducer = txSession.createProducer(_topic); - _logger.debug("produce 10 persistent messages"); - txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - _topicConnection.start(); - for (int i = 1; i <= 7; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - txProducer.send(_message); - } - // commit txSession - txSession.commit(); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("Exception thrown when producing messages: " + e.getMessage()); - } - - try - { - _logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3"); - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 1; i <= 2; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid1, XAResource.TMSUSPEND); - //----- start xid2 - _xaResource.start(xid2, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 3; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid2, XAResource.TMSUSPEND); - //----- start xid3 - _xaResource.start(xid3, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 5; i <= 6; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid3, XAResource.TMSUCCESS); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); - } - try - { - _logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7"); - _xaResource.start(xid2, XAResource.TMRESUME); - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.prepare(xid2); - _xaResource.rollback(xid2); - // receive 3 message within tx1: 3, 4 and 7 - _xaResource.start(xid1, XAResource.TMRESUME); - _logger.debug(" 3, 4 and 7"); - for (int i = 1; i <= 3; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 3); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message - .getLongProperty(_sequenceNumberPropertyName) || message - .getLongProperty(_sequenceNumberPropertyName) == 6) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); - } - - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - _logger.debug(" commit tx3"); - _xaResource.commit(xid3, true); - _logger.debug("abort tx1"); - _xaResource.prepare(xid1); - _xaResource.rollback(xid1); - } - catch (XAException e) - { - e.printStackTrace(); - fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); - } - - try - { - // consume messages 1 - 4 + 7 - //----- start xid1 - _xaResource.start(xid4, XAResource.TMSUCCESS); - for (int i = 1; i <= 5; i++) - { - - message = (TextMessage) xaDurSub.receiveNoWait(); - _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message - .getLongProperty(_sequenceNumberPropertyName) == 6) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid4, XAResource.TMSUCCESS); - _xaResource.prepare(xid4); - _xaResource.commit(xid4, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown in last phase: " + e.getMessage()); - } - // now the topic should be empty!! - try - { - // start xid6 - _xaResource.start(xid6, XAResource.TMSUCCESS); - // should now be empty - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid6 - _xaResource.end(xid6, XAResource.TMSUCCESS); - _xaResource.commit(xid6, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid6: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - /** - * strategy: create a XA durable subscriber dusSub, produce 10 messages with the standard session, - * consume 2 messages respectively with tx1, tx2 and tx3 - * prepare xid2 and xid3 - * crash the server - * Redo the job for xid1 that has been aborted by server crash - * abort tx2, we now expect to receive messages 3 and 4 first! Receive 3 messages within tx1 i.e. 34 and 7! - * commit tx3 - * abort tx1: we now expect that only messages 5 and 6 are definitly consumed! - * start tx4 and consume messages 1 - 4 - * start tx5 and consume messages 7 - 10 - * abort tx4 - * consume messages 1-4 with tx5 - * commit tx5 - * Now the topic should be empty! - */ - public void testMultiMessagesDurSubCrash() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - Xid xid5 = getNewXid(); - Xid xid6 = getNewXid(); - String durSubName = "xaSubDurable"; - TextMessage message; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - Session txSession = _nonXASession; - MessageProducer txProducer = txSession.createProducer(_topic); - // produce 10 persistent messages - txProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - _topicConnection.start(); - for (int i = 1; i <= 10; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - txProducer.send(_message); - } - // commit txSession - txSession.commit(); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("Exception thrown when producing messages: " + e.getMessage()); - } - try - { - // consume 2 messages respectively with tx1, tx2 and tx3 - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 1; i <= 2; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid1, XAResource.TMSUCCESS); - //----- start xid2 - _xaResource.start(xid2, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 3; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid2, XAResource.TMSUCCESS); - //----- start xid3 - _xaResource.start(xid3, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 5; i <= 6; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid3, XAResource.TMSUCCESS); - // prepare tx2 and tx3 - - _xaResource.prepare(xid2); - _xaResource.prepare(xid3); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming 6 first messages: " + e.getMessage()); - } - /////// stop the broker now !! - try - { - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - // get the list of in doubt transactions - try - { - _topicConnection.start(); - // reconnect to dursub! - xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 2) - { - fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - try - { - // xid1 has been aborted redo the job! - // consume 2 messages with tx1 - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // receive the 2 first messages - for (int i = 1; i <= 2; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid1, XAResource.TMSUSPEND); - // abort tx2, we now expect to receive messages 3 and 4 first! - _xaResource.rollback(xid2); - - // receive 3 message within tx1: 3, 4 and 7 - _xaResource.start(xid1, XAResource.TMRESUME); - // receive messages 3, 4 and 7 - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 3); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected"); - } - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 4); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 4) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected"); - } - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + 7); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 7) - { - fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected"); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage()); - } - - try - { - _xaResource.end(xid1, XAResource.TMSUSPEND); - // commit tx3 - _xaResource.commit(xid3, false); - // abort tx1 - _xaResource.prepare(xid1); - _xaResource.rollback(xid1); - } - catch (XAException e) - { - e.printStackTrace(); - fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage()); - } - - try - { - // consume messages 1 - 4 - //----- start xid1 - _xaResource.start(xid4, XAResource.TMSUCCESS); - for (int i = 1; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid4, XAResource.TMSUSPEND); - // consume messages 8 - 10 - _xaResource.start(xid5, XAResource.TMSUCCESS); - for (int i = 7; i <= 10; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid5, XAResource.TMSUSPEND); - // abort tx4 - _xaResource.prepare(xid4); - _xaResource.rollback(xid4); - // consume messages 1-4 with tx5 - _xaResource.start(xid5, XAResource.TMRESUME); - for (int i = 1; i <= 4; i++) - { - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received! expected: " + i); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - } - _xaResource.end(xid5, XAResource.TMSUSPEND); - // commit tx5 - _xaResource.prepare(xid5); - _xaResource.commit(xid5, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown in last phase: " + e.getMessage()); - } - // now the topic should be empty!! - try - { - // start xid6 - _xaResource.start(xid6, XAResource.TMSUCCESS); - // should now be empty - message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid6 - _xaResource.end(xid6, XAResource.TMSUSPEND); - _xaResource.commit(xid6, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid6: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - - /** - * strategy: Produce a message within Tx1 and commit tx1. a durable subscriber then receives that message within tx2 - * that is then prepared. - * Shutdown the server and get the list of in doubt transactions: - * we expect tx2, Tx2 is aborted and the message consumed within tx3 that is committed we then check that the topic is empty. - */ - public void testDurSubCrash() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - Xid xid3 = getNewXid(); - Xid xid4 = getNewXid(); - String durSubName = "xaSubDurable"; - try - { - TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - try - { - _topicConnection.start(); - //----- start xid1 - _xaResource.start(xid1, XAResource.TMSUCCESS); - // start the connection - _topicConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - // commit - _xaResource.end(xid1, XAResource.TMSUSPEND); - if (_xaResource.prepare(xid1) != XAResource.XA_OK) - { - fail("Problem when preparing tx1 "); - } - _xaResource.commit(xid1, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid1: " + e.getMessage()); - } - try - { - // start xid2 - _xaResource.start(xid2, XAResource.TMSUCCESS); - // receive the previously produced message - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - // prepare xid2 - _xaResource.end(xid2, XAResource.TMSUSPEND); - if (_xaResource.prepare(xid2) != XAResource.XA_OK) - { - fail("Problem when preparing tx2 "); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid2: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - // get the list of in doubt transactions - try - { - _topicConnection.start(); - // reconnect to dursub! - xaDurSub = _session.createDurableSubscriber(_topic, durSubName); - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 1) - { - fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions"); - } - - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid2)) - { - System.out.println("aborting xid2 "); - try - { - _xaResource.rollback(anInDoubt); - } - catch (Exception e) - { - e.printStackTrace(); - fail("exception when aborting xid2 "); - } - } - else - { - System.out.println("XID2 is not in doubt "); - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - - try - { - // start xid3 - _xaResource.start(xid3, XAResource.TMSUCCESS); - // receive the previously produced message and aborted - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid3 - _xaResource.end(xid3, XAResource.TMSUSPEND); - if (_xaResource.prepare(xid3) != XAResource.XA_OK) - { - fail("Problem when preparing tx3 "); - } - _xaResource.commit(xid3, false); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid3: " + e.getMessage()); - } - try - { - // start xid4 - _xaResource.start(xid4, XAResource.TMSUCCESS); - // should now be empty - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); - if (message != null) - { - fail("An unexpected message was received " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // commit xid4 - _xaResource.end(xid4, XAResource.TMSUSPEND); - _xaResource.commit(xid4, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception when working with xid4: " + e.getMessage()); - } - } - catch (Exception e) - { - e.printStackTrace(); - fail("problem when creating dur sub: " + e.getMessage()); - } - finally - { - try - { - _session.unsubscribe(durSubName); - } - catch (JMSException e) - { - e.printStackTrace(); - fail("problem when unsubscribing dur sub: " + e.getMessage()); - } - } - } - } - - /** - * strategy: Produce a message within Tx1 and prepare tx1. Shutdown the server and get the list of indoubt transactions: - * we expect tx1, Tx1 is committed so we expect the test topic not to be empty! - */ - public void testRecover() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - String durSubName = "test1"; - TopicSession nonXASession1; - try - { - // create a dummy durable subscriber to be sure that messages are persisted! - nonXASession1 = _nonXASession; - nonXASession1.createDurableSubscriber(_topic, durSubName); - // start the xaResource for xid1 - try - { - _xaResource.start(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("cannot start the transaction with xid1: " + e.getMessage()); - } - try - { - // start the connection - _topicConnection.start(); - // produce a message with sequence number 1 - _message.setLongProperty(_sequenceNumberPropertyName, 1); - _producer.send(_message); - } - catch (JMSException e) - { - fail(" cannot send persistent message: " + e.getMessage()); - } - // suspend the transaction - try - { - _xaResource.end(xid1, XAResource.TMSUCCESS); - } - catch (XAException e) - { - fail("Cannot end the transaction with xid1: " + e.getMessage()); - } - // prepare the transaction with xid1 - try - { - _xaResource.prepare(xid1); - } - catch (XAException e) - { - fail("Exception when preparing xid1: " + e.getMessage()); - } - - /////// stop the server now !! - try - { - shutdownServer(); - } - catch (Exception e) - { - fail("Exception when stopping and restarting the server"); - } - - try - { - MessageConsumer nonXAConsumer = nonXASession1.createDurableSubscriber(_topic, durSubName); - _topicConnection.start(); - // get the list of in doubt transactions - try - { - Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN); - if (inDoubt == null) - { - fail("the array of in doubt transactions should not be null "); - } - // At that point we expect only two indoubt transactions: - if (inDoubt.length != 1) - { - fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions"); - } - // commit them - for (Xid anInDoubt : inDoubt) - { - if (anInDoubt.equals(xid1)) - { - _logger.debug("committing xid1 "); - try - { - _xaResource.commit(anInDoubt, false); - } - catch (Exception e) - { - _logger.debug("PB when aborted xid1"); - e.printStackTrace(); - fail("exception when committing xid1 "); - } - } - else - { - _logger.debug("XID1 is not in doubt "); - } - } - } - catch (XAException e) - { - e.printStackTrace(); - fail("exception thrown when recovering transactions " + e.getMessage()); - } - _logger.debug("the topic should not be empty"); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); - if (message1 == null) - { - fail("The topic is empty! "); - } - } - catch (JMSException e) - { - fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); - } - } - catch (JMSException e) - { - fail("cannot create dummy durable subscriber: " + e.getMessage()); - } - finally - { - try - { - // unsubscribe the dummy durable subscriber - TopicSession nonXASession = _nonXASession; - nonXASession.unsubscribe(durSubName); - } - catch (JMSException e) - { - fail("cannot unsubscribe durable subscriber: " + e.getMessage()); - } - } - } - } - - /** - * strategy: - * create a standard durable subscriber - * produce 3 messages - * consume the first message with that durable subscriber - * close the standard session that deactivates the durable subscriber - * migrate the durable subscriber to an xa one - * consume the second message with that xa durable subscriber - * close the xa session that deactivates the durable subscriber - * reconnect to the durable subscriber with a standard session - * consume the two remaining messages and check that the topic is empty! - */ - public void testMigrateDurableSubscriber() - { - if (!isBroker08()) - { - Xid xid1 = getNewXid(); - Xid xid2 = getNewXid(); - String durSubName = "DurableSubscriberMigrate"; - try - { - Session stSession = _nonXASession; - MessageProducer producer = stSession.createProducer(_topic); - _logger.debug("Create a standard durable subscriber!"); - TopicSubscriber durSub = stSession.createDurableSubscriber(_topic, durSubName); - TopicSubscriber durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); - TextMessage message; - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - _topicConnection.start(); - _logger.debug("produce 3 messages"); - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - producer.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - stSession.commit(); - } - _logger.debug("consume the first message with that durable subscriber"); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 1) - { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); - } - // commit the standard session - stSession.commit(); - _logger.debug("first message consumed "); - // close the session that deactivates the durable subscriber - stSession.close(); - _logger.debug("migrate the durable subscriber to an xa one"); - _xaResource.start(xid1, XAResource.TMSUCCESS); - durSub = _session.createDurableSubscriber(_topic, durSubName); - _logger.debug(" consume the second message with that xa durable subscriber and abort it"); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("wrong sequence number, 2 expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - _xaResource.end(xid1, XAResource.TMSUCCESS); - _xaResource.prepare(xid1); - _xaResource.rollback(xid1); - _logger.debug("close the session that deactivates the durable subscriber"); - _session.close(); - _logger.debug("create a new standard session"); - stSession = _topicConnection.createTopicSession(true, 1); - _logger.debug("reconnect to the durable subscriber"); - durSub = stSession.createDurableSubscriber(_topic, durSubName); - durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); - _logger.debug("Reconnected to durablse subscribers"); - _logger.debug(" consume the 2 remaining messages"); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) - { - fail("wrong sequence number, 2 expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - // consume the third message with that xa durable subscriber - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) - { - fail("wrong sequence number, 3 expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - stSession.commit(); - _logger.debug("the topic should be empty now"); - message = (TextMessage) durSub.receiveNoWait(); - if (message != null) - { - fail("Received unexpected message "); - } - stSession.commit(); - _logger.debug(" use dursub1 to receive all the 3 messages"); - for (int i = 1; i <= 3; i++) - { - message = (TextMessage) durSub1.receiveNoWait(); - if (message == null) - { - _logger.debug("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - fail("wrong sequence number, " + i + " expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - stSession.commit(); - // send a non persistent message to check that all persistent messages are deleted - producer = stSession.createProducer(_topic); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(_message); - stSession.commit(); - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - fail("message not received "); - } - message = (TextMessage) durSub1.receiveNoWait(); - if (message == null) - { - fail("message not received "); - } - stSession.commit(); - stSession.close(); - _logger.debug(" now create a standard non transacted session and reconnect to the durable xubscriber"); - TopicConnection stConnection = - _topicConnection; //_topicFactory.createTopicConnection("guest", "guest"); - TopicSession autoAclSession = stConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicPublisher publisher = autoAclSession.createPublisher(_topic); - durSub = autoAclSession.createDurableSubscriber(_topic, durSubName); - stConnection.start(); - // produce 3 persistent messages - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - } - _logger.debug(" use dursub to receive all the 3 messages"); - for (int i = 1; i <= 3; i++) - { - message = (TextMessage) durSub.receiveNoWait(); - if (message == null) - { - System.out.println("no message received "); - } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) - { - System.out.println("wrong sequence number, " + i + " expected, received: " + message - .getLongProperty(_sequenceNumberPropertyName)); - } - } - - _logger.debug("now set a message listener"); - AtomicBoolean lock = new AtomicBoolean(true); - reset(); - stConnection.stop(); - durSub.setMessageListener(new TopicListener(1, 3, lock)); - _logger.debug(" produce 3 persistent messages"); - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - } - // start the connection - stConnection.start(); - while (lock.get()) - { - synchronized (lock) - { - lock.wait(); - } - } - if (getFailureStatus()) - { - fail("problem with message listener"); - } - stConnection.stop(); - durSub.setMessageListener(null); - _logger.debug(" do the same with an xa session"); - // produce 3 persistent messages - for (int i = 1; i <= 3; i++) - { - _message.setLongProperty(_sequenceNumberPropertyName, i); - //producer.send( _message ); - publisher.send(_message, DeliveryMode.PERSISTENT, 9 - i, 0); - } - //stConnection.close(); - - _logger.debug(" migrate the durable subscriber to an xa one"); - _session = _topicConnection.createXATopicSession(); - _xaResource = _session.getXAResource(); - _xaResource.start(xid2, XAResource.TMSUCCESS); - durSub = _session.createDurableSubscriber(_topic, durSubName); - lock = new AtomicBoolean(); - reset(); - _topicConnection.stop(); - durSub.setMessageListener(new TopicListener(1, 3, lock)); - // start the connection - _topicConnection.start(); - while (lock.get()) - { - synchronized (lock) - { - lock.wait(); - } - } - if (getFailureStatus()) - { - fail("problem with XA message listener"); - } - _xaResource.end(xid2, XAResource.TMSUCCESS); - _xaResource.commit(xid2, true); - } - catch (Exception e) - { - e.printStackTrace(); - fail("Exception thrown: " + e.getMessage()); - } - finally - { - try - { - _topicConnection.createXASession().unsubscribe(durSubName); - _topicConnection.createXASession().unsubscribe(durSubName + "_second"); - } - catch (JMSException e) - { - fail("Exception thrown when unsubscribing durable subscriber " + e.getMessage()); - } - } - } - } - - /** -------------------------------------------------------------------------------------- **/ - /** ----------------------------- Utility methods --------------------------------------- **/ - /** -------------------------------------------------------------------------------------- **/ - - /** - * get a new queue connection - * - * @return a new queue connection - * @throws javax.jms.JMSException If the JMS provider fails to create the queue connection - * due to some internal error or in case of authentication failure - */ - private XATopicConnection getNewTopicXAConnection() throws JMSException - { - return _topicFactory.createXATopicConnection("guest", "guest"); - } - - public static void failure() - { - _failure = true; - } - - public static void reset() - { - _failure = false; - } - - public static boolean getFailureStatus() - { - return _failure; - } - - private class TopicListener implements MessageListener - { - private long _counter; - private long _end; - private final AtomicBoolean _lock; - - public TopicListener(long init, long end, AtomicBoolean lock) - { - _counter = init; - _end = end; - _lock = lock; - } - - public void onMessage(Message message) - { - long seq = 0; - try - { - seq = message.getLongProperty(TopicTests._sequenceNumberPropertyName); - } - catch (JMSException e) - { - e.printStackTrace(); - TopicTests.failure(); - _lock.set(false); - synchronized (_lock) - { - _lock.notifyAll(); - } - } - if (seq != _counter) - { - System.out.println("received message " + seq + " expected " + _counter); - TopicTests.failure(); - _lock.set(false); - synchronized (_lock) - { - _lock.notifyAll(); - } - } - _counter++; - if (_counter > _end) - { - _lock.set(false); - synchronized (_lock) - { - _lock.notifyAll(); - } - } - } - } - -} diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java index e7c09fca65..9ab8379855 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java @@ -59,16 +59,27 @@ public class QpidTestCase extends TestCase private InitialContext _initialContext; private AMQConnectionFactory _connectionFactory; - protected void setUp() throws Exception + public void runBare() throws Throwable { - super.setUp(); + String name = getClass().getSimpleName() + "." + getName(); + _logger.info("========== start " + name + " =========="); startBroker(); - } - - protected void tearDown() throws Exception - { - stopBroker(); - super.tearDown(); + try + { + super.runBare(); + } + finally + { + try + { + stopBroker(); + } + catch (Exception e) + { + _logger.error("exception stopping broker", e); + } + _logger.info("========== stop " + name + " =========="); + } } public void startBroker() throws Exception @@ -102,7 +113,8 @@ public class QpidTestCase extends TestCase } catch (IOException e) { - _logger.info("redirector", e); + // this seems to happen regularly even when + // exits are normal } } }.start(); -- cgit v1.2.1