diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-23 22:15:29 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-23 22:15:29 +0000 |
| commit | afeb5065ec226e39bb0b6855db63952d9a1ba89c (patch) | |
| tree | 2341895495043707530f839a0be87774f804553e /qpid/java/systests/src | |
| parent | 1b4a67ccc4b501b2b7872881609ee9f0d8c2e3eb (diff) | |
| download | qpid-python-afeb5065ec226e39bb0b6855db63952d9a1ba89c.tar.gz | |
AMQP-24 : [Java Broker] Implement distributed transactions for AMQP 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1292984 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
5 files changed, 73 insertions, 26 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java index f337908134..c8ffe8571d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java @@ -43,7 +43,7 @@ public class XAResourceTest extends QpidBrokerTestCase public void testIsSameRMSingleCF() throws Exception { XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME); - XAConnection conn = factory.createXAConnection(); + XAConnection conn = factory.createXAConnection("guest","guest"); XASession session = conn.createXASession(); XAResource xaResource1 = session.getXAResource(); XAResource xaResource2 = session.getXAResource(); @@ -68,9 +68,9 @@ public class XAResourceTest extends QpidBrokerTestCase XAConnectionFactory factory2 = new AMQConnectionFactory(url); XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME); - XAConnection conn = factory.createXAConnection(); - XAConnection conn2 = factory2.createXAConnection(); - XAConnection conn3 = factory3.createXAConnection(); + XAConnection conn = factory.createXAConnection("guest","guest"); + XAConnection conn2 = factory2.createXAConnection("guest","guest"); + XAConnection conn3 = factory3.createXAConnection("guest","guest"); XASession session = conn.createXASession(); XASession session2 = conn2.createXASession(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 54abb76b6d..8ffc09930e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -313,6 +313,17 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore _underlying.abortTran(); doPostDelay("abortTran"); } + + public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException + { + _underlying.removeXid(format, globalId, branchId); + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + throws AMQStoreException + { + _underlying.recordXid(format, globalId, branchId, enqueues, dequeues); + } } public void updateQueue(AMQQueue queue) throws AMQStoreException diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java index 86ba5c2cb7..e940a73bbb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java @@ -118,7 +118,7 @@ public class FaultTest extends AbstractXATestCase _queueFactory = getConnectionFactory(); _xaqueueConnection = _queueFactory.createXAQueueConnection("guest", "guest"); XAQueueSession session = _xaqueueConnection.createXAQueueSession(); - _queueConnection = _queueFactory.createQueueConnection(); + _queueConnection = _queueFactory.createQueueConnection("guest","guest"); _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); init(session, _queue); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java index eb19504dec..3fbe76323a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/QueueTest.java @@ -162,11 +162,12 @@ public class QueueTest extends AbstractXATestCase // create a standard session try { - _queueConnection = _queueFactory.createQueueConnection(); + _queueConnection = _queueFactory.createQueueConnection("guest", "guest"); _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { + e.printStackTrace(); fail("cannot create queue session: " + e.getMessage()); } init(session, _queue); @@ -638,7 +639,8 @@ public class QueueTest extends AbstractXATestCase TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 != null) { - fail("The queue is not empty! "); + + fail("The queue is not empty! " + message1.getLongProperty(_sequenceNumberPropertyName)); } } catch (JMSException e) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java index 3a6d573205..d955979ad6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java @@ -18,6 +18,7 @@ package org.apache.qpid.test.unit.xa; import junit.framework.TestSuite; +import org.apache.qpid.configuration.ClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,8 @@ import javax.jms.*; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -103,7 +106,7 @@ public class TopicTest extends AbstractXATestCase } catch (Exception e) { - fail("Exception thrown when cleaning standard connection: " + e.getStackTrace()); + fail("Exception thrown when cleaning standard connection: " + e); } } super.tearDown(); @@ -116,6 +119,7 @@ public class TopicTest extends AbstractXATestCase { if (!isBroker08()) { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1"); // lookup test queue try { @@ -650,7 +654,12 @@ public class TopicTest extends AbstractXATestCase { message = (TextMessage) xaDurSub.receive(1000); - _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); + + if(message != null) + { + _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); + } + if (message == null) { fail("no message received! expected: " + i); @@ -882,35 +891,40 @@ public class TopicTest extends AbstractXATestCase // receive 3 message within tx1: 3, 4 and 7 _xaResource.start(xid1, XAResource.TMRESUME); // receive messages 3, 4 and 7 + Set<Long> expected = new HashSet<Long>(); + expected.add(3L); + expected.add(4L); + expected.add(7L); message = (TextMessage) xaDurSub.receive(1000); if (message == null) { - fail("no message received! expected: " + 3); + fail("no message received! expected one of: " + expected); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected"); + .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); } message = (TextMessage) xaDurSub.receive(1000); if (message == null) { - fail("no message received! expected: " + 4); + fail("no message received! expected one of: " + expected); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 4) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { + fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected"); + .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); } message = (TextMessage) xaDurSub.receive(1000); if (message == null) { - fail("no message received! expected: " + 7); + fail("no message received! expected one of: " + expected); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != 7) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { fail("wrong sequence number: " + message - .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected"); + .getLongProperty(_sequenceNumberPropertyName) + " expected one from " + expected); } } catch (Exception e) @@ -936,8 +950,18 @@ public class TopicTest extends AbstractXATestCase try { - // consume messages 1 - 4 - //----- start xid1 + // consume messages: could be any from (1 - 4, 7-10) + //----- start xid4 + Set<Long> expected = new HashSet<Long>(); + Set<Long> xid4msgs = new HashSet<Long>(); + for(long l = 1; l <= 4l; l++) + { + expected.add(l); + } + for(long l = 7; l <= 10l; l++) + { + expected.add(l); + } _xaResource.start(xid4, XAResource.TMNOFLAGS); for (int i = 1; i <= 4; i++) { @@ -946,9 +970,14 @@ public class TopicTest extends AbstractXATestCase { fail("no message received! expected: " + i); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + + long seqNo = message.getLongProperty(_sequenceNumberPropertyName); + xid4msgs.add(seqNo); + + if (!expected.remove(seqNo)) { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + fail("wrong sequence number: " + seqNo + + " expected one from " + expected); } } _xaResource.end(xid4, XAResource.TMSUSPEND); @@ -961,15 +990,17 @@ public class TopicTest extends AbstractXATestCase { fail("no message received! expected: " + i); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName) + + " expected one from " + expected); } } _xaResource.end(xid5, XAResource.TMSUSPEND); // abort tx4 _xaResource.prepare(xid4); _xaResource.rollback(xid4); + expected.addAll(xid4msgs); // consume messages 1-4 with tx5 _xaResource.start(xid5, XAResource.TMRESUME); for (int i = 1; i <= 4; i++) @@ -979,13 +1010,15 @@ public class TopicTest extends AbstractXATestCase { fail("no message received! expected: " + i); } - else if (message.getLongProperty(_sequenceNumberPropertyName) != i) + else if (!expected.remove(message.getLongProperty(_sequenceNumberPropertyName))) { - fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); + fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName) + + " expected one from " + expected); } } _xaResource.end(xid5, XAResource.TMSUSPEND); // commit tx5 + _xaResource.prepare(xid5); _xaResource.commit(xid5, false); } @@ -1602,6 +1635,7 @@ public class TopicTest extends AbstractXATestCase } _xaResource.end(xid2, XAResource.TMSUCCESS); _xaResource.commit(xid2, true); + _session.close(); } catch (Exception e) { |
