diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 22:58:57 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 22:58:57 +0000 |
| commit | 953ed819249457a5a6c4349c3b215f26d1abba16 (patch) | |
| tree | 48340ad71f89a641111000ef4c6b63d8b2ce1ad7 /java/broker/src/test | |
| parent | a581be3f131e53b3f18aff392d5d28222d20e71d (diff) | |
| download | qpid-python-953ed819249457a5a6c4349c3b215f26d1abba16.tar.gz | |
Merged from java-broker-0-10 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@829675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
43 files changed, 1266 insertions, 1573 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index e3889162ad..3cdb9c0c2d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQBrokerManagerMBeanTest extends TestCase @@ -46,7 +47,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); - ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject()); + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); mbean.createNewExchange(exchange1, "direct", false); mbean.createNewExchange(exchange2, "topic", false); mbean.createNewExchange(exchange3, "headers", false); @@ -68,7 +69,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase { String queueName = "testQueue_" + System.currentTimeMillis(); - ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject()); + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 105056fe3d..d2408ba21f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -22,23 +22,22 @@ package org.apache.qpid.server; import junit.framework.TestCase; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.queue.MockQueueEntry; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleQueueEntryList; import org.apache.qpid.server.queue.MockAMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntryIterator; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.AMQException; import java.util.Map; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.Iterator; /** * QPID-1385 : Race condition between added to unacked map and resending due to a rollback. @@ -63,6 +62,7 @@ public class ExtractResendAndRequeueTest extends TestCase UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; private AMQQueue _queue = new MockAMQQueue(getName()); + private MessageStore _messageStore = new MemoryMessageStore(); private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); @Override @@ -89,7 +89,7 @@ public class ExtractResendAndRequeueTest extends TestCase while(queueEntries.advance()) { QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry); + _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry); // Store the entry for future inspection _referenceList.add(entry); @@ -137,7 +137,7 @@ public class ExtractResendAndRequeueTest extends TestCase // requeueIfUnabletoResend doesn't matter here. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, new StoreContext())); + msgToResend, true, _messageStore)); assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size()); assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); @@ -166,7 +166,7 @@ public class ExtractResendAndRequeueTest extends TestCase // requeueIfUnabletoResend doesn't matter here. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, new StoreContext())); + msgToResend, true, _messageStore)); assertEquals("Message count for resend not correct.", 0, msgToResend.size()); assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); @@ -187,7 +187,7 @@ public class ExtractResendAndRequeueTest extends TestCase // requeueIfUnabletoResend = true so all messages should go to msgToRequeue _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, new StoreContext())); + msgToResend, true, _messageStore)); assertEquals("Message count for resend not correct.", 0, msgToResend.size()); assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); @@ -208,7 +208,7 @@ public class ExtractResendAndRequeueTest extends TestCase // requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, new StoreContext())); + msgToResend, false, _messageStore)); assertEquals("Message count for resend not correct.", 0, msgToResend.size()); assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); @@ -240,7 +240,7 @@ public class ExtractResendAndRequeueTest extends TestCase // requeueIfUnabletoResend : value doesn't matter here as queue has been deleted _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, new StoreContext())); + msgToResend, false, _messageStore)); assertEquals("Message count for resend not correct.", 0, msgToResend.size()); assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java deleted file mode 100644 index 06d6b6de8b..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.ack; - -import junit.framework.TestCase; - -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.AMQMessageHandle; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; - -import java.util.*; - -public class TxAckTest extends TestCase -{ - private Scenario individual; - private Scenario multiple; - private Scenario combined; - - protected void setUp() throws Exception - { - super.setUp(); - - - // Highlight that this test will cause the creation of an AR - ApplicationRegistry.getInstance(); - - //ack only 5th msg - individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l)); - individual.update(5, false); - - //ack all up to and including 5th msg - multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l)); - multiple.update(5, true); - - //leave only 8th and 9th unacked - combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l)); - combined.update(3, false); - combined.update(5, true); - combined.update(7, true); - combined.update(2, true);//should be ignored - combined.update(1, false);//should be ignored - combined.update(10, false); - } - - @Override - protected void tearDown() throws Exception - { - individual.stop(); - multiple.stop(); - combined.stop(); - - // Ensure we close the AR we created - ApplicationRegistry.remove(); - super.tearDown(); - } - - public void testPrepare() throws AMQException - { - individual.prepare(); - multiple.prepare(); - combined.prepare(); - } - - public void testUndoPrepare() throws AMQException - { - individual.undoPrepare(); - multiple.undoPrepare(); - combined.undoPrepare(); - } - - public void testCommit() throws AMQException - { - individual.commit(); - multiple.commit(); - combined.commit(); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TxAckTest.class); - } - - private class Scenario - { - private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000); - private final TxAck _op = new TxAck(_map); - private final List<Long> _acked; - private final List<Long> _unacked; - private StoreContext _storeContext = new StoreContext(); - private AMQQueue _queue; - - Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception - { - TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), - _storeContext, null, - new LinkedList<RequiredDeliveryException>() - ); - - VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); - - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, - virtualHost, null); - - for (int i = 0; i < messageCount; i++) - { - long deliveryTag = i + 1; - - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); - _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); - } - _acked = acked; - _unacked = unacked; - } - - void update(long deliverytag, boolean multiple) - { - _op.update(deliverytag, multiple); - } - - private void assertCount(List<Long> tags, int expected) - { - for (long tag : tags) - { - QueueEntry u = _map.get(tag); - assertTrue("Message not found for tag " + tag, u != null); - ((TestMessage) u.getMessage()).assertCountEquals(expected); - } - } - - void prepare() throws AMQException - { - _op.consolidate(); - _op.prepare(_storeContext); - - assertCount(_acked, -1); - assertCount(_unacked, 0); - - } - - void undoPrepare() - { - _op.consolidate(); - _op.undoPrepare(); - - assertCount(_acked, 1); - assertCount(_unacked, 0); - } - - void commit() - { - _op.consolidate(); - _op.commit(_storeContext); - - //check acked messages are removed from map - Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags()); - keys.retainAll(_acked); - assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); - //check unacked messages are still in map - keys = new HashSet<Long>(_unacked); - keys.removeAll(_map.getDeliveryTags()); - assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); - } - - public void stop() - { - _queue.stop(); - } - } - - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) - { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); - try - { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } - - - return amqMessageHandle; - } - - - private class TestMessage extends AMQMessage - { - private final long _tag; - private int _count; - - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) - throws AMQException - { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); - _tag = tag; - } - - - public boolean incrementReference() - { - _count++; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } - - void assertCountEquals(int expected) - { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); - } - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index c5f291a0cb..5bd739c0af 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,6 +27,7 @@ import java.io.RandomAccessFile; import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Collections; import junit.framework.TestCase; @@ -482,12 +483,17 @@ public class ServerConfigurationTest extends TestCase { // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); - assertEquals(5672, serverConfig.getPort()); + assertNotNull(serverConfig.getPorts()); + assertEquals(1, serverConfig.getPorts().size()); + assertEquals(5672, serverConfig.getPorts().get(0)); + // Check value we set - _config.setProperty("connector.port", 10); + _config.setProperty("connector.port", "10"); serverConfig = new ServerConfiguration(_config); - assertEquals(10, serverConfig.getPort()); + assertNotNull(serverConfig.getPorts()); + assertEquals(1, serverConfig.getPorts().size()); + assertEquals("10", serverConfig.getPorts().get(0)); } public void testGetBind() throws ConfigurationException @@ -723,7 +729,9 @@ public class ServerConfigurationTest extends TestCase ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile()); assertEquals(4235, config.getSSLPort()); // From first file, not // overriden by second - assertEquals(2342, config.getPort()); // From the first file, not + assertNotNull(config.getPorts()); + assertEquals(1, config.getPorts().size()); + assertEquals("2342", config.getPorts().get(0)); // From the first file, not // present in the second assertEquals(true, config.getQpidNIO()); // From the second file, not // present in the first @@ -967,7 +975,7 @@ public class ServerConfigurationTest extends TestCase out.write("\t<rule access=\"deny\" network=\"127.0.0.1\"/>"); out.write("</firewall>\n"); out.close(); - + reg.getConfiguration().reparseConfigFile(); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index 5d3b4e681a..b65020395c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -14,21 +14,20 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. + * */ package org.apache.qpid.server.configuration; import junit.framework.TestCase; -import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class VirtualHostConfigurationTest extends TestCase { @@ -55,50 +54,50 @@ public class VirtualHostConfigurationTest extends TestCase super.tearDown(); } - + public void testQueuePriority() throws Exception { // Set up queue with 5 priorities - configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "atest"); - configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct"); - configXml.addProperty("virtualhost.test.queues.queue.atest.priorities", + configXml.addProperty("virtualhost.test.queues.queue.atest.priorities", "5"); // Set up queue with JMS style priorities - configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "ptest"); - configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange", + configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange", "amq.direct"); - configXml.addProperty("virtualhost.test.queues.queue.ptest.priority", + configXml.addProperty("virtualhost.test.queues.queue.ptest.priority", "true"); - + // Set up queue with no priorities - configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "ntest"); - configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange", + configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange", "amq.direct"); - configXml.addProperty("virtualhost.test.queues.queue.ntest.priority", + configXml.addProperty("virtualhost.test.queues.queue.ntest.priority", "false"); - - VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); - + + VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); + // Check that atest was a priority queue with 5 priorities AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); assertTrue(atest instanceof AMQPriorityQueue); assertEquals(5, ((AMQPriorityQueue) atest).getPriorities()); - + // Check that ptest was a priority queue with 10 priorities AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest")); assertTrue(ptest instanceof AMQPriorityQueue); assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities()); - + // Check that ntest wasn't a priority queue AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest")); assertFalse(ntest instanceof AMQPriorityQueue); } - + public void testQueueAlerts() throws Exception { // Set up queue with 5 priorities @@ -106,7 +105,7 @@ public class VirtualHostConfigurationTest extends TestCase configXml.addProperty("virtualhost.test.queues.maximumQueueDepth", "1"); configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2"); configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3"); - + configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest"); configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct"); configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4"); @@ -114,21 +113,21 @@ public class VirtualHostConfigurationTest extends TestCase configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6"); configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest"); - - VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); - + + VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); + // Check specifically configured values AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); assertEquals(4, aTest.getMaximumQueueDepth()); assertEquals(5, aTest.getMaximumMessageSize()); assertEquals(6, aTest.getMaximumMessageAge()); - - // Check default values + + // Check default values AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest")); assertEquals(1, bTest.getMaximumQueueDepth()); assertEquals(2, bTest.getMaximumMessageSize()); assertEquals(3, bTest.getMaximumMessageAge()); - + } - + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index f8dd266bd2..e26b5b048c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -27,18 +27,18 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; public class AbstractHeadersExchangeTestBase extends TestCase { @@ -52,10 +52,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ private MessageStore _store = new MemoryMessageStore(); - private StoreContext _storeContext = new StoreContext(); - - private MessageHandleFactory _handleFactory = new MessageHandleFactory(); - private int count; public void testDoNothing() @@ -91,14 +87,18 @@ public class AbstractHeadersExchangeTestBase extends TestCase } - protected void route(Message m) throws AMQException + protected int route(Message m) throws AMQException { + m.getIncomingMessage().headersReceived(); m.route(exchange); - m.getIncomingMessage().routingComplete(_store, _handleFactory); if(m.getIncomingMessage().allContentReceived()) { - m.getIncomingMessage().deliverToQueues(); + for(AMQQueue q : m.getIncomingMessage().getDestinationQueues()) + { + q.enqueue(m); + } } + return m.getIncomingMessage().getDestinationQueues().size(); } protected void routeAndTest(Message m, TestQueue... expected) throws AMQException @@ -118,10 +118,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException { - try - { - route(m); - assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); + int queueCount = route(m); + for (TestQueue q : queues) { if (expected.contains(q)) @@ -135,12 +133,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; } } - } - catch (NoRouteException ex) - { - assertTrue("Expected "+m+" not to be returned",expectReturn); - } + if(expectReturn) + { + assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount); + } } @@ -242,6 +239,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase { final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); + public String toString() + { + return getName().toString(); + } + public TestQueue(AMQShortString name) throws AMQException { super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test")); @@ -256,9 +258,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase * @throws AMQException */ @Override - public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException + public QueueEntry enqueue(ServerMessage msg) throws AMQException { - messages.add( new HeadersExchangeTest.Message(msg)); + messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); return new QueueEntry() { @@ -317,6 +319,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isAcquiredBy(Subscription subscription) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public void setDeliveredToSubscription() { //To change body of implemented methods use File | Settings | File Templates. @@ -327,9 +334,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public String debugIdentity() + public boolean releaseButRetain() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean immediateAndNotDelivered() @@ -337,11 +344,26 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void setRedelivered(boolean b) + public void setRedelivered() { //To change body of implemented methods use File | Settings | File Templates. } + public AMQMessageHeader getMessageHeader() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isPersistent() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRedelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public Subscription getDeliveredSubscription() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -362,17 +384,22 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(StoreContext storeContext) throws AMQException + public void requeue() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void requeue(Subscription subscription) { //To change body of implemented methods use File | Settings | File Templates. } - public void dequeue(final StoreContext storeContext) throws FailedDequeueException + public void dequeue() { //To change body of implemented methods use File | Settings | File Templates. } - public void dispose(final StoreContext storeContext) throws MessageCleanupException + public void dispose() { //To change body of implemented methods use File | Settings | File Templates. } @@ -382,7 +409,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void discard() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void routeToAlternate() { //To change body of implemented methods use File | Settings | File Templates. } @@ -421,15 +453,16 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ static class Message extends AMQMessage { + private static AtomicLong _messageId = new AtomicLong(); + private class TestIncomingMessage extends IncomingMessage { public TestIncomingMessage(final long messageId, final MessagePublishInfo info, - final TransactionalContext txnContext, final AMQProtocolSession publisher) { - super(messageId, info, txnContext, publisher); + super(info); } @@ -439,7 +472,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase } - public ContentHeaderBody getContentHeaderBody() + public ContentHeaderBody getContentHeader() { try { @@ -454,15 +487,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase private IncomingMessage _incoming; - private static MessageStore _messageStore = new SkeletonMessageStore(); - - private static StoreContext _storeContext = new StoreContext(); - - - private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList<RequiredDeliveryException>() - ); Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException { @@ -471,7 +495,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException { - this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); + this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST); } public IncomingMessage getIncomingMessage() @@ -484,46 +508,34 @@ public class AbstractHeadersExchangeTestBase extends TestCase ContentHeaderBody header, List<ContentBody> bodies) throws AMQException { - super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish); + super(new MockStoredMessage(messageId, publish, header)); + + StoredMessage<MessageMetaData> storedMessage = getStoredMessage(); + int pos = 0; + for(ContentBody body : bodies) + { + storedMessage.addContent(pos, body.payload.duplicate().buf()); + pos += body.payload.limit(); + } - - _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession); + _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); _incoming.setContentHeaderBody(header); } - private static AMQMessageHandle createMessageHandle(final long messageId, - final MessagePublishInfo publish, - final ContentHeaderBody header) - { - - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - _messageStore, - true); - - try - { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header); - } - catch (AMQException e) - { - - } - return amqMessageHandle; - } private Message(AMQMessage msg) throws AMQException { - super(msg); + super(msg.getStoredMessage()); } void route(Exchange exchange) throws AMQException { - exchange.route(_incoming); + _incoming.enqueue(exchange.route(_incoming)); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java index ac12e050b4..016f7eacbe 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 86ba96bf5d..dc47951548 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -22,16 +22,95 @@ package org.apache.qpid.server.exchange; import java.util.Map; import java.util.HashMap; +import java.util.Set; import junit.framework.TestCase; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.message.AMQMessageHeader; /** */ public class HeadersBindingTest extends TestCase { + + private class MockHeader implements AMQMessageHeader + { + + private final Map<String, Object> _headers = new HashMap<String, Object>(); + + public String getCorrelationId() + { + return null; + } + + public long getExpiration() + { + return 0; + } + + public String getMessageId() + { + return null; + } + + public String getMimeType() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public String getEncoding() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public byte getPriority() + { + return 0; + } + + public long getTimestamp() + { + return 0; + } + + public String getType() + { + return null; + } + + public String getReplyTo() + { + return null; + } + + public Object getHeader(String name) + { + return _headers.get(name); + } + + public boolean containsHeaders(Set<String> names) + { + return _headers.keySet().containsAll(names); + } + + public boolean containsHeader(String name) + { + return _headers.containsKey(name); + } + + public void setString(String key, String value) + { + setObject(key,value); + } + + public void setObject(String key, Object value) + { + _headers.put(key,value); + } + } + private FieldTable bindHeaders = new FieldTable(); - private FieldTable matchHeaders = new FieldTable(); + private MockHeader matchHeaders = new MockHeader(); public void testDefault_1() { diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 750a1cd081..580bc78b8d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,7 +35,8 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase super.setUp(); // AR will use the NullAR by default // Just use the first vhost. - VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); + VirtualHost + virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); _protocolSession = new InternalTestProtocolSession(virtualHost); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 82b8f76efc..9d7a323b6d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -14,9 +14,9 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.server.exchange; @@ -25,28 +25,24 @@ import junit.framework.Assert; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import java.util.LinkedList; - -public class DestWildExchangeTest extends TestCase +public class TopicExchangeTest extends TestCase { TopicExchange _exchange; VirtualHost _vhost; MessageStore _store; - StoreContext _context; InternalTestProtocolSession _protocolSession; @@ -56,27 +52,26 @@ public class DestWildExchangeTest extends TestCase _exchange = new TopicExchange(); _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); _store = new MemoryMessageStore(); - _context = new StoreContext(); _protocolSession = new InternalTestProtocolSession(_vhost); } public void tearDown() { - ApplicationRegistry.remove(); + ApplicationRegistry.remove(); } public void testNoRoute() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null); _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b")); - IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession); + IncomingMessage message = new IncomingMessage(info); - _exchange.route(message); + message.enqueue(_exchange.route(message)); Assert.assertEquals(0, queue.getMessageCount()); } @@ -89,33 +84,20 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.b"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has route and should be routed"); - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c"); - try - { - routeMessage(message); - fail("Message has no route and should fail to be routed"); - } - catch (AMQException nre) - { - } + int queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } @@ -129,52 +111,33 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.b"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has route and should be routed"); - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has route and should be routed"); - } + int queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a"); - try - { - routeMessage(message); - fail("Message has no route and should fail to be routed"); - } - catch (AMQException nre) - { - } + + queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } @@ -187,89 +150,56 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.b.c"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has route and should be routed"); - } + int queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.b"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has route and should be routed"); - } + queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has route and should be routed"); - } + queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has route and should be routed"); - } + queueCount = routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("b"); - try - { - routeMessage(message); - fail("Message has no route and should fail to be routed"); - } - catch (AMQException nre) - { - } + + queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } @@ -283,38 +213,24 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.c.d.b"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has no route and should be routed"); - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.c.b"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has no route and should be routed"); - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } @@ -327,66 +243,39 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.c.b.b"); - try - { - routeMessage(message); - fail("Message has route and should not be routed"); - } - catch (AMQException nre) - { - } + int queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.a.b.c"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has no route and should be routed"); - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.b.c.b"); - try - { - routeMessage(message); - fail("Message has route and should not be routed"); - } - catch (AMQException nre) - { - } + queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.b.c.b.c"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has no route and should be routed"); - - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } @@ -400,34 +289,21 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.c.b.b.c"); - try - { - routeMessage(message); - fail("Message has route and should not be routed"); - } - catch (AMQException nre) - { - } + int queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.a.b.c.d"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has no route and should be routed"); - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } @@ -440,33 +316,20 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.c.b.b.c"); - try - { - routeMessage(message); - fail("Message has route and should not be routed"); - } - catch (AMQException nre) - { - } + int queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); message = createMessage("a.a.b.c.d"); - try - { - routeMessage(message); - } - catch (AMQException nre) - { - fail("Message has no route and should be routed"); - } + routeMessage(message); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); - queue.deleteMessageFromTop(_context); + queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); } @@ -479,25 +342,26 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.b.c"); - try - { - routeMessage(message); - fail("Message has route and should not be routed"); - } - catch (AMQException nre) - { - } + int queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } - private void routeMessage(final IncomingMessage message) + private int routeMessage(final IncomingMessage message) throws AMQException { - _exchange.route(message); - message.routingComplete(_store, new MessageHandleFactory()); - message.deliverToQueues(); + MessageMetaData mmd = message.headersReceived(); + message.setStoredMessage(_store.addMessage(mmd)); + + message.enqueue(_exchange.route(message)); + AMQMessage msg = new AMQMessage(message.getStoredMessage()); + for(AMQQueue q : message.getDestinationQueues()) + { + q.enqueue(msg); + } + return message.getDestinationQueues().size(); } public void testMoreRouting() throws AMQException @@ -508,14 +372,8 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a.b.c"); - try - { - routeMessage(message); - fail("Message has route and should not be routed"); - } - catch (AMQException nre) - { - } + int queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -529,14 +387,8 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = createMessage("a"); - try - { - routeMessage(message); - fail("Message has route and should not be routed"); - } - catch (AMQException nre) - { - } + int queueCount = routeMessage(message); + Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -546,12 +398,11 @@ public class DestWildExchangeTest extends TestCase { MessagePublishInfo info = new PublishInfo(new AMQShortString(s)); - TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null, - new LinkedList<RequiredDeliveryException>() - ); - - IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession); - message.setContentHeaderBody( new ContentHeaderBody()); + IncomingMessage message = new IncomingMessage(info); + final ContentHeaderBody chb = new ContentHeaderBody(); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + chb.properties = props; + message.setContentHeaderBody(chb); return message; @@ -574,7 +425,7 @@ public class DestWildExchangeTest extends TestCase public void setExchange(AMQShortString exchange) { - + } public boolean isImmediate() diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java index 0fa126dedb..46dc677921 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java @@ -121,7 +121,7 @@ public class AMQPChannelActorTest extends TestCase // Verify that the message has the correct type assertTrue("Message contains the [con: prefix", logs.get(0).toString().contains("[con:")); - + // Verify that all the values were presented to the MessageFormatter // so we will not end up with '{n}' entries in the log. diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java index 1b25844a14..98c14efe4d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java @@ -75,7 +75,7 @@ public class AMQPConnectionActorTest extends TestCase // Correctly Close the AR we created ApplicationRegistry.remove(); - super.tearDown(); + super.tearDown(); } private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java index e02993b840..e3280a4076 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java @@ -41,12 +41,12 @@ public class MessageStoreMessagesTest extends AbstractTestMessages { String location = "/path/to/the/message/store.files"; - _logMessage = MessageStoreMessages.MST_1002(location); + _logMessage = ConfigStoreMessages.CFG_1002(location); List<Object> log = performLog(); String[] expected = {"Store location :", location}; - validateLogMessage(log, "MST-1002", expected); + validateLogMessage(log, "CFG-1002", expected); } public void testMessage1003() @@ -59,7 +59,7 @@ public class MessageStoreMessagesTest extends AbstractTestMessages validateLogMessage(log, "MST-1003", expected); } - public void testMessage1004() + /* public void testMessage1004() { _logMessage = MessageStoreMessages.MST_1004(null,false); List<Object> log = performLog(); @@ -91,7 +91,7 @@ public class MessageStoreMessagesTest extends AbstractTestMessages // Here we use MessageFormat to ensure the messasgeCount of 2000 is // reformated for display as '2,000' - String[] expected = {"Recovered ", + String[] expected = {"Recovered ", MessageFormat.format("{0,number}", messasgeCount), "messages for queue", queueName}; @@ -119,5 +119,5 @@ public class MessageStoreMessagesTest extends AbstractTestMessages validateLogMessage(log, "MST-1006", expected); } - + */ } diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index bc36c61382..e6561a06b9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -23,19 +23,16 @@ package org.apache.qpid.server.protocol; import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import javax.management.JMException; -import java.security.Principal; /** Test class to test MBean operations for AMQMinaProtocolSession. */ public class AMQProtocolSessionMBeanTest extends TestCase diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index ec7bf1cb72..681e513ecb 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -21,19 +21,19 @@ package org.apache.qpid.server.protocol; import java.security.Principal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.transport.TestNetworkDriver; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter @@ -70,6 +70,16 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr return (byte) 8; } + public void writeReturn(MessagePublishInfo messagePublishInfo, + ContentHeaderBody header, + MessageContentSource msgContent, + int channelId, + int replyCode, + AMQShortString replyText) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + public byte getProtocolMinorVersion() { return (byte) 0; @@ -82,12 +92,12 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr synchronized (_channelDelivers) { List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag); - + if (all == null) { return new ArrayList<DeliveryPair>(0); } - + List<DeliveryPair> msgs = all.subList(0, count); List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs); @@ -108,7 +118,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } - public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { _deliveryCount.incrementAndGet(); @@ -130,11 +140,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumers.put(consumerTag, consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, message)); + consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage())); } } - public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException { } diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java index e37492bcb0..13e712dbac 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java @@ -22,17 +22,10 @@ package org.apache.qpid.server.protocol; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; - -import java.security.Principal; /** Test class to test MBean operations for AMQMinaProtocolSession. */ public class MaxChannelsTest extends TestCase @@ -66,14 +59,14 @@ public class MaxChannelsTest extends TestCase } assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size())); } - + @Override public void setUp() { //Highlight that this test will cause a new AR to be created ApplicationRegistry.getInstance(); } - + @Override public void tearDown() throws Exception { @@ -87,7 +80,7 @@ public class MaxChannelsTest extends TestCase { // Correctly Close the AR we created ApplicationRegistry.remove(); - } + } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index aff7af6952..dd013e6ad5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.queue; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,21 +8,22 @@ package org.apache.qpid.server.queue; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ import java.util.ArrayList; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import junit.framework.AssertionFailedError; @@ -42,38 +43,38 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest { // Enqueue messages in order - _queue.enqueue(null, createMessage(1L, (byte) 10)); - _queue.enqueue(null, createMessage(2L, (byte) 4)); - _queue.enqueue(null, createMessage(3L, (byte) 0)); - + _queue.enqueue(createMessage(1L, (byte) 10)); + _queue.enqueue(createMessage(2L, (byte) 4)); + _queue.enqueue(createMessage(3L, (byte) 0)); + // Enqueue messages in reverse order - _queue.enqueue(null, createMessage(4L, (byte) 0)); - _queue.enqueue(null, createMessage(5L, (byte) 4)); - _queue.enqueue(null, createMessage(6L, (byte) 10)); - + _queue.enqueue(createMessage(4L, (byte) 0)); + _queue.enqueue(createMessage(5L, (byte) 4)); + _queue.enqueue(createMessage(6L, (byte) 10)); + // Enqueue messages out of order - _queue.enqueue(null, createMessage(7L, (byte) 4)); - _queue.enqueue(null, createMessage(8L, (byte) 10)); - _queue.enqueue(null, createMessage(9L, (byte) 0)); - + _queue.enqueue(createMessage(7L, (byte) 4)); + _queue.enqueue(createMessage(8L, (byte) 10)); + _queue.enqueue(createMessage(9L, (byte) 0)); + // Register subscriber _queue.registerSubscription(_subscription, false); Thread.sleep(150); - + ArrayList<QueueEntry> msgs = _subscription.getMessages(); try { - assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId()); - assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId()); - assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId()); + assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber()); + assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber()); + assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber()); - assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId()); - assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId()); - assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId()); + assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber()); + assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber()); + assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber()); - assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId()); - assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId()); - assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId()); + assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber()); + assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber()); + assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber()); } catch (AssertionFailedError afe) { @@ -81,7 +82,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest int index = 1; for (QueueEntry qe : msgs) { - System.err.println(index + ":" + qe.getMessage().getMessageId()); + System.err.println(index + ":" + qe.getMessage().getMessageNumber()); index++; } @@ -98,10 +99,10 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest msg.getContentHeaderBody().properties = props; return msg; } - + protected AMQMessage createMessage(Long id) throws AMQException { return createMessage(id, (byte) 0); } - + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 19470e6226..5f0d77afea 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,21 +20,17 @@ */ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.LinkedList; - -import javax.management.Notification; - import junit.framework.TestCase; - import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.InternalTestProtocolSession; @@ -42,16 +38,16 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.management.Notification; +import java.util.ArrayList; + /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase -{ +{ private final static long MAX_MESSAGE_COUNT = 50; private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB @@ -61,11 +57,6 @@ public class AMQQueueAlertTest extends TestCase private VirtualHost _virtualHost; private AMQProtocolEngine _protocolSession; private MessageStore _messageStore = new MemoryMessageStore(); - private StoreContext _storeContext = new StoreContext(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList<RequiredDeliveryException>() - ); private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE; /** @@ -75,6 +66,10 @@ public class AMQQueueAlertTest extends TestCase */ public void testMessageCountAlert() throws Exception { + _protocolSession = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); + _protocolSession.addChannel(channel); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"), false, _virtualHost, null); @@ -82,7 +77,7 @@ public class AMQQueueAlertTest extends TestCase _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); - sendMessages(MAX_MESSAGE_COUNT, 256l); + sendMessages(channel, MAX_MESSAGE_COUNT, 256l); assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT); Notification lastNotification = _queueMBean.getLastNotification(); @@ -99,6 +94,10 @@ public class AMQQueueAlertTest extends TestCase */ public void testMessageSizeAlert() throws Exception { + _protocolSession = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); + _protocolSession.addChannel(channel); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"), false, _virtualHost, null); @@ -106,7 +105,7 @@ public class AMQQueueAlertTest extends TestCase _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE); - sendMessages(1, MAX_MESSAGE_SIZE * 2); + sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2); assertTrue(_queueMBean.getMessageCount() == 1); Notification lastNotification = _queueMBean.getLastNotification(); @@ -125,6 +124,10 @@ public class AMQQueueAlertTest extends TestCase */ public void testQueueDepthAlertNoSubscriber() throws Exception { + _protocolSession = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); + _protocolSession.addChannel(channel); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"), false, _virtualHost, null); @@ -134,7 +137,7 @@ public class AMQQueueAlertTest extends TestCase while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH) { - sendMessages(1, MAX_MESSAGE_SIZE); + sendMessages(channel, 1, MAX_MESSAGE_SIZE); } Notification lastNotification = _queueMBean.getLastNotification(); @@ -154,6 +157,10 @@ public class AMQQueueAlertTest extends TestCase */ public void testMessageAgeAlert() throws Exception { + _protocolSession = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); + _protocolSession.addChannel(channel); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"), false, _virtualHost, null); @@ -161,7 +168,7 @@ public class AMQQueueAlertTest extends TestCase _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE); - sendMessages(1, MAX_MESSAGE_SIZE); + sendMessages(channel, 1, MAX_MESSAGE_SIZE); // Ensure message sits on queue long enough to age. Thread.sleep(MAX_MESSAGE_AGE * 2); @@ -201,7 +208,7 @@ public class AMQQueueAlertTest extends TestCase // Send messages(no of message to be little more than what can cause a Queue_Depth alert) int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10; long totalSize = (messageCount * MAX_MESSAGE_SIZE); - sendMessages(messageCount, MAX_MESSAGE_SIZE); + sendMessages(channel, messageCount, MAX_MESSAGE_SIZE); // Check queueDepth. There should be no messages on the queue and as the subscriber is listening // so there should be no Queue_Deoth alert raised @@ -228,7 +235,7 @@ public class AMQQueueAlertTest extends TestCase _queue.registerSubscription( subscription2, false); - + while (_queue.getUndeliveredMessageCount()!= 0) { Thread.sleep(100); @@ -247,7 +254,7 @@ public class AMQQueueAlertTest extends TestCase _queueMBean.clearQueue(); assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth())); } - + protected IncomingMessage message(final boolean immediate, long size) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() @@ -280,8 +287,10 @@ public class AMQQueueAlertTest extends TestCase }; ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + contentHeaderBody.properties = props; contentHeaderBody.bodySize = size; // in bytes - IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession); + IncomingMessage message = new IncomingMessage(publish); message.setContentHeaderBody(contentHeaderBody); return message; @@ -305,16 +314,19 @@ public class AMQQueueAlertTest extends TestCase } - private void sendMessages(long messageCount, final long size) throws AMQException + private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException { IncomingMessage[] messages = new IncomingMessage[(int) messageCount]; + MessageMetaData[] metaData = new MessageMetaData[(int) messageCount]; for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); + metaData[i] = messages[i].headersReceived(); + messages[i].setStoredMessage(_messageStore.addMessage(metaData[i])); + messages[i].enqueue(qs); - messages[i].routingComplete(_messageStore, new MessageHandleFactory()); } @@ -324,6 +336,10 @@ public class AMQQueueAlertTest extends TestCase ByteBuffer _data = ByteBuffer.allocate((int)size); + { + _data.limit((int)size); + } + public int getSize() { return (int) size; @@ -336,10 +352,12 @@ public class AMQQueueAlertTest extends TestCase public void reduceToFit() { - + } }); - messages[i].deliverToQueues(); + + _queue.enqueue(new AMQMessage(messages[i].getStoredMessage())); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index e692069663..97f061fdd1 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -25,7 +25,6 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; public class AMQQueueFactoryTest extends TestCase { @@ -55,31 +54,18 @@ public class AMQQueueFactoryTest extends TestCase FieldTable fieldTable = new FieldTable(); fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5); - try - { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false, - _virtualHost, fieldTable); - assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false, + _virtualHost, fieldTable); + + assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); } public void testSimpleQueueRegistration() { - try - { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, - _virtualHost, null); - assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, + _virtualHost, null); + assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 03eb7021ad..3bb8d397be 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -29,7 +29,10 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactory; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; @@ -38,19 +41,15 @@ import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.mina.common.ByteBuffer; +import org.apache.commons.configuration.PropertiesConfiguration; import javax.management.JMException; import java.util.ArrayList; -import java.util.LinkedList; -import java.util.Collections; /** * Test class to test AMQQueueMBean attribtues and operations @@ -61,8 +60,6 @@ public class AMQQueueMBeanTest extends TestCase private AMQQueue _queue; private AMQQueueMBean _queueMBean; private MessageStore _messageStore; - private StoreContext _storeContext = new StoreContext(); - private TransactionalContext _transactionalContext; private VirtualHost _virtualHost; private AMQProtocolSession _protocolSession; private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE; @@ -108,7 +105,7 @@ public class AMQQueueMBeanTest extends TestCase //Ensure that the data has been removed from the Store verifyBrokerState(); } - + public void testDeleteMessages() throws Exception { int messageCount = 10; @@ -129,9 +126,9 @@ public class AMQQueueMBeanTest extends TestCase } catch(Exception e) { - + } - + //delete last message, leaving 2nd to 9th _queueMBean.deleteMessages(10L,10L); assertTrue(_queueMBean.getMessageCount() == (messageCount - 2)); @@ -143,7 +140,7 @@ public class AMQQueueMBeanTest extends TestCase } catch(Exception e) { - + } //delete remaining messages, leaving none @@ -159,18 +156,16 @@ public class AMQQueueMBeanTest extends TestCase private void verifyBrokerState() { - TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore()); + TestableMemoryMessageStore store = (TestableMemoryMessageStore)_virtualHost.getMessageStore(); // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. - assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); - assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size()); - assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); - assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size()); + + assertEquals("Store should have no messages:" + store.getMessageCount(), 0, store.getMessageCount()); } public void testConsumerCount() throws AMQException { - + assertTrue(_queue.getActiveConsumerCount() == 0); assertTrue(_queueMBean.getActiveConsumerCount() == 0); @@ -182,7 +177,7 @@ public class AMQQueueMBeanTest extends TestCase Subscription subscription = SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager()); - + _queue.registerSubscription(subscription, false); assertEquals(1,(int)_queueMBean.getActiveConsumerCount()); @@ -225,7 +220,6 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth)); assertTrue(_queueMBean.getName().equals("testQueue")); - assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest")); assertFalse(_queueMBean.isAutoDelete()); assertFalse(_queueMBean.isDurable()); } @@ -261,7 +255,7 @@ public class AMQQueueMBeanTest extends TestCase { } - + try { long end = Integer.MAX_VALUE; @@ -275,17 +269,22 @@ public class AMQQueueMBeanTest extends TestCase } IncomingMessage msg = message(false, false); - long id = msg.getMessageId(); - _queue.clearQueue(_storeContext); + _queue.clearQueue(); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, new MessageHandleFactory()); + MessageMetaData mmd = msg.headersReceived(); + msg.setStoredMessage(_messageStore.addMessage(mmd)); + long id = msg.getMessageNumber(); msg.addContentBodyFrame(new ContentChunk() { ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE); + { + _data.limit((int)MESSAGE_SIZE); + } + public int getSize() { return (int) MESSAGE_SIZE; @@ -301,7 +300,12 @@ public class AMQQueueMBeanTest extends TestCase } }); - msg.deliverToQueues(); + + AMQMessage m = new AMQMessage(msg.getStoredMessage()); + for(AMQQueue q : msg.getDestinationQueues()) + { + q.enqueue(m); + } // _queue.process(_storeContext, new QueueEntry(_queue, msg), false); _queueMBean.viewMessageContent(id); try @@ -350,7 +354,7 @@ public class AMQQueueMBeanTest extends TestCase contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); - IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession); + IncomingMessage msg = new IncomingMessage(publish); msg.setContentHeaderBody(contentHeaderBody); return msg; @@ -360,15 +364,17 @@ public class AMQQueueMBeanTest extends TestCase protected void setUp() throws Exception { super.setUp(); - IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); + IApplicationRegistry applicationRegistry = new TestApplicationRegistry(new ServerConfiguration(configuration)); + ApplicationRegistry.initialise(applicationRegistry ); + + configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); _messageStore = _virtualHost.getMessageStore(); - _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList<RequiredDeliveryException>() - ); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost, null); _queueMBean = new AMQQueueMBean(_queue); @@ -391,7 +397,8 @@ public class AMQQueueMBeanTest extends TestCase currentMessage.enqueue(qs); // route header - currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); + MessageMetaData mmd = currentMessage.headersReceived(); + currentMessage.setStoredMessage(_messageStore.addMessage(mmd)); // Add the body so we have somthing to test later currentMessage.addContentBodyFrame( @@ -400,7 +407,12 @@ public class AMQQueueMBeanTest extends TestCase .convertToContentChunk( new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), MESSAGE_SIZE))); - currentMessage.deliverToQueues(); + + AMQMessage m = new AMQMessage(currentMessage.getStoredMessage()); + for(AMQQueue q : currentMessage.getDestinationQueues()) + { + q.enqueue(m); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 79f7d75aa9..d64e533f72 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -28,7 +28,10 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -39,15 +42,9 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.util.NullApplicationRegistry; import java.util.ArrayList; -import java.util.LinkedList; import java.util.Set; -import java.util.Collections; /** * Tests that acknowledgements are handled correctly. @@ -62,8 +59,6 @@ public class AckTest extends TestCase private TestMemoryMessageStore _messageStore; - private StoreContext _storeContext = new StoreContext(); - private AMQChannel _channel; private AMQQueue _queue; @@ -99,11 +94,7 @@ public class AckTest extends TestCase private void publishMessages(int count, boolean persistent) throws AMQException { - TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, - new LinkedList<RequiredDeliveryException>() - ); _queue.registerSubscription(_subscription,false); - MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -136,31 +127,50 @@ public class AckTest extends TestCase return new AMQShortString("rk"); } }; - IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); + final IncomingMessage msg = new IncomingMessage(publishBody); //IncomingMessage msg2 = null; + BasicContentHeaderProperties b = new BasicContentHeaderProperties(); + ContentHeaderBody cb = new ContentHeaderBody(); + cb.properties = b; + if (persistent) { - BasicContentHeaderProperties b = new BasicContentHeaderProperties(); //This is DeliveryMode.PERSISTENT b.setDeliveryMode((byte) 2); - ContentHeaderBody cb = new ContentHeaderBody(); - cb.properties = b; - msg.setContentHeaderBody(cb); - } - else - { - msg.setContentHeaderBody(new ContentHeaderBody()); } + + msg.setContentHeaderBody(cb); + // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, factory); + MessageMetaData mmd = msg.headersReceived(); + msg.setStoredMessage(_messageStore.addMessage(mmd)); if(msg.allContentReceived()) { - msg.deliverToQueues(); + ServerTransaction txn = new AutoCommitTransaction(_messageStore); + txn.enqueue(_queue, msg, new ServerTransaction.Action() { + public void postCommit() + { + try + { + _queue.enqueue(new AMQMessage(msg.getStoredMessage())); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); + } // we manually send the message to the subscription //_subscription.send(new QueueEntry(_queue,msg), _queue); @@ -178,8 +188,7 @@ public class AckTest extends TestCase publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); + assertEquals("",msgCount,map.size()); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -191,8 +200,6 @@ public class AckTest extends TestCase assertTrue(unackedMsg.getQueue() == _queue); } - assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); } /** @@ -207,8 +214,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertTrue(_messageStore.getMessageCount() == 0); + } @@ -224,8 +231,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertTrue(_messageStore.getMessageCount() == 0); + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index 355ba6a362..7000df157e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -20,25 +20,18 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.AMQMessage; public class MockAMQMessage extends AMQMessage { public MockAMQMessage(long messageId) throws AMQException { - super(new MockAMQMessageHandle(messageId) , - (StoreContext)null, - (MessagePublishInfo)new MockMessagePublishInfo()); + super(new MockStoredMessage(messageId)); } - protected MockAMQMessage(AMQMessage msg) - throws AMQException - { - super(msg); - } + @Override @@ -46,4 +39,5 @@ public class MockAMQMessage extends AMQMessage { return 0l; } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java deleted file mode 100644 index bdb0707c27..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.store.StoreContext; - -public class MockAMQMessageHandle extends InMemoryMessageHandle -{ - public MockAMQMessageHandle(final Long messageId) - { - super(messageId); - } - - @Override - public long getBodySize(StoreContext store) - { - return 0l; - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index a131c2a465..7c1f728664 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -23,23 +23,19 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.AMQException; -import org.apache.commons.configuration.Configuration; import java.util.List; import java.util.Set; import java.util.Map; -import java.util.HashMap; -import java.util.LinkedList; public class MockAMQQueue implements AMQQueue { @@ -47,6 +43,10 @@ public class MockAMQQueue implements AMQQueue private AMQShortString _name; private VirtualHost _virtualhost; + private PrincipalHolder _principalHolder; + + private Object _exclusiveOwner; + public MockAMQQueue(String name) { _name = new AMQShortString(name); @@ -57,6 +57,11 @@ public class MockAMQQueue implements AMQQueue return _name; } + public void setNoLocal(boolean b) + { + + } + public boolean isDurable() { return false; //To change body of implemented methods use File | Settings | File Templates. @@ -162,17 +167,22 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException + public QueueEntry enqueue(ServerMessage message) throws AMQException { return null; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException + public void requeue(QueueEntry entry) { //To change body of implemented methods use File | Settings | File Templates. } - public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException + public void requeue(QueueEntryImpl storeContext, Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dequeue(QueueEntry entry) { //To change body of implemented methods use File | Settings | File Templates. } @@ -211,23 +221,23 @@ public class MockAMQQueue implements AMQQueue { return null; //To change body of implemented methods use File | Settings | File Templates. } - + public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition) { return null; } - public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext) + public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) { //To change body of implemented methods use File | Settings | File Templates. } - public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext) + public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext) { //To change body of implemented methods use File | Settings | File Templates. } - public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext) + public void removeMessagesFromQueue(long fromMessageId, long toMessageId) { //To change body of implemented methods use File | Settings | File Templates. } @@ -286,16 +296,16 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - public void deleteMessageFromTop(StoreContext storeContext) throws AMQException + public void deleteMessageFromTop() { //To change body of implemented methods use File | Settings | File Templates. } - public long clearQueue(StoreContext storeContext) throws AMQException + public long clearQueue() { return 0; //To change body of implemented methods use File | Settings | File Templates. } - + public void checkMessageStatus() throws AMQException { @@ -327,8 +337,28 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public boolean isExclusive() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public Exchange getAlternateExchange() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setAlternateExchange(Exchange exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Map<String, Object> getArguments() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void checkCapacity(AMQChannel channel) - { + { } public ManagedObject getManagedObject() @@ -343,7 +373,7 @@ public class MockAMQQueue implements AMQQueue public void setMinimumAlertRepeatGap(long value) { - + } public long getCapacity() @@ -368,7 +398,32 @@ public class MockAMQQueue implements AMQQueue public void configure(QueueConfiguration config) { - + + } + + public PrincipalHolder getPrincipalHolder() + { + return _principalHolder; } + public void setPrincipalHolder(PrincipalHolder principalHolder) + { + _principalHolder = principalHolder; + } + + public Object getExclusiveOwner() + { + return _exclusiveOwner; + } + + public void setExclusiveOwner(Object exclusiveOwner) + { + _exclusiveOwner = exclusiveOwner; + } + + + public String getResourceName() + { + return _name.toString(); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 37f91e7464..3f74cb973b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -21,8 +21,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.AMQMessage; public class MockQueueEntry implements QueueEntry { @@ -44,14 +45,14 @@ public class MockQueueEntry implements QueueEntry return false; } - public void addStateChangeListener(StateChangeListener listener) + public boolean isAcquiredBy(Subscription subscription) { - + return false; } - public String debugIdentity() + public void addStateChangeListener(StateChangeListener listener) { - return null; + } public boolean delete() @@ -59,17 +60,22 @@ public class MockQueueEntry implements QueueEntry return false; } - public void dequeue(StoreContext storeContext) throws FailedDequeueException + public void dequeue() + { + + } + + public void discard() { } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void routeToAlternate() { } - public void dispose(StoreContext storeContext) throws MessageCleanupException + public void dispose() { } @@ -119,70 +125,95 @@ public class MockQueueEntry implements QueueEntry return false; } - + public boolean isQueueDeleted() { return false; } - + public boolean isRejectedBy(Subscription subscription) { return false; } - + public void reject() { } - + public void reject(Subscription subscription) { } - + public void release() { } - + public boolean releaseButRetain() + { + return false; + } + + public boolean removeStateChangeListener(StateChangeListener listener) { return false; } - - public void requeue(StoreContext storeContext) throws AMQException + + public void requeue() { } - + public void requeue(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void setDeliveredToSubscription() { } - - public void setRedelivered(boolean b) + + public void setRedelivered() + { + + + } + + public AMQMessageHeader getMessageHeader() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isPersistent() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isRedelivered() + { + return false; } - + public int compareTo(QueueEntry o) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java new file mode 100755 index 0000000000..7dc491de4d --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -0,0 +1,92 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; + +import java.nio.ByteBuffer; + +public class MockStoredMessage implements StoredMessage<MessageMetaData> +{ + private long _messageId; + private MessageMetaData _metaData; + private final ByteBuffer _content; + + + public MockStoredMessage(long messageId) + { + this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60)); + } + + public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb) + { + _messageId = messageId; + _metaData = new MessageMetaData(info, chb, 0); + _content = ByteBuffer.allocate(_metaData.getContentSize()); + + } + + public MessageMetaData getMetaData() + { + return _metaData; + } + + public long getMessageNumber() + { + return _messageId; + } + + public void addContent(int offsetInMessage, ByteBuffer src) + { + src = src.duplicate(); + ByteBuffer dst = _content.duplicate(); + dst.position(offsetInMessage); + dst.put(src); + } + + public int getContent(int offset, ByteBuffer dst) + { + ByteBuffer src = _content.duplicate(); + src.position(offset); + src = src.slice(); + if(dst.remaining() < src.limit()) + { + src.limit(dst.remaining()); + } + dst.put(src); + return src.limit(); + } + + public TransactionLog.StoreFuture flushToStore() + { + return MessageStore.IMMEDIATE_FUTURE; + } + + public void remove() + { + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 111e94f9bf..8c6574095b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.queue; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,16 +8,16 @@ package org.apache.qpid.server.queue; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ @@ -31,21 +31,21 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionImpl; -import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; public class SimpleAMQQueueTest extends TestCase { @@ -59,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase protected DirectExchange _exchange = new DirectExchange(); protected MockSubscription _subscription = new MockSubscription(); protected FieldTable _arguments = null; - + MessagePublishInfo info = new MessagePublishInfo() { @@ -88,7 +88,7 @@ public class SimpleAMQQueueTest extends TestCase return null; } }; - + @Override protected void setUp() throws Exception { @@ -97,7 +97,7 @@ public class SimpleAMQQueueTest extends TestCase ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(); PropertiesConfiguration env = new PropertiesConfiguration(); - _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store); + _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); @@ -119,51 +119,51 @@ public class SimpleAMQQueueTest extends TestCase } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing name", + assertTrue("Exception was not about missing name", e.getMessage().contains("name")); } - + try { _queue = new SimpleAMQQueue(_qname, false, _owner, false, null); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) { - assertTrue("Exception was not about missing vhost", + assertTrue("Exception was not about missing vhost", e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); assertNotNull("Queue was not created", _queue); } - + public void testGetVirtualHost() { assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); } - + public void testBinding() { try { _queue.bind(_exchange, _routingKey, null); - assertTrue("Routing key was not bound", + assertTrue("Routing key was not bound", _exchange.getBindings().containsKey(_routingKey)); - assertEquals("Queue was not bound to key", + assertEquals("Queue was not bound to key", _exchange.getBindings().get(_routingKey).get(0), _queue); - assertEquals("Exchange binding count", 1, + assertEquals("Exchange binding count", 1, _queue.getExchangeBindings().size()); - assertEquals("Wrong exchange bound", _routingKey, + assertEquals("Wrong exchange bound", _routingKey, _queue.getExchangeBindings().get(0).getRoutingKey()); - assertEquals("Wrong exchange bound", _exchange, + assertEquals("Wrong exchange bound", _exchange, _queue.getExchangeBindings().get(0).getExchange()); - + _queue.unBind(_exchange, _routingKey, null); - assertFalse("Routing key was still bound", + assertFalse("Routing key was still bound", _exchange.getBindings().containsKey(_routingKey)); - assertNull("Routing key was not empty", + assertNull("Routing key was not empty", _exchange.getBindings().get(_routingKey)); } catch (AMQException e) @@ -171,61 +171,61 @@ public class SimpleAMQQueueTest extends TestCase assertNull("Unexpected exception", e); } } - + public void testSubscription() throws AMQException { // Check adding a subscription adds it to the queue _queue.registerSubscription(_subscription, false); - assertEquals("Subscription did not get queue", _queue, + assertEquals("Subscription did not get queue", _queue, _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, + assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, + assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); - + // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - + _queue.enqueue(messageA); + assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); assertTrue("Subscription still had queue", _subscription.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); - assertFalse("Queue still has active consumer", + assertFalse("Queue still has active consumer", 1 == _queue.getActiveConsumerCount()); - + AMQMessage messageB = createMessage(new Long (25)); - _queue.enqueue(null, messageB); - QueueEntry entry = _subscription.getLastSeenEntry(); - assertNull(entry); + _queue.enqueue(messageB); + assertNull(_subscription.getQueueContext()); + } - + public void testQueueNoSubscriber() throws AMQException, InterruptedException { AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); + _queue.enqueue(messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); + assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); } public void testExclusiveConsumer() throws AMQException { // Check adding an exclusive subscription adds it to the queue _queue.registerSubscription(_subscription, true); - assertEquals("Subscription did not get queue", _queue, + assertEquals("Subscription did not get queue", _queue, _subscription.getQueue()); - assertEquals("Queue does not have consumer", 1, + assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, + assertEquals("Queue does not have active consumer", 1, _queue.getActiveConsumerCount()); // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); - + _queue.enqueue(messageA); + assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + // Check we cannot add a second subscriber to the queue Subscription subB = new MockSubscription(); Exception ex = null; @@ -235,12 +235,12 @@ public class SimpleAMQQueueTest extends TestCase } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); assertTrue(ex instanceof AMQException); - // Check we cannot add an exclusive subscriber to a queue with an + // Check we cannot add an exclusive subscriber to a queue with an // existing subscription _queue.unregisterSubscription(_subscription); _queue.registerSubscription(_subscription, false); @@ -250,35 +250,35 @@ public class SimpleAMQQueueTest extends TestCase } catch (AMQException e) { - ex = e; + ex = e; } assertNotNull(ex); } - - public void testAutoDeleteQueue() throws Exception + + public void testAutoDeleteQueue() throws Exception { _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); + _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); - _queue.enqueue(null, message); + _queue.enqueue(message); _queue.unregisterSubscription(_subscription); assertTrue("Queue was not deleted when subscription was removed", _queue.isDeleted()); } - + public void testResend() throws Exception { _queue.registerSubscription(_subscription, false); Long id = new Long(26); AMQMessage message = createMessage(id); - _queue.enqueue(null, message); - QueueEntry entry = _subscription.getLastSeenEntry(); - entry.setRedelivered(true); + _queue.enqueue(message); + QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); + entry.setRedelivered(); _queue.resend(entry, _subscription); - + } - + public void testGetFirstMessageId() throws Exception { // Create message @@ -286,7 +286,7 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); @@ -302,7 +302,7 @@ public class SimpleAMQQueueTest extends TestCase Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); @@ -323,7 +323,7 @@ public class SimpleAMQQueueTest extends TestCase Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); @@ -335,7 +335,7 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } - + public void testGetMessagesRangeOnTheQueue() throws Exception { for (int i = 1 ; i <= 10; i++) @@ -344,142 +344,138 @@ public class SimpleAMQQueueTest extends TestCase Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); } - + // Get non-existent 0th QueueEntry & check returned list was empty // (the position parameters in this method are indexed from 1) List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0); assertTrue(entries.size() == 0); - + // Check that when 'from' is 0 it is ignored and the range continues from 1 entries = _queue.getMessagesRangeOnTheQueue(0, 2); assertTrue(entries.size() == 2); - long msgID = entries.get(0).getMessage().getMessageId(); + long msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 1L); - msgID = entries.get(1).getMessage().getMessageId(); + msgID = entries.get(1).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 2L); // Check that when 'from' is greater than 'to' the returned list is empty entries = _queue.getMessagesRangeOnTheQueue(5, 4); assertTrue(entries.size() == 0); - - // Get first QueueEntry & check id + + // Get first QueueEntry & check id entries = _queue.getMessagesRangeOnTheQueue(1, 1); assertTrue(entries.size() == 1); - msgID = entries.get(0).getMessage().getMessageId(); + msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 1L); - + // Get 5th,6th,7th entries and check id's entries = _queue.getMessagesRangeOnTheQueue(5, 7); assertTrue(entries.size() == 3); - msgID = entries.get(0).getMessage().getMessageId(); + msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 5L); - msgID = entries.get(1).getMessage().getMessageId(); + msgID = entries.get(1).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 6L); - msgID = entries.get(2).getMessage().getMessageId(); + msgID = entries.get(2).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 7L); - + // Get 10th QueueEntry & check id entries = _queue.getMessagesRangeOnTheQueue(10, 10); assertTrue(entries.size() == 1); - msgID = entries.get(0).getMessage().getMessageId(); + msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 10L); - + // Get non-existent 11th QueueEntry & check returned set was empty entries = _queue.getMessagesRangeOnTheQueue(11, 11); assertTrue(entries.size() == 0); - + // Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs entries = _queue.getMessagesRangeOnTheQueue(9, 11); assertTrue(entries.size() == 2); - msgID = entries.get(0).getMessage().getMessageId(); + msgID = entries.get(0).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 9L); - msgID = entries.get(1).getMessage().getMessageId(); + msgID = entries.get(1).getMessage().getMessageNumber(); assertEquals("Message ID was wrong", msgID, 10L); } - + public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); + final IncomingMessage msg = new IncomingMessage(info); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - + + final ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + // Send persistent message + qs.add(_queue); - msg.enqueue(qs); - msg.routingComplete(_store, new MessageHandleFactory()); - _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); - + MessageMetaData metaData = msg.headersReceived(); + StoredMessage handle = _store.addMessage(metaData); + msg.setStoredMessage(handle); + + + ServerTransaction txn = new AutoCommitTransaction(_store); + + txn.enqueue(qs, msg, new ServerTransaction.Action() + { + public void postCommit() + { + msg.enqueue(qs); + } + + public void onRollback() + { + } + }); + + + // Check that it is enqueued AMQQueue data = _store.getMessages().get(1L); - assertNotNull(data); - + assertNull(data); + // Dequeue message MockQueueEntry entry = new MockQueueEntry(); - AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); - + AMQMessage amqmsg = new AMQMessage(handle); + entry.setMessage(amqmsg); - _queue.dequeue(null, entry); - + _queue.dequeue(entry); + // Check that it is dequeued data = _store.getMessages().get(1L); assertNull(data); } - // FIXME: move this to somewhere useful - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) - { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); - try - { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } - - - return amqMessageHandle; - } - public class TestMessage extends AMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody) throws AMQException { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new BasicContentHeaderProperties(), 0)); + + } + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, ContentHeaderBody chb) + throws AMQException + { + super(new MockStoredMessage(messageId, publishBody, chb)); _tag = tag; } - public boolean incrementReference() { _count++; return true; } - public void decrementReference(StoreContext context) + public void decrementReference() { _count--; } @@ -489,10 +485,10 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Wrong count for message with tag " + _tag, expected, _count); } } - + protected AMQMessage createMessage(Long id) throws AMQException { - AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); + AMQMessage messageA = new TestMessage(id, id, info); return messageA; } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 02de09c91f..ba94af5936 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -27,8 +27,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SimpleAMQQueueThreadPoolTest extends TestCase { @@ -47,7 +45,7 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount()); - + queue.stop(); assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount()); @@ -55,6 +53,6 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase finally { ApplicationRegistry.remove(); - } + } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index 8102360ce0..44f9861e8d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -57,11 +57,11 @@ public class ACLManagerTest extends TestCase BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile)); out.write("<security><queueDenier>notyet</queueDenier><exchangeDenier>yes</exchangeDenier></security>"); out.close(); - + _conf = new SecurityConfiguration(new XMLConfiguration(tmpFile)); - + // Create ACLManager - + _pluginManager = new MockPluginManager(""); _authzManager = new ACLManager(_conf, _pluginManager); @@ -79,15 +79,15 @@ public class ACLManagerTest extends TestCase // Correctly Close the AR we created ApplicationRegistry.remove(); super.tearDown(); - } - + } + public void testACLManagerConfigurationPluginManager() throws Exception { AMQQueue queue = new MockAMQQueue("notyet"); AMQQueue otherQueue = new MockAMQQueue("other"); - + assertFalse(_authzManager.authoriseDelete(_session, queue)); - + // This should only be denied if the config hasn't been correctly passed in assertTrue(_authzManager.authoriseDelete(_session, otherQueue)); assertTrue(_authzManager.authorisePurge(_session, queue)); @@ -96,11 +96,11 @@ public class ACLManagerTest extends TestCase public void testACLManagerConfigurationPluginManagerACLPlugin() throws ConfigurationException { _authzManager = new ACLManager(_conf, _pluginManager, ExchangeDenier.FACTORY); - + Exchange exchange = null; assertFalse(_authzManager.authoriseDelete(_session, exchange)); } - + public void testConfigurePlugins() throws ConfigurationException { Configuration hostConfig = new PropertiesConfiguration(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java index 317dee2b47..37a0fd7fc3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java @@ -14,16 +14,16 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.server.security.access; import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.security.PrincipalHolder; public class ExchangeDenier extends AllowAll { @@ -40,9 +40,9 @@ public class ExchangeDenier extends AllowAll return new ExchangeDenier(); } }; - + @Override - public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange) + public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange) { return AuthzResult.DENIED; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java index 88100dc25f..73e9dac775 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java @@ -27,14 +27,13 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.amqp_0_9.ExchangeDeclareBodyImpl; -import org.apache.qpid.framing.amqp_0_9.QueueDeclareBodyImpl; import org.apache.qpid.framing.amqp_8_0.QueueBindBodyImpl; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; -import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -43,7 +42,7 @@ public class PrincipalPermissionsTest extends TestCase private String _user = "user"; private PrincipalPermissions _perms; - + // Common things that are passed to frame constructors private AMQShortString _queueName = new AMQShortString(this.getClass().getName()+"queue"); private AMQShortString _tempQueueName = new AMQShortString(this.getClass().getName()+"tempqueue"); @@ -65,18 +64,18 @@ public class PrincipalPermissionsTest extends TestCase private AMQQueue _temporaryQueue; private Boolean _temporary = false; private Boolean _ownQueue = false; - + @Override public void setUp() { //Highlight that this test will cause a new AR to be created - ApplicationRegistry.getInstance(); + ApplicationRegistry.getInstance(); _perms = new PrincipalPermissions(_user); - try + try { PropertiesConfiguration env = new PropertiesConfiguration(); - _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); + _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration("test", env)); _exchange = DirectExchange.TYPE.newInstance(_virtualHost, _exchangeName, _durable, _ticket, _autoDelete); _queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, _virtualHost, _arguments); _temporaryQueue = AMQQueueFactory.createAMQQueueImpl(_tempQueueName, false, _owner , true, _virtualHost, _arguments); @@ -132,27 +131,29 @@ public class PrincipalPermissionsTest extends TestCase _perms.grant(Permission.CREATEQUEUE, grantArgs); assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEQUEUE, authArgs)); } - + // FIXME disabled, this fails due to grant putting the grant into the wrong map QPID-1598 public void disableTestExchangeCreate() { - ExchangeDeclareBodyImpl exchangeDeclare = + ExchangeDeclareBodyImpl exchangeDeclare = new ExchangeDeclareBodyImpl(_ticket, _exchangeName, _exchangeType, _passive, _durable, _autoDelete, _internal, _nowait, _arguments); Object[] authArgs = new Object[]{exchangeDeclare}; Object[] grantArgs = new Object[]{_exchangeName, _exchangeType}; - + assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs)); _perms.grant(Permission.CREATEEXCHANGE, grantArgs); assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs)); } - + public void testConsume() { Object[] authArgs = new Object[]{_queue}; Object[] grantArgs = new Object[]{_queueName, _ownQueue}; - assertEquals(AuthzResult.DENIED,_perms.authorise(Permission.CONSUME, authArgs)); + /* FIXME: This throws a null pointer exception QPID-1599 + * assertFalse(_perms.authorise(Permission.CONSUME, authArgs)); + */ _perms.grant(Permission.CONSUME, grantArgs); assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CONSUME, authArgs)); } @@ -166,7 +167,7 @@ public class PrincipalPermissionsTest extends TestCase _perms.grant(Permission.PUBLISH, grantArgs); assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.PUBLISH, authArgs)); } - + public void testVhostAccess() { //Tests that granting a user Virtualhost level access allows all authorisation requests diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java index 5497f0ae44..5b76bf7532 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java @@ -14,21 +14,20 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.server.security.access; import org.apache.commons.configuration.Configuration; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.security.PrincipalHolder; public class QueueDenier extends AllowAll { - + public static final ACLPluginFactory FACTORY = new ACLPluginFactory() { public boolean supportsTag(String name) @@ -43,18 +42,18 @@ public class QueueDenier extends AllowAll return plugin; } }; - + private String _queueName = ""; - + @Override - public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue) + public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue) { if (!(queue.getName().toString().equals(_queueName))) { return AuthzResult.ALLOWED; - } - else + } + else { return AuthzResult.DENIED; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 5802655cfc..5169676dae 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -29,17 +29,13 @@ import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQPriorityQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.queue.ExchangeBinding; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.queue.*; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ContentHeaderBody; @@ -103,7 +99,7 @@ public class MessageStoreTest extends TestCase try { - _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration)); + _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), configuration)); ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost); } catch (Exception e) @@ -169,7 +165,7 @@ public class MessageStoreTest extends TestCase Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); bindAllTopicQueuesToExchange(topicExchange, topicRouting); - //Send Message To NonDurable direct Exchange = persistent + //Send Message To NonDurable direct Exchange = persistent sendMessageOnExchange(nonDurableExchange, directRouting, true); // and non-persistent sendMessageOnExchange(nonDurableExchange, directRouting, false); @@ -344,22 +340,11 @@ public class MessageStoreTest extends TestCase MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey); - IncomingMessage currentMessage = null; + final IncomingMessage currentMessage; - try - { - currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(), - messageInfo, - new NonTransactionalContext(_virtualHost.getMessageStore(), - new StoreContext(), null, null), - new InternalTestProtocolSession(_virtualHost)); - } - catch (AMQException e) - { - fail(e.getMessage()); - } - currentMessage.setMessageStore(_virtualHost.getMessageStore()); + currentMessage = new IncomingMessage(messageInfo); + currentMessage.setExchange(directExchange); ContentHeaderBody headerBody = new ContentHeaderBody(); @@ -379,35 +364,42 @@ public class MessageStoreTest extends TestCase currentMessage.setExpiration(); - try - { - currentMessage.route(); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + MessageMetaData mmd = currentMessage.headersReceived(); + currentMessage.setStoredMessage(_virtualHost.getMessageStore().addMessage(mmd)); + + currentMessage.route(); + - try - { - currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory()); - } - catch (AMQException e) - { - fail(e.getMessage()); - } // check and deliver if header says body length is zero if (currentMessage.allContentReceived()) { - try - { - currentMessage.deliverToQueues(); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + // TODO Deliver to queues + ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore()); + final List<AMQQueue> destinationQueues = currentMessage.getDestinationQueues(); + trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() { + public void postCommit() + { + try + { + AMQMessage message = new AMQMessage(currentMessage.getStoredMessage()); + + for(AMQQueue queue : destinationQueues) + { + QueueEntry entry = queue.enqueue(message); + } + } + catch (AMQException e) + { + e.printStackTrace(); + } + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); } } @@ -496,14 +488,7 @@ public class MessageStoreTest extends TestCase fail(e.getMessage()); } - try - { - _virtualHost.getQueueRegistry().registerQueue(queue); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + _virtualHost.getQueueRegistry().registerQueue(queue); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index fd6789f5ce..9c12242a07 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -25,14 +25,15 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.logging.LogSubject; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import java.nio.ByteBuffer; /** * A message store that does nothing. Designed to be used in tests that do not want to use any message store @@ -45,8 +46,19 @@ public class SkeletonMessageStore implements MessageStore public void configure(String base, Configuration config) throws Exception { } - - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration config, + LogSubject logSubject) throws Exception + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + Configuration config, + LogSubject logSubject) throws Exception { //To change body of implemented methods use File | Settings | File Templates. } @@ -55,7 +67,12 @@ public class SkeletonMessageStore implements MessageStore { } - public void removeMessage(StoreContext s, Long messageId) + public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeMessage(Long messageId) { } @@ -85,24 +102,10 @@ public class SkeletonMessageStore implements MessageStore public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { - } - - public void beginTran(StoreContext s) throws AMQException - { } - public boolean inTran(StoreContext sc) - { - return false; - } - public void commitTran(StoreContext storeContext) throws AMQException - { - } - public void abortTran(StoreContext storeContext) throws AMQException - { - } public List<AMQQueue> createQueues() throws AMQException { @@ -114,22 +117,26 @@ public class SkeletonMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + public void storeContentBodyChunk( + Long messageId, + int index, + ContentChunk contentBody, + boolean lastContentBody) throws AMQException { } - public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException + public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException { } - public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(Long messageId) throws AMQException { return null; } - public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException + public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException { return null; } @@ -139,18 +146,75 @@ public class SkeletonMessageStore implements MessageStore return false; } - public void removeQueue(final AMQQueue queue) throws AMQException + public void storeMessageHeader(Long messageNumber, ServerMessage message) { + //To change body of implemented methods use File | Settings | File Templates. + } + public void storeContent(Long messageNumber, long offset, ByteBuffer body) + { + //To change body of implemented methods use File | Settings | File Templates. } - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public ServerMessage getMessage(Long messageNumber) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeQueue(final AMQQueue queue) throws AMQException + { + + } + public void configureTransactionLog(String name, + TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception + { + //To change body of implemented methods use File | Settings | File Templates. } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public Transaction newTransaction() { + return new Transaction() + { + + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void commitTran() throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + public StoreFuture commitTranAsync() throws AMQException + { + return new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + } + + public void abortTran() throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + }; } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index 4e48435962..4dea13d391 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -20,32 +20,79 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.List; +import java.nio.ByteBuffer; /** * Adds some extra methods to the memory message store for testing purposes. */ public class TestMemoryMessageStore extends MemoryMessageStore { + private AtomicInteger _messageCount = new AtomicInteger(0); + + public TestMemoryMessageStore() { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); } - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() + @Override + public StoredMessage addMessage(StorableMessageMetaData metaData) { - return _metaDataMap; + return new TestableStoredMessage(super.addMessage(metaData)); } - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() + public int getMessageCount() { - return _contentBodyMap; + return _messageCount.get(); + } + + private class TestableStoredMessage implements StoredMessage + { + private final StoredMessage _storedMessage; + + public TestableStoredMessage(StoredMessage storedMessage) + { + _messageCount.incrementAndGet(); + _storedMessage = storedMessage; + } + + public StorableMessageMetaData getMetaData() + { + return _storedMessage.getMetaData(); + } + + public long getMessageNumber() + { + return _storedMessage.getMessageNumber(); + } + + public void addContent(int offsetInMessage, ByteBuffer src) + { + _storedMessage.addContent(offsetInMessage, src); + } + + public int getContent(int offsetInMessage, ByteBuffer dst) + { + return _storedMessage.getContent(offsetInMessage, dst); + } + + public StoreFuture flushToStore() + { + return _storedMessage.flushToStore(); + } + + public void remove() + { + _storedMessage.remove(); + _messageCount.decrementAndGet(); + } + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 2346660d25..c5b1ba7868 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -26,9 +26,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageMetaData; /** * Tests that reference counting works correctly with AMQMessage and the message store @@ -37,13 +36,12 @@ public class TestReferenceCounting extends TestCase { private TestMemoryMessageStore _store; - private StoreContext _storeContext = new StoreContext(); - protected void setUp() throws Exception { super.setUp(); _store = new TestMemoryMessageStore(); + } /** @@ -83,11 +81,12 @@ public class TestReferenceCounting extends TestCase }; - final long messageId = _store.getNewMessageId(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext,info); + + MessageMetaData mmd = new MessageMetaData(info, chb, 0); + StoredMessage storedMessage = _store.addMessage(mmd); + + + AMQMessage message = new AMQMessage(storedMessage); message = message.takeReference(); @@ -95,9 +94,9 @@ public class TestReferenceCounting extends TestCase // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.decrementReference(_storeContext); - assertEquals(1, _store.getMessageMetaDataMap().size()); + assertEquals(1, _store.getMessageCount()); + message.decrementReference(); + assertEquals(1, _store.getMessageCount()); } private ContentHeaderBody createPersistentContentHeader() @@ -141,25 +140,24 @@ public class TestReferenceCounting extends TestCase } }; - final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext, - info); - - + + MessageMetaData mmd = new MessageMetaData(info, chb, 0); + StoredMessage storedMessage = _store.addMessage(mmd); + + AMQMessage message = new AMQMessage(storedMessage); + + message = message.takeReference(); // we call routing complete to set up the handle // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertEquals(1, _store.getMessageMetaDataMap().size()); + assertEquals(1, _store.getMessageCount()); message = message.takeReference(); - message.decrementReference(_storeContext); - assertEquals(1, _store.getMessageMetaDataMap().size()); + message.decrementReference(); + assertEquals(1, _store.getMessageCount()); } public static junit.framework.Test suite() diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 9146fe88ae..ab8c1e7c9c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -22,14 +22,15 @@ package org.apache.qpid.server.store; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.HashMap; import java.util.List; +import java.nio.ByteBuffer; /** * Adds some extra methods to the memory message store for testing purposes. @@ -39,6 +40,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore MemoryMessageStore _mms = null; private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); + private AtomicInteger _messageCount = new AtomicInteger(0); public TestableMemoryMessageStore(MemoryMessageStore mms) { @@ -47,46 +49,111 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public TestableMemoryMessageStore() { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); + + } + + + + + @Override + public StoredMessage addMessage(StorableMessageMetaData metaData) + { + return new TestableStoredMessage(super.addMessage(metaData)); + } + + public int getMessageCount() + { + return _messageCount.get(); } - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() + private class TestableTransaction implements Transaction { - if (_mms != null) + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException { - return _mms._metaDataMap; + getMessages().put(messageId, (AMQQueue)queue); } - else + + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException { - return _metaDataMap; + getMessages().remove(messageId); } - } - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() - { - if (_mms != null) + public void commitTran() throws AMQException { - return _mms._contentBodyMap; } - else + + public StoreFuture commitTranAsync() throws AMQException + { + return new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + } + + public void abortTran() throws AMQException { - return _contentBodyMap; } - } - - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException - { - getMessages().put(messageId, queue); } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + + @Override + public Transaction newTransaction() { - getMessages().remove(messageId); + return new TestableTransaction(); } public HashMap<Long, AMQQueue> getMessages() { return _messages; } + + private class TestableStoredMessage implements StoredMessage + { + private final StoredMessage _storedMessage; + + public TestableStoredMessage(StoredMessage storedMessage) + { + _messageCount.incrementAndGet(); + _storedMessage = storedMessage; + } + + public StorableMessageMetaData getMetaData() + { + return _storedMessage.getMetaData(); + } + + public long getMessageNumber() + { + return _storedMessage.getMessageNumber(); + } + + public void addContent(int offsetInMessage, ByteBuffer src) + { + _storedMessage.addContent(offsetInMessage, src); + } + + public int getContent(int offsetInMessage, ByteBuffer dst) + { + return _storedMessage.getContent(offsetInMessage, dst); + } + + public StoreFuture flushToStore() + { + return _storedMessage.flushToStore(); + } + + public void remove() + { + _storedMessage.remove(); + _messageCount.decrementAndGet(); + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index a3274a3a05..97ba143bdf 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -42,11 +42,15 @@ public class MockSubscription implements Subscription private AMQShortString tag = new AMQShortString("mocktag"); private AMQQueue queue = null; private StateListener _listener = null; - private QueueEntry lastSeen = null; + private AMQQueue.Context _queueContext = null; private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); + private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); + + private static final AtomicLong idGenerator = new AtomicLong(0); // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); @@ -81,14 +85,19 @@ public class MockSubscription implements Subscription return _subscriptionID; } - public QueueEntry getLastSeenEntry() + public AMQQueue.Context getQueueContext() { - return lastSeen; + return _queueContext; } public SubscriptionAcquiredState getOwningState() { - return new QueueEntry.SubscriptionAcquiredState(this); + return _owningState; + } + + public QueueEntry.SubscriptionAssignedState getAssignedState() + { + return _assignedState; } public LogActor getLogActor() @@ -116,6 +125,21 @@ public class MockSubscription implements Subscription return true; } + public void confirmAutoClose() + { + + } + + public void set(String key, Object value) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Object get(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isAutoClose() { return false; @@ -131,6 +155,16 @@ public class MockSubscription implements Subscription return _closed; } + public boolean acquires() + { + return true; + } + + public boolean seesRequeues() + { + return true; + } + public boolean isSuspended() { return false; @@ -149,25 +183,23 @@ public class MockSubscription implements Subscription { } + public void onDequeue(QueueEntry queueEntry) + { + } + public void restoreCredit(QueueEntry queueEntry) { + //To change body of implemented methods use File | Settings | File Templates. } public void send(QueueEntry msg) throws AMQException { - lastSeen = msg; messages.add(msg); } - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) + public void setQueueContext(AMQQueue.Context queueContext) { - boolean result = false; - if (expected != null) - { - result = (expected.equals(lastSeen)); - } - lastSeen = newValue; - return result; + _queueContext = queueContext; } public void setQueue(AMQQueue queue, boolean exclusive) @@ -175,6 +207,10 @@ public class MockSubscription implements Subscription this.queue = queue; } + public void setNoLocal(boolean noLocal) + { + } + public void setStateListener(StateListener listener) { this._listener = listener; diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java deleted file mode 100644 index 84d3d313d1..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.txn; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; - -import java.util.LinkedList; -import java.util.NoSuchElementException; - -public class TxnBufferTest extends TestCase -{ - private final LinkedList<MockOp> ops = new LinkedList<MockOp>(); - - public void testCommit() throws AMQException - { - MockStore store = new MockStore(); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - //check relative ordering - MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit(); - buffer.enlist(op); - buffer.enlist(op); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - - buffer.commit(null); - - validateOps(); - store.validate(); - } - - public void testRollback() throws AMQException - { - MockStore store = new MockStore(); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new MockOp().expectRollback()); - buffer.enlist(new MockOp().expectRollback()); - buffer.enlist(new MockOp().expectRollback()); - - buffer.rollback(null); - - validateOps(); - store.validate(); - } - - public void testCommitWithFailureDuringPrepare() throws AMQException - { - MockStore store = new MockStore(); - store.beginTran(null); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new StoreMessageOperation(store)); - buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); - buffer.enlist(new TxnTester(store)); - buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); - buffer.enlist(new FailedPrepare()); - buffer.enlist(new MockOp()); - - try - { - buffer.commit(null); - } - catch (NoSuchElementException e) - { - - } - - validateOps(); - store.validate(); - } - - public void testCommitWithPersistance() throws AMQException - { - MockStore store = new MockStore(); - store.beginTran(null); - store.expectCommit(); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new StoreMessageOperation(store)); - buffer.enlist(new TxnTester(store)); - - buffer.commit(null); - validateOps(); - store.validate(); - } - - private void validateOps() - { - for (MockOp op : ops) - { - op.validate(); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TxnBufferTest.class); - } - - class MockOp implements TxnOp - { - final Object PREPARE = "PREPARE"; - final Object COMMIT = "COMMIT"; - final Object UNDO_PREPARE = "UNDO_PREPARE"; - final Object ROLLBACK = "ROLLBACK"; - - private final LinkedList expected = new LinkedList(); - - MockOp() - { - ops.add(this); - } - - public void prepare(StoreContext context) - { - assertEquals(expected.removeLast(), PREPARE); - } - - public void commit(StoreContext context) - { - assertEquals(expected.removeLast(), COMMIT); - } - - public void undoPrepare() - { - assertEquals(expected.removeLast(), UNDO_PREPARE); - } - - public void rollback(StoreContext context) - { - assertEquals(expected.removeLast(), ROLLBACK); - } - - private MockOp expect(Object optype) - { - expected.addFirst(optype); - return this; - } - - MockOp expectPrepare() - { - return expect(PREPARE); - } - - MockOp expectCommit() - { - return expect(COMMIT); - } - - MockOp expectUndoPrepare() - { - return expect(UNDO_PREPARE); - } - - MockOp expectRollback() - { - return expect(ROLLBACK); - } - - void validate() - { - assertEquals("Expected ops were not all invoked", new LinkedList(), expected); - } - - void clear() - { - expected.clear(); - } - } - - class MockStore extends TestMemoryMessageStore - { - final Object BEGIN = "BEGIN"; - final Object ABORT = "ABORT"; - final Object COMMIT = "COMMIT"; - - private final LinkedList expected = new LinkedList(); - private boolean inTran; - - public void beginTran(StoreContext context) throws AMQException - { - inTran = true; - } - - public void commitTran(StoreContext context) throws AMQException - { - assertEquals(expected.removeLast(), COMMIT); - inTran = false; - } - - public void abortTran(StoreContext context) throws AMQException - { - assertEquals(expected.removeLast(), ABORT); - inTran = false; - } - - public boolean inTran(StoreContext context) - { - return inTran; - } - - private MockStore expect(Object optype) - { - expected.addFirst(optype); - return this; - } - - MockStore expectBegin() - { - return expect(BEGIN); - } - - MockStore expectCommit() - { - return expect(COMMIT); - } - - MockStore expectAbort() - { - return expect(ABORT); - } - - void clear() - { - expected.clear(); - } - - void validate() - { - assertEquals("Expected ops were not all invoked", new LinkedList(), expected); - } - } - - class NullOp implements TxnOp - { - public void prepare(StoreContext context) throws AMQException - { - } - public void commit(StoreContext context) - { - } - public void undoPrepare() - { - } - public void rollback(StoreContext context) - { - } - } - - class FailedPrepare extends NullOp - { - public void prepare() throws AMQException - { - throw new AMQException(null, "Fail!", null); - } - } - - class TxnTester extends NullOp - { - private final MessageStore store; - - private final StoreContext context = new StoreContext(); - - TxnTester(MessageStore store) - { - this.store = store; - } - - public void prepare() throws AMQException - { - assertTrue("Expected prepare to be performed under txn", store.inTran(context)); - } - - public void commit() - { - assertTrue("Expected commit not to be performed under txn", !store.inTran(context)); - } - } - -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index f2096df9d1..906c769f9c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -31,7 +31,6 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; @@ -41,12 +40,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.util.MockChannel; -import java.security.Principal; public class InternalBrokerBaseCase extends TestCase { @@ -55,7 +52,6 @@ public class InternalBrokerBaseCase extends TestCase protected MockChannel _channel; protected InternalTestProtocolSession _session; protected VirtualHost _virtualHost; - protected StoreContext _storeContext = new StoreContext(); protected AMQQueue _queue; protected AMQShortString QUEUE_NAME; @@ -97,7 +93,7 @@ public class InternalBrokerBaseCase extends TestCase protected void checkStoreContents(int messageCount) { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size()); + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); //The above publish message is sufficiently small not to fit in the header so no Body is required. //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size()); @@ -114,11 +110,7 @@ public class InternalBrokerBaseCase extends TestCase e.printStackTrace(); fail(e.getMessage()); } - catch (ConsumerTagNotUniqueException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } + //Keep the compiler happy return null; } @@ -137,11 +129,7 @@ public class InternalBrokerBaseCase extends TestCase e.printStackTrace(); fail(e.getMessage()); } - catch (ConsumerTagNotUniqueException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } + //Keep the compiler happy return null; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 6b8201eefb..3d37412376 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,8 +36,9 @@ import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; import java.util.Collection; @@ -76,7 +77,7 @@ public class NullApplicationRegistry extends ApplicationRegistry _virtualHostRegistry = new VirtualHostRegistry(this); PropertiesConfiguration vhostProps = new PropertiesConfiguration(); VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); - VirtualHost dummyHost = new VirtualHost(hostConfig); + VirtualHost dummyHost = new VirtualHostImpl(hostConfig); _virtualHostRegistry.registerVirtualHost(dummyHost); _virtualHostRegistry.setDefaultVirtualHostName("test"); _pluginManager = new PluginManager(""); @@ -97,7 +98,7 @@ public class NullApplicationRegistry extends ApplicationRegistry try { - super.close(); + super.close(); } finally { diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 7b7c86bb80..bb338458f1 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,8 +35,9 @@ import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.logging.RootMessageLoggerImpl; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; @@ -60,7 +61,7 @@ public class TestApplicationRegistry extends ApplicationRegistry private ServerConfiguration _config; - + public TestApplicationRegistry() throws ConfigurationException { super(new ServerConfiguration(new PropertiesConfiguration())); @@ -96,10 +97,10 @@ public class TestApplicationRegistry extends ApplicationRegistry _messageStore = new TestableMemoryMessageStore(); _virtualHostRegistry = new VirtualHostRegistry(this); - + PropertiesConfiguration vhostProps = new PropertiesConfiguration(); VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); - _vHost = new VirtualHost(hostConfig, _messageStore); + _vHost = new VirtualHostImpl(hostConfig, _messageStore); _virtualHostRegistry.registerVirtualHost(_vHost); @@ -152,7 +153,7 @@ public class TestApplicationRegistry extends ApplicationRegistry CurrentActor.remove(); } } - + } diff --git a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java index 447d09429d..9bd1e7c5e1 100644 --- a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java +++ b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java @@ -35,9 +35,6 @@ public class MockChannel extends AMQChannel super(session, channelId, messageStore); } - public Subscription getSubscription(AMQShortString subscription) - { - return _tag2SubscriptionMap.get(subscription); - } - + + } |
