summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
commitab6fffad2230229810c995253a6f021e42e03aaf (patch)
treefdee7a99130750af8d7c71d25c358a282e17e405 /qpid/java/broker/src/test
parent35b5c7fd8c761d41caa88505e8c2fee319e92a84 (diff)
downloadqpid-python-ab6fffad2230229810c995253a6f021e42e03aaf.tar.gz
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java18
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java31
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java316
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java94
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java93
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java45
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java2
12 files changed, 441 insertions, 212 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index de6d036f29..dc9ddf7b32 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(getName());
// Check that atest was a priority queue with 5 priorities
- AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
+ AMQQueue atest = vhost.getQueue("atest");
assertTrue(atest instanceof AMQPriorityQueue);
assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
// Check that ptest was a priority queue with 10 priorities
- AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
+ AMQQueue ptest = vhost.getQueue("ptest");
assertTrue(ptest instanceof AMQPriorityQueue);
assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
// Check that ntest wasn't a priority queue
- AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
+ AMQQueue ntest = vhost.getQueue("ntest");
assertFalse(ntest instanceof AMQPriorityQueue);
}
@@ -146,13 +146,13 @@ public class VirtualHostConfigurationTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(getName());
// Check specifically configured values
- AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
+ AMQQueue aTest = vhost.getQueue("atest");
assertEquals(4, aTest.getMaximumQueueDepth());
assertEquals(5, aTest.getMaximumMessageSize());
assertEquals(6, aTest.getMaximumMessageAge());
// Check default values
- AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
+ AMQQueue bTest = vhost.getQueue("btest");
assertEquals(1, bTest.getMaximumQueueDepth());
assertEquals(2, bTest.getMaximumMessageSize());
assertEquals(3, bTest.getMaximumMessageAge());
@@ -214,10 +214,10 @@ public class VirtualHostConfigurationTest extends QpidTestCase
assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled());
// Get queues
- AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles"));
- AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle"));
- AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2"));
- AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0"));
+ AMQQueue biggles = test.getQueue("biggles");
+ AMQQueue beetle = test.getQueue("beetle");
+ AMQQueue r2d2 = extra.getQueue("r2d2");
+ AMQQueue c3p0 = extra.getQueue("c3p0");
// Disabled specifically for this queue, overriding virtualhost setting
assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index f2a64381df..7adec3d595 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -75,7 +75,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testNoRoute() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -86,7 +87,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testDirectMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -108,7 +110,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testStarMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
@@ -139,7 +141,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
@@ -190,7 +192,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMidHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
routeMessage("a.c.d.b",0l);
@@ -215,7 +218,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMatchafterHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
@@ -253,7 +257,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashAfterHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -274,7 +279,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -295,7 +301,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testSubMatchFails() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null));
int queueCount = routeMessage("a.b.c",0l);
@@ -326,7 +333,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMoreRouting() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -339,7 +347,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMoreQueue() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 693fd16b9f..a468fa072b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collections;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import java.util.ArrayList;
+import org.apache.qpid.server.model.Queue;
import static org.mockito.Mockito.when;
@@ -38,9 +38,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
@Override
public void setUp() throws Exception
{
- FieldTable arguments = new FieldTable();
- arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3);
- setArguments(arguments);
+ setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3));
super.setUp();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index c8e0e53d75..62c9b4c46d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,91 +20,213 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.XMLConfiguration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class AMQQueueFactoryTest extends QpidTestCase
{
private QueueRegistry _queueRegistry;
private VirtualHost _virtualHost;
- private Broker _broker;
+ private AMQQueueFactory _queueFactory;
+ private List<AMQQueue> _queues;
+ private QueueConfiguration _queueConfiguration;
@Override
public void setUp() throws Exception
{
super.setUp();
- BrokerTestHelper.setUp();
- XMLConfiguration configXml = new XMLConfiguration();
- configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName());
+ _queues = new ArrayList<AMQQueue>();
+ _virtualHost = mock(VirtualHost.class);
- _broker = BrokerTestHelper.createBrokerMock();
- if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings"))
- {
- when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5);
- when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true);
- }
+ VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class);
+ when(_virtualHost.getConfiguration()).thenReturn(vhostConfig);
+ _queueConfiguration = mock(QueueConfiguration.class);
+ when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration);
+ LogActor logActor = mock(LogActor.class);
+ CurrentActor.set(logActor);
+ RootMessageLogger rootLogger = mock(RootMessageLogger.class);
+ when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
+ DurableConfigurationStore store = mock(DurableConfigurationStore.class);
+ when(_virtualHost.getDurableConfigurationStore()).thenReturn(store);
+
+ mockExchangeCreation();
+ mockQueueRegistry();
+ delegateVhostQueueCreation();
+
+ when(_virtualHost.getQueues()).thenReturn(_queues);
+
+
+ _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry);
- _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker));
- _queueRegistry = _virtualHost.getQueueRegistry();
}
+ private void delegateVhostQueueCreation() throws AMQException
+ {
+ final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class);
+
+ when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(),
+ autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then(
+ new Answer<AMQQueue>()
+ {
+ @Override
+ public AMQQueue answer(InvocationOnMock invocation) throws Throwable
+ {
+ return _queueFactory.createAMQQueueImpl(id.getValue(),
+ queueName.getValue(),
+ durable.getValue(),
+ owner.getValue(),
+ autoDelete.getValue(),
+ exclusive.getValue(),
+ deleteOnNoConsumer.getValue(),
+ arguments.getValue());
+ }
+ }
+ );
+ }
+
+ private void mockQueueRegistry()
+ {
+ _queueRegistry = mock(QueueRegistry.class);
+
+ final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class);
+ doAnswer(new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ AMQQueue queue = capturedQueue.getValue();
+ when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue);
+ when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue);
+ when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue);
+ when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue);
+ _queues.add(queue);
+
+ return null;
+ }
+ }).when(_queueRegistry).registerQueue(capturedQueue.capture());
+ }
+
+ private void mockExchangeCreation() throws AMQException
+ {
+ final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class);
+
+ when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(),
+ anyBoolean(), anyBoolean(), anyString())).then(
+ new Answer<Exchange>()
+ {
+ @Override
+ public Exchange answer(InvocationOnMock invocation) throws Throwable
+ {
+ final String name = exchangeNameCapture.getValue();
+ final UUID id = idCapture.getValue();
+
+ final Exchange exchange = mock(Exchange.class);
+ ExchangeType exType = mock(ExchangeType.class);
+
+ when(exchange.getName()).thenReturn(name);
+ when(exchange.getId()).thenReturn(id);
+ when(exchange.getType()).thenReturn(exType);
+ final String typeName = type.getValue();
+ when(exType.getType()).thenReturn(typeName);
+ when(exType.getName()).thenReturn(new AMQShortString(typeName));
+
+ when(_virtualHost.getExchange(eq(name))).thenReturn(exchange);
+ when(_virtualHost.getExchange(eq(id))).thenReturn(exchange);
+
+ final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class);
+
+ when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() {
+
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ when(exchange.isBound(eq(queue.getValue()))).thenReturn(true);
+ return true;
+ }
+ });
+
+ return exchange;
+ }
+ }
+ );
+ }
+
@Override
public void tearDown() throws Exception
{
- try
- {
- _virtualHost.close();
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
+ super.tearDown();
}
private void verifyRegisteredQueueCount(int count)
{
- assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size());
+ assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size());
}
private void verifyQueueRegistered(String queueName)
{
- assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName));
+ assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName));
}
public void testPriorityQueueRegistration() throws Exception
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5);
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false,
- false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false,
+ false,
+ false,
+ attributes);
assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
@@ -115,43 +237,42 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testSimpleQueueRegistration() throws Exception
{
String queueName = getName();
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
- false, _virtualHost, null);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
+ false,
+ false,
+ null);
assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
verifyQueueRegistered(queueName);
//verify that no alternate exchange or DLQ were produced
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange());
- assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName));
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
* cause the alternate exchange to be set and DLQ to be produced.
* @throws AMQException
*/
public void testDeadLetterQueueEnabled() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ attributes);
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
@@ -161,7 +282,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
- AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
@@ -178,17 +299,20 @@ public class AMQQueueFactoryTest extends QpidTestCase
*/
public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception
{
+
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ when(_queueConfiguration.getMaxDeliveryCount()).thenReturn(5);
+ when(_queueConfiguration.isDeadLetterQueueEnabled()).thenReturn(true);
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, null);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ null);
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
Exchange altExchange = queue.getAlternateExchange();
@@ -199,7 +323,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
- AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
@@ -210,81 +334,77 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
* result in the alternate exchange being set and DLQ being created.
* @throws AMQException
*/
public void testDeadLetterQueueDisabled() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false);
String queueName = "testDeadLetterQueueDisabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ attributes);
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
- assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
* creating an auto-delete queue, does not result in the alternate exchange
* being set and DLQ being created.
* @throws AMQException
*/
public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
//create an autodelete queue
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
+ false,
+ attributes);
assertTrue("Queue should be autodelete", queue.isAutoDelete());
//ensure that the autodelete property overrides the request to enable DLQ
assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName));
- assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
* the desired effect.
*/
public void testMaximumDeliveryCount() throws Exception
{
- final FieldTable fieldTable = new FieldTable();
- fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
+ false,
+ attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
@@ -293,13 +413,14 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
* that queue is created with a default maximumDeliveryCount of zero (unless set in config).
*/
public void testMaximumDeliveryCountDefault() throws Exception
{
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
- _virtualHost, null);
+ final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
+ false,
+ null);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
@@ -314,7 +435,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
{
try
{
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, _virtualHost, null);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false,
+ false,
+ null);
fail("queue with null name can not be created!");
}
catch (Exception e)
@@ -336,10 +459,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE");
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
+ false, false, false, attributes);
fail("queue with DLQ name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -362,10 +484,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ");
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
+ false, false, false, attributes);
fail("queue with DLE name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -379,16 +500,17 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testMessageGroupFromConfig() throws Exception
{
- PropertiesConfiguration queueConfig = new PropertiesConfiguration();
- queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey");
- queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1");
+ Map<String,String> arguments = new HashMap<String, String>();
+ arguments.put("qpid.group_header_key","mykey");
+ arguments.put("qpid.shared_msg_group","1");
+ QueueConfiguration qConf = mock(QueueConfiguration.class);
+ when(qConf.getArguments()).thenReturn(arguments);
+ when(qConf.getName()).thenReturn("test");
- final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);;
- QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig);
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost);
- assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY));
- assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(qConf);
+ assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY));
+ assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
}
private String generateStringWithLength(char ch, int length)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index b677ece408..e490db288c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -469,7 +469,14 @@ public class MockAMQQueue implements AMQQueue
}
- public Map<String, Object> getArguments()
+ @Override
+ public Collection<String> getAvailableAttributes()
+ {
+ return null;
+ }
+
+ @Override
+ public Object getAttribute(String attrName)
{
return null;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 0faa796f1c..2328745b83 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -65,7 +65,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
private AMQShortString _routingKey = new AMQShortString("routing key");
private DirectExchange _exchange;
private MockSubscription _subscription = new MockSubscription();
- private FieldTable _arguments = null;
+ private Map<String,Object> _arguments = null;
private MessagePublishInfo info = new MessagePublishInfo()
{
@@ -103,8 +103,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
- false, false, _virtualHost, FieldTable.convertToMap(_arguments));
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
+ false, false, false, _arguments);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString());
}
@@ -127,8 +127,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testCreateQueue() throws AMQException
{
_queue.stop();
- try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments));
+ try
+ {
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
+ false, _owner.asString(), false,
+ false, false, _arguments);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -137,7 +140,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
e.getMessage().contains("name"));
}
- try {
+ try
+ {
_queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
assertNull("Queue was created", _queue);
}
@@ -147,8 +151,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false,
- false, _virtualHost, FieldTable.convertToMap(_arguments));
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
+ "differentName", false,
+ _owner.asString(), false,
+ false, false, _arguments);
assertNotNull("Queue was not created", _queue);
}
@@ -1225,12 +1231,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
return _subscription;
}
- public FieldTable getArguments()
+ public Map<String,Object> getArguments()
{
return _arguments;
}
- public void setArguments(FieldTable arguments)
+ public void setArguments(Map<String,Object> arguments)
{
_arguments = arguments;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 4abb7233dc..c115af5a38 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -50,8 +50,8 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase
try
{
- SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "test", false,
- "owner", false, false, test, null);
+ SimpleAMQQueue queue = (SimpleAMQQueue)
+ test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null);
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 67cf0780da..e9ad4ba236 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -52,6 +52,9 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec
import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
@@ -178,7 +181,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testBindQueue() throws Exception
{
- AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -197,7 +200,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUnbindQueue() throws Exception
{
- AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -212,8 +215,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueue() throws Exception
{
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, null);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
Map<String, Object> queueAttributes = new HashMap<String, Object>();
@@ -225,13 +228,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueueFieldTable() throws Exception
{
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
@@ -241,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
@@ -250,8 +252,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
Exchange alternateExchange = createTestAlternateExchange();
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
@@ -275,16 +277,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUpdateQueueExclusivity() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
- queue = createTestQueue(getName(), getName() + "Owner", false);
- when(queue.getArguments()).thenReturn(attributes);
+ queue = createTestQueue(getName(), getName() + "Owner", false, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
@@ -295,7 +296,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -304,17 +305,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUpdateQueueAlternateExchange() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
Exchange alternateExchange = createTestAlternateExchange();
- queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange);
- when(queue.getArguments()).thenReturn(attributes);
+ queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
@@ -325,7 +324,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -334,12 +333,11 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testRemoveQueue() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
@@ -349,12 +347,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
anyMap());
}
- private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException
+ private AMQQueue createTestQueue(String queueName,
+ String queueOwner,
+ boolean exclusive,
+ final Map<String, Object> arguments) throws AMQStoreException
{
- return createTestQueue(queueName, queueOwner, exclusive, null);
+ return createTestQueue(queueName, queueOwner, exclusive, null, arguments);
}
- private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException
+ private AMQQueue createTestQueue(String queueName,
+ String queueOwner,
+ boolean exclusive,
+ Exchange alternateExchange,
+ final Map<String, Object> arguments) throws AMQStoreException
{
AMQQueue queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
@@ -363,6 +368,23 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
+ if(arguments != null && !arguments.isEmpty())
+ {
+ when(queue.getAvailableAttributes()).thenReturn(arguments.keySet());
+ final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
+ when(queue.getAttribute(requestedAttribute.capture())).then(
+ new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ String attrName = requestedAttribute.getValue();
+ return arguments.get(attrName);
+ }
+ });
+ }
+
return queue;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index caf6acb4bb..cb1fc2737d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -181,9 +181,8 @@ public class BrokerTestHelper
public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException
{
- SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null,
- false, false, virtualHost, Collections.<String, Object>emptyMap());
- virtualHost.getQueueRegistry().registerQueue(queue);
+ SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
+ false, false, false, Collections.<String, Object>emptyMap());
return queue;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 2d3483f078..66a71c562f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -45,6 +45,8 @@ import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.*;
import org.apache.qpid.server.security.SecurityManager;
@@ -82,7 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private DurableConfigurationStore _store;
private ExchangeFactory _exchangeFactory;
private ExchangeRegistry _exchangeRegistry;
- private QueueRegistry _queueRegistry;
+ private QueueFactory _queueFactory;
@Override
public void setUp() throws Exception
@@ -105,6 +107,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
+ when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue);
final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class);
doAnswer(new Answer()
@@ -114,37 +117,68 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
public Object answer(final InvocationOnMock invocation) throws Throwable
{
Exchange exchange = registeredExchange.getValue();
- when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange);
- when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange);
+ when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange);
+ when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange);
return null;
}
}).when(_exchangeRegistry).registerExchange(registeredExchange.capture());
- _queueRegistry = mock(QueueRegistry.class);
- when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry);
+ final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class);
- when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+ _queueFactory = mock(QueueFactory.class);
- final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class);
- doAnswer(new Answer()
- {
+ when(_queueFactory.createAMQQueueImpl(idArg.capture(), queueArg.capture(),
+ anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then(
+ new Answer()
+ {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable
- {
- AMQQueue queue = registeredQueue.getValue();
- when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue);
- when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue);
- return null;
- }
- }).when(_queueRegistry).registerQueue(registeredQueue.capture());
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ final AMQQueue queue = mock(AMQQueue.class);
+
+ final String queueName = queueArg.getValue();
+ final UUID queueId = idArg.getValue();
+
+ when(queue.getName()).thenReturn(queueName);
+ when(queue.getId()).thenReturn(queueId);
+ when(_vhost.getQueue(eq(queueName))).thenReturn(queue);
+ when(_vhost.getQueue(eq(queueId))).thenReturn(queue);
+
+ final ArgumentCaptor<Exchange> altExchangeArg = ArgumentCaptor.forClass(Exchange.class);
+ doAnswer(
+ new Answer()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Exchange value = altExchangeArg.getValue();
+ when(queue.getAlternateExchange()).thenReturn(value);
+ return null;
+ }
+ }
+ ).when(queue).setAlternateExchange(altExchangeArg.capture());
+
+ Map args = argsArg.getValue();
+ if(args.containsKey(Queue.ALTERNATE_EXCHANGE))
+ {
+ final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
+ final Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
+ queue.setAlternateExchange(exchange);
+ }
+ return queue;
+ }
+ });
_exchangeFactory = mock(ExchangeFactory.class);
+
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(_vhost, _exchangeRegistry),
+ new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory),
new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory),
new BindingRecoverer(_vhost, _exchangeRegistry)
};
@@ -356,24 +390,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final UUID queueId = new UUID(1, 0);
final UUID exchangeId = new UUID(2, 0);
- /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than
- queue creation being on vhost - yuck! */
- SecurityManager securityManager = mock(SecurityManager.class);
- when(_vhost.getSecurityManager()).thenReturn(securityManager);
- when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),
- any(AMQShortString.class),anyString())).thenReturn(true);
- VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class);
- when(_vhost.getConfiguration()).thenReturn(configuration);
- QueueConfiguration queueConfiguration = mock(QueueConfiguration.class);
- when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration);
- LogActor logActor = mock(LogActor.class);
- CurrentActor.set(logActor);
- RootMessageLogger rootLogger = mock(RootMessageLogger.class);
- when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
- /* end of queue creation mock hackery */
-
final Exchange customExchange = mock(Exchange.class);
+ when(customExchange.getId()).thenReturn(exchangeId);
+ when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME);
+
when(_exchangeFactory.createExchange(eq(exchangeId),
eq(CUSTOM_EXCHANGE_NAME),
eq(HeadersExchange.TYPE.getType()),
@@ -390,7 +411,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.completeConfigurationRecovery();
- assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange);
+ assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange());
}
private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 6769c1c2fc..1ca7ff1b65 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhost;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -28,6 +29,7 @@ import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
@@ -119,6 +121,43 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public AMQQueue getQueue(String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue getQueue(UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<AMQQueue> getQueues()
+ {
+ return null;
+ }
+
+ @Override
+ public int removeQueue(AMQQueue queue) throws AMQException
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException
+ {
+ return null;
+ }
+
+ @Override
public Exchange createExchange(UUID id,
String exchange,
String type,
@@ -141,6 +180,12 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public Exchange getExchange(UUID id)
+ {
+ return null;
+ }
+
+ @Override
public Exchange getDefaultExchange()
{
return null;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index e72196c383..03cb483e40 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -244,7 +244,7 @@ public class StandardVirtualHostTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(vhostName, config);
assertNotNull("virtualhost should exist", vhost);
- AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName);
+ AMQQueue queue = vhost.getQueue(queueName);
assertNotNull("queue should exist", queue);
Exchange defaultExch = vhost.getDefaultExchange();