summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
commit953ed819249457a5a6c4349c3b215f26d1abba16 (patch)
tree48340ad71f89a641111000ef4c6b63d8b2ce1ad7 /java/broker/src/test
parenta581be3f131e53b3f18aff392d5d28222d20e71d (diff)
downloadqpid-python-953ed819249457a5a6c4349c3b215f26d1abba16.tar.gz
Merged from java-broker-0-10 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@829675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java20
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java290
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java22
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java57
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java148
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java1
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java81
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java7
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (renamed from java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java)325
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java32
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java13
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java59
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java74
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java28
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java80
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java71
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java14
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java37
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java95
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java71
-rwxr-xr-xjava/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java92
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java260
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java18
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java27
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java19
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java101
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java118
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java63
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java46
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java113
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java62
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java306
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java18
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java15
-rw-r--r--java/broker/src/test/java/org/apache/qpid/util/MockChannel.java7
43 files changed, 1266 insertions, 1573 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index e3889162ad..3cdb9c0c2d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQBrokerManagerMBeanTest extends TestCase
@@ -46,7 +47,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase
assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
- ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
mbean.createNewExchange(exchange1, "direct", false);
mbean.createNewExchange(exchange2, "topic", false);
mbean.createNewExchange(exchange3, "headers", false);
@@ -68,7 +69,7 @@ public class AMQBrokerManagerMBeanTest extends TestCase
{
String queueName = "testQueue_" + System.currentTimeMillis();
- ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
index 105056fe3d..d2408ba21f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
@@ -22,23 +22,22 @@ package org.apache.qpid.server;
import junit.framework.TestCase;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.queue.MockAMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.AMQException;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.LinkedList;
-import java.util.Iterator;
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -63,6 +62,7 @@ public class ExtractResendAndRequeueTest extends TestCase
UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue = new MockAMQQueue(getName());
+ private MessageStore _messageStore = new MemoryMessageStore();
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
@Override
@@ -89,7 +89,7 @@ public class ExtractResendAndRequeueTest extends TestCase
while(queueEntries.advance())
{
QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry);
+ _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
// Store the entry for future inspection
_referenceList.add(entry);
@@ -137,7 +137,7 @@ public class ExtractResendAndRequeueTest extends TestCase
// requeueIfUnabletoResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, new StoreContext()));
+ msgToResend, true, _messageStore));
assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -166,7 +166,7 @@ public class ExtractResendAndRequeueTest extends TestCase
// requeueIfUnabletoResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, new StoreContext()));
+ msgToResend, true, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -187,7 +187,7 @@ public class ExtractResendAndRequeueTest extends TestCase
// requeueIfUnabletoResend = true so all messages should go to msgToRequeue
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, new StoreContext()));
+ msgToResend, true, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -208,7 +208,7 @@ public class ExtractResendAndRequeueTest extends TestCase
// requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, new StoreContext()));
+ msgToResend, false, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -240,7 +240,7 @@ public class ExtractResendAndRequeueTest extends TestCase
// requeueIfUnabletoResend : value doesn't matter here as queue has been deleted
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, new StoreContext()));
+ msgToResend, false, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
deleted file mode 100644
index 06d6b6de8b..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.ack;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-
-import java.util.*;
-
-public class TxAckTest extends TestCase
-{
- private Scenario individual;
- private Scenario multiple;
- private Scenario combined;
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
-
- // Highlight that this test will cause the creation of an AR
- ApplicationRegistry.getInstance();
-
- //ack only 5th msg
- individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
- individual.update(5, false);
-
- //ack all up to and including 5th msg
- multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
- multiple.update(5, true);
-
- //leave only 8th and 9th unacked
- combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
- combined.update(3, false);
- combined.update(5, true);
- combined.update(7, true);
- combined.update(2, true);//should be ignored
- combined.update(1, false);//should be ignored
- combined.update(10, false);
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- individual.stop();
- multiple.stop();
- combined.stop();
-
- // Ensure we close the AR we created
- ApplicationRegistry.remove();
- super.tearDown();
- }
-
- public void testPrepare() throws AMQException
- {
- individual.prepare();
- multiple.prepare();
- combined.prepare();
- }
-
- public void testUndoPrepare() throws AMQException
- {
- individual.undoPrepare();
- multiple.undoPrepare();
- combined.undoPrepare();
- }
-
- public void testCommit() throws AMQException
- {
- individual.commit();
- multiple.commit();
- combined.commit();
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TxAckTest.class);
- }
-
- private class Scenario
- {
- private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
- private final TxAck _op = new TxAck(_map);
- private final List<Long> _acked;
- private final List<Long> _unacked;
- private StoreContext _storeContext = new StoreContext();
- private AMQQueue _queue;
-
- Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
- {
- TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
- _storeContext, null,
- new LinkedList<RequiredDeliveryException>()
- );
-
- VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
-
- _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false,
- virtualHost, null);
-
- for (int i = 0; i < messageCount; i++)
- {
- long deliveryTag = i + 1;
-
- MessagePublishInfo info = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
- TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
- _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
- }
- _acked = acked;
- _unacked = unacked;
- }
-
- void update(long deliverytag, boolean multiple)
- {
- _op.update(deliverytag, multiple);
- }
-
- private void assertCount(List<Long> tags, int expected)
- {
- for (long tag : tags)
- {
- QueueEntry u = _map.get(tag);
- assertTrue("Message not found for tag " + tag, u != null);
- ((TestMessage) u.getMessage()).assertCountEquals(expected);
- }
- }
-
- void prepare() throws AMQException
- {
- _op.consolidate();
- _op.prepare(_storeContext);
-
- assertCount(_acked, -1);
- assertCount(_unacked, 0);
-
- }
-
- void undoPrepare()
- {
- _op.consolidate();
- _op.undoPrepare();
-
- assertCount(_acked, 1);
- assertCount(_unacked, 0);
- }
-
- void commit()
- {
- _op.consolidate();
- _op.commit(_storeContext);
-
- //check acked messages are removed from map
- Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags());
- keys.retainAll(_acked);
- assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
- //check unacked messages are still in map
- keys = new HashSet<Long>(_unacked);
- keys.removeAll(_map.getDeliveryTags());
- assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
- }
-
- public void stop()
- {
- _queue.stop();
- }
- }
-
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
- {
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
- try
- {
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
- publishBody,
- new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
- }
- catch (AMQException e)
- {
- // won't happen
- }
-
-
- return amqMessageHandle;
- }
-
-
- private class TestMessage extends AMQMessage
- {
- private final long _tag;
- private int _count;
-
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
- throws AMQException
- {
- super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
- _tag = tag;
- }
-
-
- public boolean incrementReference()
- {
- _count++;
- return true;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
-
- void assertCountEquals(int expected)
- {
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
- }
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index c5f291a0cb..5bd739c0af 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
+import java.util.Collections;
import junit.framework.TestCase;
@@ -482,12 +483,17 @@ public class ServerConfigurationTest extends TestCase
{
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
- assertEquals(5672, serverConfig.getPort());
+ assertNotNull(serverConfig.getPorts());
+ assertEquals(1, serverConfig.getPorts().size());
+ assertEquals(5672, serverConfig.getPorts().get(0));
+
// Check value we set
- _config.setProperty("connector.port", 10);
+ _config.setProperty("connector.port", "10");
serverConfig = new ServerConfiguration(_config);
- assertEquals(10, serverConfig.getPort());
+ assertNotNull(serverConfig.getPorts());
+ assertEquals(1, serverConfig.getPorts().size());
+ assertEquals("10", serverConfig.getPorts().get(0));
}
public void testGetBind() throws ConfigurationException
@@ -723,7 +729,9 @@ public class ServerConfigurationTest extends TestCase
ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile());
assertEquals(4235, config.getSSLPort()); // From first file, not
// overriden by second
- assertEquals(2342, config.getPort()); // From the first file, not
+ assertNotNull(config.getPorts());
+ assertEquals(1, config.getPorts().size());
+ assertEquals("2342", config.getPorts().get(0)); // From the first file, not
// present in the second
assertEquals(true, config.getQpidNIO()); // From the second file, not
// present in the first
@@ -967,7 +975,7 @@ public class ServerConfigurationTest extends TestCase
out.write("\t<rule access=\"deny\" network=\"127.0.0.1\"/>");
out.write("</firewall>\n");
out.close();
-
+
reg.getConfiguration().reparseConfigFile();
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index 5d3b4e681a..b65020395c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -14,21 +14,20 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.qpid.server.configuration;
import junit.framework.TestCase;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class VirtualHostConfigurationTest extends TestCase
{
@@ -55,50 +54,50 @@ public class VirtualHostConfigurationTest extends TestCase
super.tearDown();
}
-
+
public void testQueuePriority() throws Exception
{
// Set up queue with 5 priorities
- configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
+ configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
"atest");
- configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
+ configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
"amq.direct");
- configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
+ configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
"5");
// Set up queue with JMS style priorities
- configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
+ configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
"ptest");
- configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
+ configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
"amq.direct");
- configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
+ configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
"true");
-
+
// Set up queue with no priorities
- configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
+ configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
"ntest");
- configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
+ configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
"amq.direct");
- configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
+ configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
"false");
-
- VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-
+
+ VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
// Check that atest was a priority queue with 5 priorities
AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
assertTrue(atest instanceof AMQPriorityQueue);
assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
-
+
// Check that ptest was a priority queue with 10 priorities
AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
assertTrue(ptest instanceof AMQPriorityQueue);
assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
-
+
// Check that ntest wasn't a priority queue
AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
assertFalse(ntest instanceof AMQPriorityQueue);
}
-
+
public void testQueueAlerts() throws Exception
{
// Set up queue with 5 priorities
@@ -106,7 +105,7 @@ public class VirtualHostConfigurationTest extends TestCase
configXml.addProperty("virtualhost.test.queues.maximumQueueDepth", "1");
configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2");
configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3");
-
+
configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest");
configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct");
configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4");
@@ -114,21 +113,21 @@ public class VirtualHostConfigurationTest extends TestCase
configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6");
configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest");
-
- VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-
+
+ VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
// Check specifically configured values
AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
assertEquals(4, aTest.getMaximumQueueDepth());
assertEquals(5, aTest.getMaximumMessageSize());
assertEquals(6, aTest.getMaximumMessageAge());
-
- // Check default values
+
+ // Check default values
AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
assertEquals(1, bTest.getMaximumQueueDepth());
assertEquals(2, bTest.getMaximumMessageSize());
assertEquals(3, bTest.getMaximumMessageAge());
-
+
}
-
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index f8dd266bd2..e26b5b048c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -27,18 +27,18 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
public class AbstractHeadersExchangeTestBase extends TestCase
{
@@ -52,10 +52,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
private MessageStore _store = new MemoryMessageStore();
- private StoreContext _storeContext = new StoreContext();
-
- private MessageHandleFactory _handleFactory = new MessageHandleFactory();
-
private int count;
public void testDoNothing()
@@ -91,14 +87,18 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
- protected void route(Message m) throws AMQException
+ protected int route(Message m) throws AMQException
{
+ m.getIncomingMessage().headersReceived();
m.route(exchange);
- m.getIncomingMessage().routingComplete(_store, _handleFactory);
if(m.getIncomingMessage().allContentReceived())
{
- m.getIncomingMessage().deliverToQueues();
+ for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
}
+ return m.getIncomingMessage().getDestinationQueues().size();
}
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -118,10 +118,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
{
- try
- {
- route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ int queueCount = route(m);
+
for (TestQueue q : queues)
{
if (expected.contains(q))
@@ -135,12 +133,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
}
}
- }
- catch (NoRouteException ex)
- {
- assertTrue("Expected "+m+" not to be returned",expectReturn);
- }
+ if(expectReturn)
+ {
+ assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
+ }
}
@@ -242,6 +239,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ public String toString()
+ {
+ return getName().toString();
+ }
+
public TestQueue(AMQShortString name) throws AMQException
{
super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
@@ -256,9 +258,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* @throws AMQException
*/
@Override
- public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
+ public QueueEntry enqueue(ServerMessage msg) throws AMQException
{
- messages.add( new HeadersExchangeTest.Message(msg));
+ messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
return new QueueEntry()
{
@@ -317,6 +319,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isAcquiredBy(Subscription subscription)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void setDeliveredToSubscription()
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -327,9 +334,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public String debugIdentity()
+ public boolean releaseButRetain()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean immediateAndNotDelivered()
@@ -337,11 +344,26 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void setRedelivered(boolean b)
+ public void setRedelivered()
{
//To change body of implemented methods use File | Settings | File Templates.
}
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isRedelivered()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public Subscription getDeliveredSubscription()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -362,17 +384,22 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext) throws AMQException
+ public void requeue()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void requeue(Subscription subscription)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+ public void dequeue()
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ public void dispose()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -382,7 +409,12 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void discard()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void routeToAlternate()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -421,15 +453,16 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
static class Message extends AMQMessage
{
+ private static AtomicLong _messageId = new AtomicLong();
+
private class TestIncomingMessage extends IncomingMessage
{
public TestIncomingMessage(final long messageId,
final MessagePublishInfo info,
- final TransactionalContext txnContext,
final AMQProtocolSession publisher)
{
- super(messageId, info, txnContext, publisher);
+ super(info);
}
@@ -439,7 +472,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeader()
{
try
{
@@ -454,15 +487,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase
private IncomingMessage _incoming;
- private static MessageStore _messageStore = new SkeletonMessageStore();
-
- private static StoreContext _storeContext = new StoreContext();
-
-
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
{
@@ -471,7 +495,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
{
- this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
}
public IncomingMessage getIncomingMessage()
@@ -484,46 +508,34 @@ public class AbstractHeadersExchangeTestBase extends TestCase
ContentHeaderBody header,
List<ContentBody> bodies) throws AMQException
{
- super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+ super(new MockStoredMessage(messageId, publish, header));
+
+ StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
+ int pos = 0;
+ for(ContentBody body : bodies)
+ {
+ storedMessage.addContent(pos, body.payload.duplicate().buf());
+ pos += body.payload.limit();
+ }
-
- _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession);
+ _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
_incoming.setContentHeaderBody(header);
}
- private static AMQMessageHandle createMessageHandle(final long messageId,
- final MessagePublishInfo publish,
- final ContentHeaderBody header)
- {
-
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- _messageStore,
- true);
-
- try
- {
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
- }
- catch (AMQException e)
- {
-
- }
- return amqMessageHandle;
- }
private Message(AMQMessage msg) throws AMQException
{
- super(msg);
+ super(msg.getStoredMessage());
}
void route(Exchange exchange) throws AMQException
{
- exchange.route(_incoming);
+ _incoming.enqueue(exchange.route(_incoming));
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
index ac12e050b4..016f7eacbe 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 86ba96bf5d..dc47951548 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -22,16 +22,95 @@ package org.apache.qpid.server.exchange;
import java.util.Map;
import java.util.HashMap;
+import java.util.Set;
import junit.framework.TestCase;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.message.AMQMessageHeader;
/**
*/
public class HeadersBindingTest extends TestCase
{
+
+ private class MockHeader implements AMQMessageHeader
+ {
+
+ private final Map<String, Object> _headers = new HashMap<String, Object>();
+
+ public String getCorrelationId()
+ {
+ return null;
+ }
+
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ public String getMessageId()
+ {
+ return null;
+ }
+
+ public String getMimeType()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String getEncoding()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public byte getPriority()
+ {
+ return 0;
+ }
+
+ public long getTimestamp()
+ {
+ return 0;
+ }
+
+ public String getType()
+ {
+ return null;
+ }
+
+ public String getReplyTo()
+ {
+ return null;
+ }
+
+ public Object getHeader(String name)
+ {
+ return _headers.get(name);
+ }
+
+ public boolean containsHeaders(Set<String> names)
+ {
+ return _headers.keySet().containsAll(names);
+ }
+
+ public boolean containsHeader(String name)
+ {
+ return _headers.containsKey(name);
+ }
+
+ public void setString(String key, String value)
+ {
+ setObject(key,value);
+ }
+
+ public void setObject(String key, Object value)
+ {
+ _headers.put(key,value);
+ }
+ }
+
private FieldTable bindHeaders = new FieldTable();
- private FieldTable matchHeaders = new FieldTable();
+ private MockHeader matchHeaders = new MockHeader();
public void testDefault_1()
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 750a1cd081..580bc78b8d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,8 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
super.setUp();
// AR will use the NullAR by default
// Just use the first vhost.
- VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+ VirtualHost
+ virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_protocolSession = new InternalTestProtocolSession(virtualHost);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 82b8f76efc..9d7a323b6d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -14,9 +14,9 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.exchange;
@@ -25,28 +25,24 @@ import junit.framework.Assert;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import java.util.LinkedList;
-
-public class DestWildExchangeTest extends TestCase
+public class TopicExchangeTest extends TestCase
{
TopicExchange _exchange;
VirtualHost _vhost;
MessageStore _store;
- StoreContext _context;
InternalTestProtocolSession _protocolSession;
@@ -56,27 +52,26 @@ public class DestWildExchangeTest extends TestCase
_exchange = new TopicExchange();
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_store = new MemoryMessageStore();
- _context = new StoreContext();
_protocolSession = new InternalTestProtocolSession(_vhost);
}
public void tearDown()
{
- ApplicationRegistry.remove();
+ ApplicationRegistry.remove();
}
public void testNoRoute() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
- IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
+ IncomingMessage message = new IncomingMessage(info);
- _exchange.route(message);
+ message.enqueue(_exchange.route(message));
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -89,33 +84,20 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
- try
- {
- routeMessage(message);
- fail("Message has no route and should fail to be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -129,52 +111,33 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ int queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a");
- try
- {
- routeMessage(message);
- fail("Message has no route and should fail to be routed");
- }
- catch (AMQException nre)
- {
- }
+
+ queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -187,89 +150,56 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.b.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ int queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("b");
- try
- {
- routeMessage(message);
- fail("Message has no route and should fail to be routed");
- }
- catch (AMQException nre)
- {
- }
+
+ queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -283,38 +213,24 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.c.d.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -327,66 +243,39 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.c.b.b");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b.c.b");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b.c.b.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
-
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -400,34 +289,21 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.c.b.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c.d");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -440,33 +316,20 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.c.b.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c.d");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -479,25 +342,26 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
- private void routeMessage(final IncomingMessage message)
+ private int routeMessage(final IncomingMessage message)
throws AMQException
{
- _exchange.route(message);
- message.routingComplete(_store, new MessageHandleFactory());
- message.deliverToQueues();
+ MessageMetaData mmd = message.headersReceived();
+ message.setStoredMessage(_store.addMessage(mmd));
+
+ message.enqueue(_exchange.route(message));
+ AMQMessage msg = new AMQMessage(message.getStoredMessage());
+ for(AMQQueue q : message.getDestinationQueues())
+ {
+ q.enqueue(msg);
+ }
+ return message.getDestinationQueues().size();
}
public void testMoreRouting() throws AMQException
@@ -508,14 +372,8 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
@@ -529,14 +387,8 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = createMessage("a");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
@@ -546,12 +398,11 @@ public class DestWildExchangeTest extends TestCase
{
MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
- TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
- new LinkedList<RequiredDeliveryException>()
- );
-
- IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession);
- message.setContentHeaderBody( new ContentHeaderBody());
+ IncomingMessage message = new IncomingMessage(info);
+ final ContentHeaderBody chb = new ContentHeaderBody();
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ chb.properties = props;
+ message.setContentHeaderBody(chb);
return message;
@@ -574,7 +425,7 @@ public class DestWildExchangeTest extends TestCase
public void setExchange(AMQShortString exchange)
{
-
+
}
public boolean isImmediate()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
index 0fa126dedb..46dc677921 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
@@ -121,7 +121,7 @@ public class AMQPChannelActorTest extends TestCase
// Verify that the message has the correct type
assertTrue("Message contains the [con: prefix",
logs.get(0).toString().contains("[con:"));
-
+
// Verify that all the values were presented to the MessageFormatter
// so we will not end up with '{n}' entries in the log.
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
index 1b25844a14..98c14efe4d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
@@ -75,7 +75,7 @@ public class AMQPConnectionActorTest extends TestCase
// Correctly Close the AR we created
ApplicationRegistry.remove();
- super.tearDown();
+ super.tearDown();
}
private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
index e02993b840..e3280a4076 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
@@ -41,12 +41,12 @@ public class MessageStoreMessagesTest extends AbstractTestMessages
{
String location = "/path/to/the/message/store.files";
- _logMessage = MessageStoreMessages.MST_1002(location);
+ _logMessage = ConfigStoreMessages.CFG_1002(location);
List<Object> log = performLog();
String[] expected = {"Store location :", location};
- validateLogMessage(log, "MST-1002", expected);
+ validateLogMessage(log, "CFG-1002", expected);
}
public void testMessage1003()
@@ -59,7 +59,7 @@ public class MessageStoreMessagesTest extends AbstractTestMessages
validateLogMessage(log, "MST-1003", expected);
}
- public void testMessage1004()
+ /* public void testMessage1004()
{
_logMessage = MessageStoreMessages.MST_1004(null,false);
List<Object> log = performLog();
@@ -91,7 +91,7 @@ public class MessageStoreMessagesTest extends AbstractTestMessages
// Here we use MessageFormat to ensure the messasgeCount of 2000 is
// reformated for display as '2,000'
- String[] expected = {"Recovered ",
+ String[] expected = {"Recovered ",
MessageFormat.format("{0,number}", messasgeCount),
"messages for queue", queueName};
@@ -119,5 +119,5 @@ public class MessageStoreMessagesTest extends AbstractTestMessages
validateLogMessage(log, "MST-1006", expected);
}
-
+ */
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index bc36c61382..e6561a06b9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -23,19 +23,16 @@ package org.apache.qpid.server.protocol;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import javax.management.JMException;
-import java.security.Principal;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class AMQProtocolSessionMBeanTest extends TestCase
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index ec7bf1cb72..681e513ecb 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -21,19 +21,19 @@
package org.apache.qpid.server.protocol;
import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.transport.TestNetworkDriver;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -70,6 +70,16 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
return (byte) 8;
}
+ public void writeReturn(MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody header,
+ MessageContentSource msgContent,
+ int channelId,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public byte getProtocolMinorVersion()
{
return (byte) 0;
@@ -82,12 +92,12 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
synchronized (_channelDelivers)
{
List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
-
+
if (all == null)
{
return new ArrayList<DeliveryPair>(0);
}
-
+
List<DeliveryPair> msgs = all.subList(0, count);
List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
@@ -108,7 +118,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
{
_deliveryCount.incrementAndGet();
@@ -130,11 +140,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
consumers.put(consumerTag, consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
}
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index e37492bcb0..13e712dbac 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -22,17 +22,10 @@ package org.apache.qpid.server.protocol;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-
-import java.security.Principal;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
@@ -66,14 +59,14 @@ public class MaxChannelsTest extends TestCase
}
assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
}
-
+
@Override
public void setUp()
{
//Highlight that this test will cause a new AR to be created
ApplicationRegistry.getInstance();
}
-
+
@Override
public void tearDown() throws Exception
{
@@ -87,7 +80,7 @@ public class MaxChannelsTest extends TestCase
{
// Correctly Close the AR we created
ApplicationRegistry.remove();
- }
+ }
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index aff7af6952..dd013e6ad5 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -1,6 +1,6 @@
package org.apache.qpid.server.queue;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,21 +8,22 @@ package org.apache.qpid.server.queue;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.util.ArrayList;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import junit.framework.AssertionFailedError;
@@ -42,38 +43,38 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
// Enqueue messages in order
- _queue.enqueue(null, createMessage(1L, (byte) 10));
- _queue.enqueue(null, createMessage(2L, (byte) 4));
- _queue.enqueue(null, createMessage(3L, (byte) 0));
-
+ _queue.enqueue(createMessage(1L, (byte) 10));
+ _queue.enqueue(createMessage(2L, (byte) 4));
+ _queue.enqueue(createMessage(3L, (byte) 0));
+
// Enqueue messages in reverse order
- _queue.enqueue(null, createMessage(4L, (byte) 0));
- _queue.enqueue(null, createMessage(5L, (byte) 4));
- _queue.enqueue(null, createMessage(6L, (byte) 10));
-
+ _queue.enqueue(createMessage(4L, (byte) 0));
+ _queue.enqueue(createMessage(5L, (byte) 4));
+ _queue.enqueue(createMessage(6L, (byte) 10));
+
// Enqueue messages out of order
- _queue.enqueue(null, createMessage(7L, (byte) 4));
- _queue.enqueue(null, createMessage(8L, (byte) 10));
- _queue.enqueue(null, createMessage(9L, (byte) 0));
-
+ _queue.enqueue(createMessage(7L, (byte) 4));
+ _queue.enqueue(createMessage(8L, (byte) 10));
+ _queue.enqueue(createMessage(9L, (byte) 0));
+
// Register subscriber
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
-
+
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
- assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
+ assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
+ assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
+ assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
- assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
+ assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
+ assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
+ assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
- assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
+ assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
+ assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
}
catch (AssertionFailedError afe)
{
@@ -81,7 +82,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
int index = 1;
for (QueueEntry qe : msgs)
{
- System.err.println(index + ":" + qe.getMessage().getMessageId());
+ System.err.println(index + ":" + qe.getMessage().getMessageNumber());
index++;
}
@@ -98,10 +99,10 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
msg.getContentHeaderBody().properties = props;
return msg;
}
-
+
protected AMQMessage createMessage(Long id) throws AMQException
{
return createMessage(id, (byte) 0);
}
-
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 19470e6226..5f0d77afea 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -20,21 +20,17 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.LinkedList;
-
-import javax.management.Notification;
-
import junit.framework.TestCase;
-
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -42,16 +38,16 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.Notification;
+import java.util.ArrayList;
+
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
-{
+{
private final static long MAX_MESSAGE_COUNT = 50;
private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
@@ -61,11 +57,6 @@ public class AMQQueueAlertTest extends TestCase
private VirtualHost _virtualHost;
private AMQProtocolEngine _protocolSession;
private MessageStore _messageStore = new MemoryMessageStore();
- private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
/**
@@ -75,6 +66,10 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testMessageCountAlert() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -82,7 +77,7 @@ public class AMQQueueAlertTest extends TestCase
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
- sendMessages(MAX_MESSAGE_COUNT, 256l);
+ sendMessages(channel, MAX_MESSAGE_COUNT, 256l);
assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
Notification lastNotification = _queueMBean.getLastNotification();
@@ -99,6 +94,10 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testMessageSizeAlert() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -106,7 +105,7 @@ public class AMQQueueAlertTest extends TestCase
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
- sendMessages(1, MAX_MESSAGE_SIZE * 2);
+ sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2);
assertTrue(_queueMBean.getMessageCount() == 1);
Notification lastNotification = _queueMBean.getLastNotification();
@@ -125,6 +124,10 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testQueueDepthAlertNoSubscriber() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -134,7 +137,7 @@ public class AMQQueueAlertTest extends TestCase
while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
{
- sendMessages(1, MAX_MESSAGE_SIZE);
+ sendMessages(channel, 1, MAX_MESSAGE_SIZE);
}
Notification lastNotification = _queueMBean.getLastNotification();
@@ -154,6 +157,10 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testMessageAgeAlert() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -161,7 +168,7 @@ public class AMQQueueAlertTest extends TestCase
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
- sendMessages(1, MAX_MESSAGE_SIZE);
+ sendMessages(channel, 1, MAX_MESSAGE_SIZE);
// Ensure message sits on queue long enough to age.
Thread.sleep(MAX_MESSAGE_AGE * 2);
@@ -201,7 +208,7 @@ public class AMQQueueAlertTest extends TestCase
// Send messages(no of message to be little more than what can cause a Queue_Depth alert)
int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
long totalSize = (messageCount * MAX_MESSAGE_SIZE);
- sendMessages(messageCount, MAX_MESSAGE_SIZE);
+ sendMessages(channel, messageCount, MAX_MESSAGE_SIZE);
// Check queueDepth. There should be no messages on the queue and as the subscriber is listening
// so there should be no Queue_Deoth alert raised
@@ -228,7 +235,7 @@ public class AMQQueueAlertTest extends TestCase
_queue.registerSubscription(
subscription2, false);
-
+
while (_queue.getUndeliveredMessageCount()!= 0)
{
Thread.sleep(100);
@@ -247,7 +254,7 @@ public class AMQQueueAlertTest extends TestCase
_queueMBean.clearQueue();
assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
}
-
+
protected IncomingMessage message(final boolean immediate, long size) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
@@ -280,8 +287,10 @@ public class AMQQueueAlertTest extends TestCase
};
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ contentHeaderBody.properties = props;
contentHeaderBody.bodySize = size; // in bytes
- IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage message = new IncomingMessage(publish);
message.setContentHeaderBody(contentHeaderBody);
return message;
@@ -305,16 +314,19 @@ public class AMQQueueAlertTest extends TestCase
}
- private void sendMessages(long messageCount, final long size) throws AMQException
+ private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
{
IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
+ MessageMetaData[] metaData = new MessageMetaData[(int) messageCount];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
+ metaData[i] = messages[i].headersReceived();
+ messages[i].setStoredMessage(_messageStore.addMessage(metaData[i]));
+
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore, new MessageHandleFactory());
}
@@ -324,6 +336,10 @@ public class AMQQueueAlertTest extends TestCase
ByteBuffer _data = ByteBuffer.allocate((int)size);
+ {
+ _data.limit((int)size);
+ }
+
public int getSize()
{
return (int) size;
@@ -336,10 +352,12 @@ public class AMQQueueAlertTest extends TestCase
public void reduceToFit()
{
-
+
}
});
- messages[i].deliverToQueues();
+
+ _queue.enqueue(new AMQMessage(messages[i].getStoredMessage()));
+
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index e692069663..97f061fdd1 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -25,7 +25,6 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
public class AMQQueueFactoryTest extends TestCase
{
@@ -55,31 +54,18 @@ public class AMQQueueFactoryTest extends TestCase
FieldTable fieldTable = new FieldTable();
fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
- try
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, fieldTable);
- assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
+ _virtualHost, fieldTable);
+
+ assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
}
public void testSimpleQueueRegistration()
{
- try
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, null);
- assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+ _virtualHost, null);
+ assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 03eb7021ad..3bb8d397be 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -29,7 +29,10 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,19 +41,15 @@ import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
+import org.apache.commons.configuration.PropertiesConfiguration;
import javax.management.JMException;
import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -61,8 +60,6 @@ public class AMQQueueMBeanTest extends TestCase
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private MessageStore _messageStore;
- private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
private AMQProtocolSession _protocolSession;
private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
@@ -108,7 +105,7 @@ public class AMQQueueMBeanTest extends TestCase
//Ensure that the data has been removed from the Store
verifyBrokerState();
}
-
+
public void testDeleteMessages() throws Exception
{
int messageCount = 10;
@@ -129,9 +126,9 @@ public class AMQQueueMBeanTest extends TestCase
}
catch(Exception e)
{
-
+
}
-
+
//delete last message, leaving 2nd to 9th
_queueMBean.deleteMessages(10L,10L);
assertTrue(_queueMBean.getMessageCount() == (messageCount - 2));
@@ -143,7 +140,7 @@ public class AMQQueueMBeanTest extends TestCase
}
catch(Exception e)
{
-
+
}
//delete remaining messages, leaving none
@@ -159,18 +156,16 @@ public class AMQQueueMBeanTest extends TestCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
- assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
- assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
- assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
- assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+
+ assertEquals("Store should have no messages:" + store.getMessageCount(), 0, store.getMessageCount());
}
public void testConsumerCount() throws AMQException
{
-
+
assertTrue(_queue.getActiveConsumerCount() == 0);
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
@@ -182,7 +177,7 @@ public class AMQQueueMBeanTest extends TestCase
Subscription subscription =
SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager());
-
+
_queue.registerSubscription(subscription, false);
assertEquals(1,(int)_queueMBean.getActiveConsumerCount());
@@ -225,7 +220,6 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth));
assertTrue(_queueMBean.getName().equals("testQueue"));
- assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
assertFalse(_queueMBean.isAutoDelete());
assertFalse(_queueMBean.isDurable());
}
@@ -261,7 +255,7 @@ public class AMQQueueMBeanTest extends TestCase
{
}
-
+
try
{
long end = Integer.MAX_VALUE;
@@ -275,17 +269,22 @@ public class AMQQueueMBeanTest extends TestCase
}
IncomingMessage msg = message(false, false);
- long id = msg.getMessageId();
- _queue.clearQueue(_storeContext);
+ _queue.clearQueue();
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = msg.headersReceived();
+ msg.setStoredMessage(_messageStore.addMessage(mmd));
+ long id = msg.getMessageNumber();
msg.addContentBodyFrame(new ContentChunk()
{
ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
+ {
+ _data.limit((int)MESSAGE_SIZE);
+ }
+
public int getSize()
{
return (int) MESSAGE_SIZE;
@@ -301,7 +300,12 @@ public class AMQQueueMBeanTest extends TestCase
}
});
- msg.deliverToQueues();
+
+ AMQMessage m = new AMQMessage(msg.getStoredMessage());
+ for(AMQQueue q : msg.getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
// _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
@@ -350,7 +354,7 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage msg = new IncomingMessage(publish);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -360,15 +364,17 @@ public class AMQQueueMBeanTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+ IApplicationRegistry applicationRegistry = new TestApplicationRegistry(new ServerConfiguration(configuration));
+ ApplicationRegistry.initialise(applicationRegistry );
+
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
- _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
-
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost,
null);
_queueMBean = new AMQQueueMBean(_queue);
@@ -391,7 +397,8 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = currentMessage.headersReceived();
+ currentMessage.setStoredMessage(_messageStore.addMessage(mmd));
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
@@ -400,7 +407,12 @@ public class AMQQueueMBeanTest extends TestCase
.convertToContentChunk(
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
- currentMessage.deliverToQueues();
+
+ AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
+ for(AMQQueue q : currentMessage.getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 79f7d75aa9..d64e533f72 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -28,7 +28,10 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,15 +42,9 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.Set;
-import java.util.Collections;
/**
* Tests that acknowledgements are handled correctly.
@@ -62,8 +59,6 @@ public class AckTest extends TestCase
private TestMemoryMessageStore _messageStore;
- private StoreContext _storeContext = new StoreContext();
-
private AMQChannel _channel;
private AMQQueue _queue;
@@ -99,11 +94,7 @@ public class AckTest extends TestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>()
- );
_queue.registerSubscription(_subscription,false);
- MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -136,31 +127,50 @@ public class AckTest extends TestCase
return new AMQShortString("rk");
}
};
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ final IncomingMessage msg = new IncomingMessage(publishBody);
//IncomingMessage msg2 = null;
+ BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+ ContentHeaderBody cb = new ContentHeaderBody();
+ cb.properties = b;
+
if (persistent)
{
- BasicContentHeaderProperties b = new BasicContentHeaderProperties();
//This is DeliveryMode.PERSISTENT
b.setDeliveryMode((byte) 2);
- ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
- msg.setContentHeaderBody(cb);
- }
- else
- {
- msg.setContentHeaderBody(new ContentHeaderBody());
}
+
+ msg.setContentHeaderBody(cb);
+
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, factory);
+ MessageMetaData mmd = msg.headersReceived();
+ msg.setStoredMessage(_messageStore.addMessage(mmd));
if(msg.allContentReceived())
{
- msg.deliverToQueues();
+ ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+ txn.enqueue(_queue, msg, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+
}
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
@@ -178,8 +188,7 @@ public class AckTest extends TestCase
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+ assertEquals("",msgCount,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -191,8 +200,6 @@ public class AckTest extends TestCase
assertTrue(unackedMsg.getQueue() == _queue);
}
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
}
/**
@@ -207,8 +214,8 @@ public class AckTest extends TestCase
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertTrue(_messageStore.getMessageCount() == 0);
+
}
@@ -224,8 +231,8 @@ public class AckTest extends TestCase
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertTrue(_messageStore.getMessageCount() == 0);
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index 355ba6a362..7000df157e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -20,25 +20,18 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.AMQMessage;
public class MockAMQMessage extends AMQMessage
{
public MockAMQMessage(long messageId)
throws AMQException
{
- super(new MockAMQMessageHandle(messageId) ,
- (StoreContext)null,
- (MessagePublishInfo)new MockMessagePublishInfo());
+ super(new MockStoredMessage(messageId));
}
- protected MockAMQMessage(AMQMessage msg)
- throws AMQException
- {
- super(msg);
- }
+
@Override
@@ -46,4 +39,5 @@ public class MockAMQMessage extends AMQMessage
{
return 0l;
}
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
deleted file mode 100644
index bdb0707c27..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.store.StoreContext;
-
-public class MockAMQMessageHandle extends InMemoryMessageHandle
-{
- public MockAMQMessageHandle(final Long messageId)
- {
- super(messageId);
- }
-
- @Override
- public long getBodySize(StoreContext store)
- {
- return 0l;
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index a131c2a465..7c1f728664 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -23,23 +23,19 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
import java.util.List;
import java.util.Set;
import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
public class MockAMQQueue implements AMQQueue
{
@@ -47,6 +43,10 @@ public class MockAMQQueue implements AMQQueue
private AMQShortString _name;
private VirtualHost _virtualhost;
+ private PrincipalHolder _principalHolder;
+
+ private Object _exclusiveOwner;
+
public MockAMQQueue(String name)
{
_name = new AMQShortString(name);
@@ -57,6 +57,11 @@ public class MockAMQQueue implements AMQQueue
return _name;
}
+ public void setNoLocal(boolean b)
+ {
+
+ }
+
public boolean isDurable()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -162,17 +167,22 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+ public QueueEntry enqueue(ServerMessage message) throws AMQException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+ public void requeue(QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
+ public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeue(QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -211,23 +221,23 @@ public class MockAMQQueue implements AMQQueue
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
-
+
public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
{
return null;
}
- public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+ public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+ public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+ public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -286,16 +296,16 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+ public void deleteMessageFromTop()
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public long clearQueue(StoreContext storeContext) throws AMQException
+ public long clearQueue()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
-
+
public void checkMessageStatus() throws AMQException
{
@@ -327,8 +337,28 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isExclusive()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Exchange getAlternateExchange()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void checkCapacity(AMQChannel channel)
- {
+ {
}
public ManagedObject getManagedObject()
@@ -343,7 +373,7 @@ public class MockAMQQueue implements AMQQueue
public void setMinimumAlertRepeatGap(long value)
{
-
+
}
public long getCapacity()
@@ -368,7 +398,32 @@ public class MockAMQQueue implements AMQQueue
public void configure(QueueConfiguration config)
{
-
+
+ }
+
+ public PrincipalHolder getPrincipalHolder()
+ {
+ return _principalHolder;
}
+ public void setPrincipalHolder(PrincipalHolder principalHolder)
+ {
+ _principalHolder = principalHolder;
+ }
+
+ public Object getExclusiveOwner()
+ {
+ return _exclusiveOwner;
+ }
+
+ public void setExclusiveOwner(Object exclusiveOwner)
+ {
+ _exclusiveOwner = exclusiveOwner;
+ }
+
+
+ public String getResourceName()
+ {
+ return _name.toString();
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 37f91e7464..3f74cb973b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -21,8 +21,9 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
public class MockQueueEntry implements QueueEntry
{
@@ -44,14 +45,14 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public void addStateChangeListener(StateChangeListener listener)
+ public boolean isAcquiredBy(Subscription subscription)
{
-
+ return false;
}
- public String debugIdentity()
+ public void addStateChangeListener(StateChangeListener listener)
{
- return null;
+
}
public boolean delete()
@@ -59,17 +60,22 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public void dequeue(StoreContext storeContext) throws FailedDequeueException
+ public void dequeue()
+ {
+
+ }
+
+ public void discard()
{
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void routeToAlternate()
{
}
- public void dispose(StoreContext storeContext) throws MessageCleanupException
+ public void dispose()
{
}
@@ -119,70 +125,95 @@ public class MockQueueEntry implements QueueEntry
return false;
}
-
+
public boolean isQueueDeleted()
{
return false;
}
-
+
public boolean isRejectedBy(Subscription subscription)
{
return false;
}
-
+
public void reject()
{
}
-
+
public void reject(Subscription subscription)
{
}
-
+
public void release()
{
}
-
+ public boolean releaseButRetain()
+ {
+ return false;
+ }
+
+
public boolean removeStateChangeListener(StateChangeListener listener)
{
return false;
}
-
- public void requeue(StoreContext storeContext) throws AMQException
+
+ public void requeue()
{
}
-
+ public void requeue(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
public void setDeliveredToSubscription()
{
}
-
- public void setRedelivered(boolean b)
+
+ public void setRedelivered()
+ {
+
+
+ }
+
+ public AMQMessageHeader getMessageHeader()
{
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public boolean isRedelivered()
+ {
+ return false;
}
-
+
public int compareTo(QueueEntry o)
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
new file mode 100755
index 0000000000..7dc491de4d
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
@@ -0,0 +1,92 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import java.nio.ByteBuffer;
+
+public class MockStoredMessage implements StoredMessage<MessageMetaData>
+{
+ private long _messageId;
+ private MessageMetaData _metaData;
+ private final ByteBuffer _content;
+
+
+ public MockStoredMessage(long messageId)
+ {
+ this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60));
+ }
+
+ public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
+ {
+ _messageId = messageId;
+ _metaData = new MessageMetaData(info, chb, 0);
+ _content = ByteBuffer.allocate(_metaData.getContentSize());
+
+ }
+
+ public MessageMetaData getMetaData()
+ {
+ return _metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ src = src.duplicate();
+ ByteBuffer dst = _content.duplicate();
+ dst.position(offsetInMessage);
+ dst.put(src);
+ }
+
+ public int getContent(int offset, ByteBuffer dst)
+ {
+ ByteBuffer src = _content.duplicate();
+ src.position(offset);
+ src = src.slice();
+ if(dst.remaining() < src.limit())
+ {
+ src.limit(dst.remaining());
+ }
+ dst.put(src);
+ return src.limit();
+ }
+
+ public TransactionLog.StoreFuture flushToStore()
+ {
+ return MessageStore.IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 111e94f9bf..8c6574095b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -1,6 +1,6 @@
package org.apache.qpid.server.queue;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.qpid.server.queue;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -31,21 +31,21 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
public class SimpleAMQQueueTest extends TestCase
{
@@ -59,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase
protected DirectExchange _exchange = new DirectExchange();
protected MockSubscription _subscription = new MockSubscription();
protected FieldTable _arguments = null;
-
+
MessagePublishInfo info = new MessagePublishInfo()
{
@@ -88,7 +88,7 @@ public class SimpleAMQQueueTest extends TestCase
return null;
}
};
-
+
@Override
protected void setUp() throws Exception
{
@@ -97,7 +97,7 @@ public class SimpleAMQQueueTest extends TestCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance();
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -119,51 +119,51 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing name",
+ assertTrue("Exception was not about missing name",
e.getMessage().contains("name"));
}
-
+
try {
_queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing vhost",
+ assertTrue("Exception was not about missing vhost",
e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
_virtualHost, _arguments);
assertNotNull("Queue was not created", _queue);
}
-
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
-
+
public void testBinding()
{
try
{
_queue.bind(_exchange, _routingKey, null);
- assertTrue("Routing key was not bound",
+ assertTrue("Routing key was not bound",
_exchange.getBindings().containsKey(_routingKey));
- assertEquals("Queue was not bound to key",
+ assertEquals("Queue was not bound to key",
_exchange.getBindings().get(_routingKey).get(0),
_queue);
- assertEquals("Exchange binding count", 1,
+ assertEquals("Exchange binding count", 1,
_queue.getExchangeBindings().size());
- assertEquals("Wrong exchange bound", _routingKey,
+ assertEquals("Wrong exchange bound", _routingKey,
_queue.getExchangeBindings().get(0).getRoutingKey());
- assertEquals("Wrong exchange bound", _exchange,
+ assertEquals("Wrong exchange bound", _exchange,
_queue.getExchangeBindings().get(0).getExchange());
-
+
_queue.unBind(_exchange, _routingKey, null);
- assertFalse("Routing key was still bound",
+ assertFalse("Routing key was still bound",
_exchange.getBindings().containsKey(_routingKey));
- assertNull("Routing key was not empty",
+ assertNull("Routing key was not empty",
_exchange.getBindings().get(_routingKey));
}
catch (AMQException e)
@@ -171,61 +171,61 @@ public class SimpleAMQQueueTest extends TestCase
assertNull("Unexpected exception", e);
}
}
-
+
public void testSubscription() throws AMQException
{
// Check adding a subscription adds it to the queue
_queue.registerSubscription(_subscription, false);
- assertEquals("Subscription did not get queue", _queue,
+ assertEquals("Subscription did not get queue", _queue,
_subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
+ assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
+ assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
-
+
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+ _queue.enqueue(messageA);
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
assertTrue("Subscription still had queue", _subscription.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
- assertFalse("Queue still has active consumer",
+ assertFalse("Queue still has active consumer",
1 == _queue.getActiveConsumerCount());
-
+
AMQMessage messageB = createMessage(new Long (25));
- _queue.enqueue(null, messageB);
- QueueEntry entry = _subscription.getLastSeenEntry();
- assertNull(entry);
+ _queue.enqueue(messageB);
+ assertNull(_subscription.getQueueContext());
+
}
-
+
public void testQueueNoSubscriber() throws AMQException, InterruptedException
{
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
}
public void testExclusiveConsumer() throws AMQException
{
// Check adding an exclusive subscription adds it to the queue
_queue.registerSubscription(_subscription, true);
- assertEquals("Subscription did not get queue", _queue,
+ assertEquals("Subscription did not get queue", _queue,
_subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
+ assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
+ assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+ _queue.enqueue(messageA);
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
Exception ex = null;
@@ -235,12 +235,12 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
assertTrue(ex instanceof AMQException);
- // Check we cannot add an exclusive subscriber to a queue with an
+ // Check we cannot add an exclusive subscriber to a queue with an
// existing subscription
_queue.unregisterSubscription(_subscription);
_queue.registerSubscription(_subscription, false);
@@ -250,35 +250,35 @@ public class SimpleAMQQueueTest extends TestCase
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
}
-
- public void testAutoDeleteQueue() throws Exception
+
+ public void testAutoDeleteQueue() throws Exception
{
_queue.stop();
- _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+ _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
_queue.unregisterSubscription(_subscription);
assertTrue("Queue was not deleted when subscription was removed",
_queue.isDeleted());
}
-
+
public void testResend() throws Exception
{
_queue.registerSubscription(_subscription, false);
Long id = new Long(26);
AMQMessage message = createMessage(id);
- _queue.enqueue(null, message);
- QueueEntry entry = _subscription.getLastSeenEntry();
- entry.setRedelivered(true);
+ _queue.enqueue(message);
+ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
+ entry.setRedelivered();
_queue.resend(entry, _subscription);
-
+
}
-
+
public void testGetFirstMessageId() throws Exception
{
// Create message
@@ -286,7 +286,7 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -302,7 +302,7 @@ public class SimpleAMQQueueTest extends TestCase
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -323,7 +323,7 @@ public class SimpleAMQQueueTest extends TestCase
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -335,7 +335,7 @@ public class SimpleAMQQueueTest extends TestCase
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
-
+
public void testGetMessagesRangeOnTheQueue() throws Exception
{
for (int i = 1 ; i <= 10; i++)
@@ -344,142 +344,138 @@ public class SimpleAMQQueueTest extends TestCase
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
-
+
// Get non-existent 0th QueueEntry & check returned list was empty
// (the position parameters in this method are indexed from 1)
List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
assertTrue(entries.size() == 0);
-
+
// Check that when 'from' is 0 it is ignored and the range continues from 1
entries = _queue.getMessagesRangeOnTheQueue(0, 2);
assertTrue(entries.size() == 2);
- long msgID = entries.get(0).getMessage().getMessageId();
+ long msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
- msgID = entries.get(1).getMessage().getMessageId();
+ msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 2L);
// Check that when 'from' is greater than 'to' the returned list is empty
entries = _queue.getMessagesRangeOnTheQueue(5, 4);
assertTrue(entries.size() == 0);
-
- // Get first QueueEntry & check id
+
+ // Get first QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(1, 1);
assertTrue(entries.size() == 1);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
-
+
// Get 5th,6th,7th entries and check id's
entries = _queue.getMessagesRangeOnTheQueue(5, 7);
assertTrue(entries.size() == 3);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 5L);
- msgID = entries.get(1).getMessage().getMessageId();
+ msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 6L);
- msgID = entries.get(2).getMessage().getMessageId();
+ msgID = entries.get(2).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 7L);
-
+
// Get 10th QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(10, 10);
assertTrue(entries.size() == 1);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
-
+
// Get non-existent 11th QueueEntry & check returned set was empty
entries = _queue.getMessagesRangeOnTheQueue(11, 11);
assertTrue(entries.size() == 0);
-
+
// Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs
entries = _queue.getMessagesRangeOnTheQueue(9, 11);
assertTrue(entries.size() == 2);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 9L);
- msgID = entries.get(1).getMessage().getMessageId();
+ msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
}
-
+
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ final IncomingMessage msg = new IncomingMessage(info);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
- ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
-
+
+ final ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
// Send persistent message
+
qs.add(_queue);
- msg.enqueue(qs);
- msg.routingComplete(_store, new MessageHandleFactory());
- _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
-
+ MessageMetaData metaData = msg.headersReceived();
+ StoredMessage handle = _store.addMessage(metaData);
+ msg.setStoredMessage(handle);
+
+
+ ServerTransaction txn = new AutoCommitTransaction(_store);
+
+ txn.enqueue(qs, msg, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ msg.enqueue(qs);
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+
+
+
// Check that it is enqueued
AMQQueue data = _store.getMessages().get(1L);
- assertNotNull(data);
-
+ assertNull(data);
+
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
-
+ AMQMessage amqmsg = new AMQMessage(handle);
+
entry.setMessage(amqmsg);
- _queue.dequeue(null, entry);
-
+ _queue.dequeue(entry);
+
// Check that it is dequeued
data = _store.getMessages().get(1L);
assertNull(data);
}
- // FIXME: move this to somewhere useful
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
- {
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
- try
- {
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
- publishBody,
- new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
- }
- catch (AMQException e)
- {
- // won't happen
- }
-
-
- return amqMessageHandle;
- }
-
public class TestMessage extends AMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new BasicContentHeaderProperties(), 0));
+
+ }
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, ContentHeaderBody chb)
+ throws AMQException
+ {
+ super(new MockStoredMessage(messageId, publishBody, chb));
_tag = tag;
}
-
public boolean incrementReference()
{
_count++;
return true;
}
- public void decrementReference(StoreContext context)
+ public void decrementReference()
{
_count--;
}
@@ -489,10 +485,10 @@ public class SimpleAMQQueueTest extends TestCase
assertEquals("Wrong count for message with tag " + _tag, expected, _count);
}
}
-
+
protected AMQMessage createMessage(Long id) throws AMQException
{
- AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+ AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 02de09c91f..ba94af5936 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -27,8 +27,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SimpleAMQQueueThreadPoolTest extends TestCase
{
@@ -47,7 +45,7 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
-
+
queue.stop();
assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount());
@@ -55,6 +53,6 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
finally
{
ApplicationRegistry.remove();
- }
+ }
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
index 8102360ce0..44f9861e8d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
@@ -57,11 +57,11 @@ public class ACLManagerTest extends TestCase
BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile));
out.write("<security><queueDenier>notyet</queueDenier><exchangeDenier>yes</exchangeDenier></security>");
out.close();
-
+
_conf = new SecurityConfiguration(new XMLConfiguration(tmpFile));
-
+
// Create ACLManager
-
+
_pluginManager = new MockPluginManager("");
_authzManager = new ACLManager(_conf, _pluginManager);
@@ -79,15 +79,15 @@ public class ACLManagerTest extends TestCase
// Correctly Close the AR we created
ApplicationRegistry.remove();
super.tearDown();
- }
-
+ }
+
public void testACLManagerConfigurationPluginManager() throws Exception
{
AMQQueue queue = new MockAMQQueue("notyet");
AMQQueue otherQueue = new MockAMQQueue("other");
-
+
assertFalse(_authzManager.authoriseDelete(_session, queue));
-
+
// This should only be denied if the config hasn't been correctly passed in
assertTrue(_authzManager.authoriseDelete(_session, otherQueue));
assertTrue(_authzManager.authorisePurge(_session, queue));
@@ -96,11 +96,11 @@ public class ACLManagerTest extends TestCase
public void testACLManagerConfigurationPluginManagerACLPlugin() throws ConfigurationException
{
_authzManager = new ACLManager(_conf, _pluginManager, ExchangeDenier.FACTORY);
-
+
Exchange exchange = null;
assertFalse(_authzManager.authoriseDelete(_session, exchange));
}
-
+
public void testConfigurePlugins() throws ConfigurationException
{
Configuration hostConfig = new PropertiesConfiguration();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
index 317dee2b47..37a0fd7fc3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
@@ -14,16 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.PrincipalHolder;
public class ExchangeDenier extends AllowAll
{
@@ -40,9 +40,9 @@ public class ExchangeDenier extends AllowAll
return new ExchangeDenier();
}
};
-
+
@Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+ public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
{
return AuthzResult.DENIED;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
index 88100dc25f..73e9dac775 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
@@ -27,14 +27,13 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.amqp_0_9.ExchangeDeclareBodyImpl;
-import org.apache.qpid.framing.amqp_0_9.QueueDeclareBodyImpl;
import org.apache.qpid.framing.amqp_8_0.QueueBindBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -43,7 +42,7 @@ public class PrincipalPermissionsTest extends TestCase
private String _user = "user";
private PrincipalPermissions _perms;
-
+
// Common things that are passed to frame constructors
private AMQShortString _queueName = new AMQShortString(this.getClass().getName()+"queue");
private AMQShortString _tempQueueName = new AMQShortString(this.getClass().getName()+"tempqueue");
@@ -65,18 +64,18 @@ public class PrincipalPermissionsTest extends TestCase
private AMQQueue _temporaryQueue;
private Boolean _temporary = false;
private Boolean _ownQueue = false;
-
+
@Override
public void setUp()
{
//Highlight that this test will cause a new AR to be created
- ApplicationRegistry.getInstance();
+ ApplicationRegistry.getInstance();
_perms = new PrincipalPermissions(_user);
- try
+ try
{
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration("test", env));
_exchange = DirectExchange.TYPE.newInstance(_virtualHost, _exchangeName, _durable, _ticket, _autoDelete);
_queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, _virtualHost, _arguments);
_temporaryQueue = AMQQueueFactory.createAMQQueueImpl(_tempQueueName, false, _owner , true, _virtualHost, _arguments);
@@ -132,27 +131,29 @@ public class PrincipalPermissionsTest extends TestCase
_perms.grant(Permission.CREATEQUEUE, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEQUEUE, authArgs));
}
-
+
// FIXME disabled, this fails due to grant putting the grant into the wrong map QPID-1598
public void disableTestExchangeCreate()
{
- ExchangeDeclareBodyImpl exchangeDeclare =
+ ExchangeDeclareBodyImpl exchangeDeclare =
new ExchangeDeclareBodyImpl(_ticket, _exchangeName, _exchangeType, _passive, _durable,
_autoDelete, _internal, _nowait, _arguments);
Object[] authArgs = new Object[]{exchangeDeclare};
Object[] grantArgs = new Object[]{_exchangeName, _exchangeType};
-
+
assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
_perms.grant(Permission.CREATEEXCHANGE, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
}
-
+
public void testConsume()
{
Object[] authArgs = new Object[]{_queue};
Object[] grantArgs = new Object[]{_queueName, _ownQueue};
- assertEquals(AuthzResult.DENIED,_perms.authorise(Permission.CONSUME, authArgs));
+ /* FIXME: This throws a null pointer exception QPID-1599
+ * assertFalse(_perms.authorise(Permission.CONSUME, authArgs));
+ */
_perms.grant(Permission.CONSUME, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CONSUME, authArgs));
}
@@ -166,7 +167,7 @@ public class PrincipalPermissionsTest extends TestCase
_perms.grant(Permission.PUBLISH, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.PUBLISH, authArgs));
}
-
+
public void testVhostAccess()
{
//Tests that granting a user Virtualhost level access allows all authorisation requests
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
index 5497f0ae44..5b76bf7532 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
@@ -14,21 +14,20 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.PrincipalHolder;
public class QueueDenier extends AllowAll
{
-
+
public static final ACLPluginFactory FACTORY = new ACLPluginFactory()
{
public boolean supportsTag(String name)
@@ -43,18 +42,18 @@ public class QueueDenier extends AllowAll
return plugin;
}
};
-
+
private String _queueName = "";
-
+
@Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
{
if (!(queue.getName().toString().equals(_queueName)))
{
return AuthzResult.ALLOWED;
- }
- else
+ }
+ else
{
return AuthzResult.DENIED;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 5802655cfc..5169676dae 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -29,17 +29,13 @@ import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.ExchangeBinding;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -103,7 +99,7 @@ public class MessageStoreTest extends TestCase
try
{
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration));
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), configuration));
ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost);
}
catch (Exception e)
@@ -169,7 +165,7 @@ public class MessageStoreTest extends TestCase
Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
- //Send Message To NonDurable direct Exchange = persistent
+ //Send Message To NonDurable direct Exchange = persistent
sendMessageOnExchange(nonDurableExchange, directRouting, true);
// and non-persistent
sendMessageOnExchange(nonDurableExchange, directRouting, false);
@@ -344,22 +340,11 @@ public class MessageStoreTest extends TestCase
MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey);
- IncomingMessage currentMessage = null;
+ final IncomingMessage currentMessage;
- try
- {
- currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
- messageInfo,
- new NonTransactionalContext(_virtualHost.getMessageStore(),
- new StoreContext(), null, null),
- new InternalTestProtocolSession(_virtualHost));
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- currentMessage.setMessageStore(_virtualHost.getMessageStore());
+ currentMessage = new IncomingMessage(messageInfo);
+
currentMessage.setExchange(directExchange);
ContentHeaderBody headerBody = new ContentHeaderBody();
@@ -379,35 +364,42 @@ public class MessageStoreTest extends TestCase
currentMessage.setExpiration();
- try
- {
- currentMessage.route();
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ MessageMetaData mmd = currentMessage.headersReceived();
+ currentMessage.setStoredMessage(_virtualHost.getMessageStore().addMessage(mmd));
+
+ currentMessage.route();
+
- try
- {
- currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
// check and deliver if header says body length is zero
if (currentMessage.allContentReceived())
{
- try
- {
- currentMessage.deliverToQueues();
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ // TODO Deliver to queues
+ ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore());
+ final List<AMQQueue> destinationQueues = currentMessage.getDestinationQueues();
+ trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
+
+ for(AMQQueue queue : destinationQueues)
+ {
+ QueueEntry entry = queue.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
}
}
@@ -496,14 +488,7 @@ public class MessageStoreTest extends TestCase
fail(e.getMessage());
}
- try
- {
- _virtualHost.getQueueRegistry().registerQueue(queue);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ _virtualHost.getQueueRegistry().registerQueue(queue);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index fd6789f5ce..9c12242a07 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -25,14 +25,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogSubject;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
/**
* A message store that does nothing. Designed to be used in tests that do not want to use any message store
@@ -45,8 +46,19 @@ public class SkeletonMessageStore implements MessageStore
public void configure(String base, Configuration config) throws Exception
{
}
-
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -55,7 +67,12 @@ public class SkeletonMessageStore implements MessageStore
{
}
- public void removeMessage(StoreContext s, Long messageId)
+ public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeMessage(Long messageId)
{
}
@@ -85,24 +102,10 @@ public class SkeletonMessageStore implements MessageStore
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
- }
-
- public void beginTran(StoreContext s) throws AMQException
- {
}
- public boolean inTran(StoreContext sc)
- {
- return false;
- }
- public void commitTran(StoreContext storeContext) throws AMQException
- {
- }
- public void abortTran(StoreContext storeContext) throws AMQException
- {
- }
public List<AMQQueue> createQueues() throws AMQException
{
@@ -114,22 +117,26 @@ public class SkeletonMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(
+ Long messageId,
+ int index,
+ ContentChunk contentBody,
+ boolean lastContentBody) throws AMQException
{
}
- public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
{
}
- public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
return null;
}
- public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
{
return null;
}
@@ -139,18 +146,75 @@ public class SkeletonMessageStore implements MessageStore
return false;
}
- public void removeQueue(final AMQQueue queue) throws AMQException
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
{
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public void storeContent(Long messageNumber, long offset, ByteBuffer body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public ServerMessage getMessage(Long messageNumber)
{
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeQueue(final AMQQueue queue) throws AMQException
+ {
+
+ }
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public Transaction newTransaction()
{
+ return new Transaction()
+ {
+
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void commitTran() throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+ }
+
+ public void abortTran() throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
}
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index 4e48435962..4dea13d391 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -20,32 +20,79 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
+import java.nio.ByteBuffer;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
public class TestMemoryMessageStore extends MemoryMessageStore
{
+ private AtomicInteger _messageCount = new AtomicInteger(0);
+
+
public TestMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
}
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
{
- return _metaDataMap;
+ return new TestableStoredMessage(super.addMessage(metaData));
}
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+ public int getMessageCount()
{
- return _contentBodyMap;
+ return _messageCount.get();
+ }
+
+ private class TestableStoredMessage implements StoredMessage
+ {
+ private final StoredMessage _storedMessage;
+
+ public TestableStoredMessage(StoredMessage storedMessage)
+ {
+ _messageCount.incrementAndGet();
+ _storedMessage = storedMessage;
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ _storedMessage.addContent(offsetInMessage, src);
+ }
+
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return _storedMessage.getContent(offsetInMessage, dst);
+ }
+
+ public StoreFuture flushToStore()
+ {
+ return _storedMessage.flushToStore();
+ }
+
+ public void remove()
+ {
+ _storedMessage.remove();
+ _messageCount.decrementAndGet();
+ }
+
}
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 2346660d25..c5b1ba7868 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -26,9 +26,8 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
/**
* Tests that reference counting works correctly with AMQMessage and the message store
@@ -37,13 +36,12 @@ public class TestReferenceCounting extends TestCase
{
private TestMemoryMessageStore _store;
- private StoreContext _storeContext = new StoreContext();
-
protected void setUp() throws Exception
{
super.setUp();
_store = new TestMemoryMessageStore();
+
}
/**
@@ -83,11 +81,12 @@ public class TestReferenceCounting extends TestCase
};
- final long messageId = _store.getNewMessageId();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,info);
+
+ MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ StoredMessage storedMessage = _store.addMessage(mmd);
+
+
+ AMQMessage message = new AMQMessage(storedMessage);
message = message.takeReference();
@@ -95,9 +94,9 @@ public class TestReferenceCounting extends TestCase
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.decrementReference(_storeContext);
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
+ message.decrementReference();
+ assertEquals(1, _store.getMessageCount());
}
private ContentHeaderBody createPersistentContentHeader()
@@ -141,25 +140,24 @@ public class TestReferenceCounting extends TestCase
}
};
- final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,
- info);
-
-
+
+ MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ StoredMessage storedMessage = _store.addMessage(mmd);
+
+ AMQMessage message = new AMQMessage(storedMessage);
+
+
message = message.takeReference();
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
message = message.takeReference();
- message.decrementReference(_storeContext);
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ message.decrementReference();
+ assertEquals(1, _store.getMessageCount());
}
public static junit.framework.Test suite()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 9146fe88ae..ab8c1e7c9c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -22,14 +22,15 @@ package org.apache.qpid.server.store;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;
import java.util.List;
+import java.nio.ByteBuffer;
/**
* Adds some extra methods to the memory message store for testing purposes.
@@ -39,6 +40,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
MemoryMessageStore _mms = null;
private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
+ private AtomicInteger _messageCount = new AtomicInteger(0);
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -47,46 +49,111 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public TestableMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+
+ }
+
+
+
+
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ return new TestableStoredMessage(super.addMessage(metaData));
+ }
+
+ public int getMessageCount()
+ {
+ return _messageCount.get();
}
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ private class TestableTransaction implements Transaction
{
- if (_mms != null)
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
{
- return _mms._metaDataMap;
+ getMessages().put(messageId, (AMQQueue)queue);
}
- else
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
{
- return _metaDataMap;
+ getMessages().remove(messageId);
}
- }
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
- {
- if (_mms != null)
+ public void commitTran() throws AMQException
{
- return _mms._contentBodyMap;
}
- else
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+ }
+
+ public void abortTran() throws AMQException
{
- return _contentBodyMap;
}
- }
-
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
- {
- getMessages().put(messageId, queue);
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+
+ @Override
+ public Transaction newTransaction()
{
- getMessages().remove(messageId);
+ return new TestableTransaction();
}
public HashMap<Long, AMQQueue> getMessages()
{
return _messages;
}
+
+ private class TestableStoredMessage implements StoredMessage
+ {
+ private final StoredMessage _storedMessage;
+
+ public TestableStoredMessage(StoredMessage storedMessage)
+ {
+ _messageCount.incrementAndGet();
+ _storedMessage = storedMessage;
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ _storedMessage.addContent(offsetInMessage, src);
+ }
+
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return _storedMessage.getContent(offsetInMessage, dst);
+ }
+
+ public StoreFuture flushToStore()
+ {
+ return _storedMessage.flushToStore();
+ }
+
+ public void remove()
+ {
+ _storedMessage.remove();
+ _messageCount.decrementAndGet();
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index a3274a3a05..97ba143bdf 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -42,11 +42,15 @@ public class MockSubscription implements Subscription
private AMQShortString tag = new AMQShortString("mocktag");
private AMQQueue queue = null;
private StateListener _listener = null;
- private QueueEntry lastSeen = null;
+ private AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+
+
private static final AtomicLong idGenerator = new AtomicLong(0);
// Create a simple ID that increments for ever new Subscription
private final long _subscriptionID = idGenerator.getAndIncrement();
@@ -81,14 +85,19 @@ public class MockSubscription implements Subscription
return _subscriptionID;
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return lastSeen;
+ return _queueContext;
}
public SubscriptionAcquiredState getOwningState()
{
- return new QueueEntry.SubscriptionAcquiredState(this);
+ return _owningState;
+ }
+
+ public QueueEntry.SubscriptionAssignedState getAssignedState()
+ {
+ return _assignedState;
}
public LogActor getLogActor()
@@ -116,6 +125,21 @@ public class MockSubscription implements Subscription
return true;
}
+ public void confirmAutoClose()
+ {
+
+ }
+
+ public void set(String key, Object value)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object get(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isAutoClose()
{
return false;
@@ -131,6 +155,16 @@ public class MockSubscription implements Subscription
return _closed;
}
+ public boolean acquires()
+ {
+ return true;
+ }
+
+ public boolean seesRequeues()
+ {
+ return true;
+ }
+
public boolean isSuspended()
{
return false;
@@ -149,25 +183,23 @@ public class MockSubscription implements Subscription
{
}
+ public void onDequeue(QueueEntry queueEntry)
+ {
+ }
+
public void restoreCredit(QueueEntry queueEntry)
{
+ //To change body of implemented methods use File | Settings | File Templates.
}
public void send(QueueEntry msg) throws AMQException
{
- lastSeen = msg;
messages.add(msg);
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+ public void setQueueContext(AMQQueue.Context queueContext)
{
- boolean result = false;
- if (expected != null)
- {
- result = (expected.equals(lastSeen));
- }
- lastSeen = newValue;
- return result;
+ _queueContext = queueContext;
}
public void setQueue(AMQQueue queue, boolean exclusive)
@@ -175,6 +207,10 @@ public class MockSubscription implements Subscription
this.queue = queue;
}
+ public void setNoLocal(boolean noLocal)
+ {
+ }
+
public void setStateListener(StateListener listener)
{
this._listener = listener;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
deleted file mode 100644
index 84d3d313d1..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.txn;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-
-import java.util.LinkedList;
-import java.util.NoSuchElementException;
-
-public class TxnBufferTest extends TestCase
-{
- private final LinkedList<MockOp> ops = new LinkedList<MockOp>();
-
- public void testCommit() throws AMQException
- {
- MockStore store = new MockStore();
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- //check relative ordering
- MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit();
- buffer.enlist(op);
- buffer.enlist(op);
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
-
- buffer.commit(null);
-
- validateOps();
- store.validate();
- }
-
- public void testRollback() throws AMQException
- {
- MockStore store = new MockStore();
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new MockOp().expectRollback());
- buffer.enlist(new MockOp().expectRollback());
- buffer.enlist(new MockOp().expectRollback());
-
- buffer.rollback(null);
-
- validateOps();
- store.validate();
- }
-
- public void testCommitWithFailureDuringPrepare() throws AMQException
- {
- MockStore store = new MockStore();
- store.beginTran(null);
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new StoreMessageOperation(store));
- buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
- buffer.enlist(new TxnTester(store));
- buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare());
- buffer.enlist(new FailedPrepare());
- buffer.enlist(new MockOp());
-
- try
- {
- buffer.commit(null);
- }
- catch (NoSuchElementException e)
- {
-
- }
-
- validateOps();
- store.validate();
- }
-
- public void testCommitWithPersistance() throws AMQException
- {
- MockStore store = new MockStore();
- store.beginTran(null);
- store.expectCommit();
-
- TxnBuffer buffer = new TxnBuffer();
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.enlist(new StoreMessageOperation(store));
- buffer.enlist(new TxnTester(store));
-
- buffer.commit(null);
- validateOps();
- store.validate();
- }
-
- private void validateOps()
- {
- for (MockOp op : ops)
- {
- op.validate();
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TxnBufferTest.class);
- }
-
- class MockOp implements TxnOp
- {
- final Object PREPARE = "PREPARE";
- final Object COMMIT = "COMMIT";
- final Object UNDO_PREPARE = "UNDO_PREPARE";
- final Object ROLLBACK = "ROLLBACK";
-
- private final LinkedList expected = new LinkedList();
-
- MockOp()
- {
- ops.add(this);
- }
-
- public void prepare(StoreContext context)
- {
- assertEquals(expected.removeLast(), PREPARE);
- }
-
- public void commit(StoreContext context)
- {
- assertEquals(expected.removeLast(), COMMIT);
- }
-
- public void undoPrepare()
- {
- assertEquals(expected.removeLast(), UNDO_PREPARE);
- }
-
- public void rollback(StoreContext context)
- {
- assertEquals(expected.removeLast(), ROLLBACK);
- }
-
- private MockOp expect(Object optype)
- {
- expected.addFirst(optype);
- return this;
- }
-
- MockOp expectPrepare()
- {
- return expect(PREPARE);
- }
-
- MockOp expectCommit()
- {
- return expect(COMMIT);
- }
-
- MockOp expectUndoPrepare()
- {
- return expect(UNDO_PREPARE);
- }
-
- MockOp expectRollback()
- {
- return expect(ROLLBACK);
- }
-
- void validate()
- {
- assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
- }
-
- void clear()
- {
- expected.clear();
- }
- }
-
- class MockStore extends TestMemoryMessageStore
- {
- final Object BEGIN = "BEGIN";
- final Object ABORT = "ABORT";
- final Object COMMIT = "COMMIT";
-
- private final LinkedList expected = new LinkedList();
- private boolean inTran;
-
- public void beginTran(StoreContext context) throws AMQException
- {
- inTran = true;
- }
-
- public void commitTran(StoreContext context) throws AMQException
- {
- assertEquals(expected.removeLast(), COMMIT);
- inTran = false;
- }
-
- public void abortTran(StoreContext context) throws AMQException
- {
- assertEquals(expected.removeLast(), ABORT);
- inTran = false;
- }
-
- public boolean inTran(StoreContext context)
- {
- return inTran;
- }
-
- private MockStore expect(Object optype)
- {
- expected.addFirst(optype);
- return this;
- }
-
- MockStore expectBegin()
- {
- return expect(BEGIN);
- }
-
- MockStore expectCommit()
- {
- return expect(COMMIT);
- }
-
- MockStore expectAbort()
- {
- return expect(ABORT);
- }
-
- void clear()
- {
- expected.clear();
- }
-
- void validate()
- {
- assertEquals("Expected ops were not all invoked", new LinkedList(), expected);
- }
- }
-
- class NullOp implements TxnOp
- {
- public void prepare(StoreContext context) throws AMQException
- {
- }
- public void commit(StoreContext context)
- {
- }
- public void undoPrepare()
- {
- }
- public void rollback(StoreContext context)
- {
- }
- }
-
- class FailedPrepare extends NullOp
- {
- public void prepare() throws AMQException
- {
- throw new AMQException(null, "Fail!", null);
- }
- }
-
- class TxnTester extends NullOp
- {
- private final MessageStore store;
-
- private final StoreContext context = new StoreContext();
-
- TxnTester(MessageStore store)
- {
- this.store = store;
- }
-
- public void prepare() throws AMQException
- {
- assertTrue("Expected prepare to be performed under txn", store.inTran(context));
- }
-
- public void commit()
- {
- assertTrue("Expected commit not to be performed under txn", !store.inTran(context));
- }
- }
-
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index f2096df9d1..906c769f9c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -31,7 +31,6 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
@@ -41,12 +40,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.util.MockChannel;
-import java.security.Principal;
public class InternalBrokerBaseCase extends TestCase
{
@@ -55,7 +52,6 @@ public class InternalBrokerBaseCase extends TestCase
protected MockChannel _channel;
protected InternalTestProtocolSession _session;
protected VirtualHost _virtualHost;
- protected StoreContext _storeContext = new StoreContext();
protected AMQQueue _queue;
protected AMQShortString QUEUE_NAME;
@@ -97,7 +93,7 @@ public class InternalBrokerBaseCase extends TestCase
protected void checkStoreContents(int messageCount)
{
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
//The above publish message is sufficiently small not to fit in the header so no Body is required.
//assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
@@ -114,11 +110,7 @@ public class InternalBrokerBaseCase extends TestCase
e.printStackTrace();
fail(e.getMessage());
}
- catch (ConsumerTagNotUniqueException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
//Keep the compiler happy
return null;
}
@@ -137,11 +129,7 @@ public class InternalBrokerBaseCase extends TestCase
e.printStackTrace();
fail(e.getMessage());
}
- catch (ConsumerTagNotUniqueException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
//Keep the compiler happy
return null;
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 6b8201eefb..3d37412376 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,8 +36,9 @@ import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
import java.util.Collection;
@@ -76,7 +77,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
_virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
- VirtualHost dummyHost = new VirtualHost(hostConfig);
+ VirtualHost dummyHost = new VirtualHostImpl(hostConfig);
_virtualHostRegistry.registerVirtualHost(dummyHost);
_virtualHostRegistry.setDefaultVirtualHostName("test");
_pluginManager = new PluginManager("");
@@ -97,7 +98,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
try
{
- super.close();
+ super.close();
}
finally
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 7b7c86bb80..bb338458f1 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,8 +35,9 @@ import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.RootMessageLoggerImpl;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
@@ -60,7 +61,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
private ServerConfiguration _config;
-
+
public TestApplicationRegistry() throws ConfigurationException
{
super(new ServerConfiguration(new PropertiesConfiguration()));
@@ -96,10 +97,10 @@ public class TestApplicationRegistry extends ApplicationRegistry
_messageStore = new TestableMemoryMessageStore();
_virtualHostRegistry = new VirtualHostRegistry(this);
-
+
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
- _vHost = new VirtualHost(hostConfig, _messageStore);
+ _vHost = new VirtualHostImpl(hostConfig, _messageStore);
_virtualHostRegistry.registerVirtualHost(_vHost);
@@ -152,7 +153,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
CurrentActor.remove();
}
}
-
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
index 447d09429d..9bd1e7c5e1 100644
--- a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
+++ b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
@@ -35,9 +35,6 @@ public class MockChannel extends AMQChannel
super(session, channelId, messageStore);
}
- public Subscription getSubscription(AMQShortString subscription)
- {
- return _tag2SubscriptionMap.get(subscription);
- }
-
+
+
}