diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
| commit | ab6fffad2230229810c995253a6f021e42e03aaf (patch) | |
| tree | fdee7a99130750af8d7c71d25c358a282e17e405 /qpid/java/broker/src/test | |
| parent | 35b5c7fd8c761d41caa88505e8c2fee319e92a84 (diff) | |
| download | qpid-python-ab6fffad2230229810c995253a6f021e42e03aaf.tar.gz | |
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
12 files changed, 441 insertions, 212 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index de6d036f29..dc9ddf7b32 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase VirtualHost vhost = createVirtualHost(getName()); // Check that atest was a priority queue with 5 priorities - AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + AMQQueue atest = vhost.getQueue("atest"); assertTrue(atest instanceof AMQPriorityQueue); assertEquals(5, ((AMQPriorityQueue) atest).getPriorities()); // Check that ptest was a priority queue with 10 priorities - AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest")); + AMQQueue ptest = vhost.getQueue("ptest"); assertTrue(ptest instanceof AMQPriorityQueue); assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities()); // Check that ntest wasn't a priority queue - AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest")); + AMQQueue ntest = vhost.getQueue("ntest"); assertFalse(ntest instanceof AMQPriorityQueue); } @@ -146,13 +146,13 @@ public class VirtualHostConfigurationTest extends QpidTestCase VirtualHost vhost = createVirtualHost(getName()); // Check specifically configured values - AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + AMQQueue aTest = vhost.getQueue("atest"); assertEquals(4, aTest.getMaximumQueueDepth()); assertEquals(5, aTest.getMaximumMessageSize()); assertEquals(6, aTest.getMaximumMessageAge()); // Check default values - AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest")); + AMQQueue bTest = vhost.getQueue("btest"); assertEquals(1, bTest.getMaximumQueueDepth()); assertEquals(2, bTest.getMaximumMessageSize()); assertEquals(3, bTest.getMaximumMessageAge()); @@ -214,10 +214,10 @@ public class VirtualHostConfigurationTest extends QpidTestCase assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled()); // Get queues - AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles")); - AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle")); - AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2")); - AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0")); + AMQQueue biggles = test.getQueue("biggles"); + AMQQueue beetle = test.getQueue("beetle"); + AMQQueue r2d2 = extra.getQueue("r2d2"); + AMQQueue c3p0 = extra.getQueue("c3p0"); // Disabled specifically for this queue, overriding virtualhost setting assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index f2a64381df..7adec3d595 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -75,7 +75,8 @@ public class TopicExchangeTest extends QpidTestCase public void testNoRoute() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -86,7 +87,8 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -108,7 +110,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); @@ -139,7 +141,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); @@ -190,7 +192,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); routeMessage("a.c.d.b",0l); @@ -215,7 +218,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchafterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); @@ -253,7 +257,8 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -274,7 +279,8 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -295,7 +301,8 @@ public class TopicExchangeTest extends QpidTestCase public void testSubMatchFails() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); int queueCount = routeMessage("a.b.c",0l); @@ -326,7 +333,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreRouting() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -339,7 +347,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreQueue() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 693fd16b9f..a468fa072b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.queue; +import java.util.Collections; import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import java.util.ArrayList; +import org.apache.qpid.server.model.Queue; import static org.mockito.Mockito.when; @@ -38,9 +38,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest @Override public void setUp() throws Exception { - FieldTable arguments = new FieldTable(); - arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3); - setArguments(arguments); + setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3)); super.setUp(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index c8e0e53d75..62c9b4c46d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,91 +20,213 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.configuration.XMLConfiguration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class AMQQueueFactoryTest extends QpidTestCase { private QueueRegistry _queueRegistry; private VirtualHost _virtualHost; - private Broker _broker; + private AMQQueueFactory _queueFactory; + private List<AMQQueue> _queues; + private QueueConfiguration _queueConfiguration; @Override public void setUp() throws Exception { super.setUp(); - BrokerTestHelper.setUp(); - XMLConfiguration configXml = new XMLConfiguration(); - configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName()); + _queues = new ArrayList<AMQQueue>(); + _virtualHost = mock(VirtualHost.class); - _broker = BrokerTestHelper.createBrokerMock(); - if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings")) - { - when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5); - when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true); - } + VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class); + when(_virtualHost.getConfiguration()).thenReturn(vhostConfig); + _queueConfiguration = mock(QueueConfiguration.class); + when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration); + LogActor logActor = mock(LogActor.class); + CurrentActor.set(logActor); + RootMessageLogger rootLogger = mock(RootMessageLogger.class); + when(logActor.getRootMessageLogger()).thenReturn(rootLogger); + DurableConfigurationStore store = mock(DurableConfigurationStore.class); + when(_virtualHost.getDurableConfigurationStore()).thenReturn(store); + + mockExchangeCreation(); + mockQueueRegistry(); + delegateVhostQueueCreation(); + + when(_virtualHost.getQueues()).thenReturn(_queues); + + + _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry); - _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker)); - _queueRegistry = _virtualHost.getQueueRegistry(); } + private void delegateVhostQueueCreation() throws AMQException + { + final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class); + + when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(), + autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then( + new Answer<AMQQueue>() + { + @Override + public AMQQueue answer(InvocationOnMock invocation) throws Throwable + { + return _queueFactory.createAMQQueueImpl(id.getValue(), + queueName.getValue(), + durable.getValue(), + owner.getValue(), + autoDelete.getValue(), + exclusive.getValue(), + deleteOnNoConsumer.getValue(), + arguments.getValue()); + } + } + ); + } + + private void mockQueueRegistry() + { + _queueRegistry = mock(QueueRegistry.class); + + final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class); + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + AMQQueue queue = capturedQueue.getValue(); + when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue); + when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue); + when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue); + when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue); + _queues.add(queue); + + return null; + } + }).when(_queueRegistry).registerQueue(capturedQueue.capture()); + } + + private void mockExchangeCreation() throws AMQException + { + final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class); + + when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(), + anyBoolean(), anyBoolean(), anyString())).then( + new Answer<Exchange>() + { + @Override + public Exchange answer(InvocationOnMock invocation) throws Throwable + { + final String name = exchangeNameCapture.getValue(); + final UUID id = idCapture.getValue(); + + final Exchange exchange = mock(Exchange.class); + ExchangeType exType = mock(ExchangeType.class); + + when(exchange.getName()).thenReturn(name); + when(exchange.getId()).thenReturn(id); + when(exchange.getType()).thenReturn(exType); + final String typeName = type.getValue(); + when(exType.getType()).thenReturn(typeName); + when(exType.getName()).thenReturn(new AMQShortString(typeName)); + + when(_virtualHost.getExchange(eq(name))).thenReturn(exchange); + when(_virtualHost.getExchange(eq(id))).thenReturn(exchange); + + final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class); + + when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() { + + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + when(exchange.isBound(eq(queue.getValue()))).thenReturn(true); + return true; + } + }); + + return exchange; + } + } + ); + } + @Override public void tearDown() throws Exception { - try - { - _virtualHost.close(); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } + super.tearDown(); } private void verifyRegisteredQueueCount(int count) { - assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size()); + assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size()); } private void verifyQueueRegistered(String queueName) { - assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName)); + assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName)); } public void testPriorityQueueRegistration() throws Exception { - FieldTable fieldTable = new FieldTable(); - fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5); + Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false, - false, _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false, + false, + false, + attributes); assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); @@ -115,43 +237,42 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testSimpleQueueRegistration() throws Exception { String queueName = getName(); - AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, - false, _virtualHost, null); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, + false, + false, + null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); verifyQueueRegistered(queueName); //verify that no alternate exchange or DLQ were produced - QueueRegistry qReg = _virtualHost.getQueueRegistry(); assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange()); - assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName)); verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does * cause the alternate exchange to be set and DLQ to be produced. * @throws AMQException */ public void testDeadLetterQueueEnabled() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + attributes); Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); @@ -161,7 +282,7 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - AMQQueue dlQueue = qReg.getQueue(dlQueueName); + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); @@ -178,17 +299,20 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception { + String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); + when(_queueConfiguration.getMaxDeliveryCount()).thenReturn(5); + when(_queueConfiguration.isDeadLetterQueueEnabled()).thenReturn(true); - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, null); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + null); assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); Exchange altExchange = queue.getAlternateExchange(); @@ -199,7 +323,7 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - AMQQueue dlQueue = qReg.getQueue(dlQueueName); + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); @@ -210,81 +334,77 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not * result in the alternate exchange being set and DLQ being created. * @throws AMQException */ public void testDeadLetterQueueDisabled() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false); String queueName = "testDeadLetterQueueDisabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + attributes); assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); - assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName)); //only 1 queue should have been registered verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but * creating an auto-delete queue, does not result in the alternate exchange * being set and DLQ being created. * @throws AMQException */ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); //create an autodelete queue - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, + false, + attributes); assertTrue("Queue should be autodelete", queue.isAutoDelete()); //ensure that the autodelete property overrides the request to enable DLQ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName)); - assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName)); //only 1 queue should have been registered verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has * the desired effect. */ public void testMaximumDeliveryCount() throws Exception { - final FieldTable fieldTable = new FieldTable(); - fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + false, + attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); @@ -293,13 +413,14 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means * that queue is created with a default maximumDeliveryCount of zero (unless set in config). */ public void testMaximumDeliveryCountDefault() throws Exception { - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, - _virtualHost, null); + final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + false, + null); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); @@ -314,7 +435,9 @@ public class AMQQueueFactoryTest extends QpidTestCase { try { - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, _virtualHost, null); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, + false, + null); fail("queue with null name can not be created!"); } catch (Exception e) @@ -336,10 +459,9 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + false, false, false, attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); } catch (Exception e) @@ -362,10 +484,9 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + false, false, false, attributes); fail("queue with DLE name having more than 255 characters can not be created!"); } catch (Exception e) @@ -379,16 +500,17 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testMessageGroupFromConfig() throws Exception { - PropertiesConfiguration queueConfig = new PropertiesConfiguration(); - queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey"); - queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1"); + Map<String,String> arguments = new HashMap<String, String>(); + arguments.put("qpid.group_header_key","mykey"); + arguments.put("qpid.shared_msg_group","1"); + QueueConfiguration qConf = mock(QueueConfiguration.class); + when(qConf.getArguments()).thenReturn(arguments); + when(qConf.getName()).thenReturn("test"); - final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);; - QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost); - assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY)); - assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(qConf); + assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY)); + assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)); } private String generateStringWithLength(char ch, int length) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index b677ece408..e490db288c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -469,7 +469,14 @@ public class MockAMQQueue implements AMQQueue } - public Map<String, Object> getArguments() + @Override + public Collection<String> getAvailableAttributes() + { + return null; + } + + @Override + public Object getAttribute(String attrName) { return null; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0faa796f1c..2328745b83 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; +import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessageHeader; @@ -65,7 +65,7 @@ public class SimpleAMQQueueTest extends QpidTestCase private AMQShortString _routingKey = new AMQShortString("routing key"); private DirectExchange _exchange; private MockSubscription _subscription = new MockSubscription(); - private FieldTable _arguments = null; + private Map<String,Object> _arguments = null; private MessagePublishInfo info = new MessagePublishInfo() { @@ -103,8 +103,8 @@ public class SimpleAMQQueueTest extends QpidTestCase _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), - false, false, _virtualHost, FieldTable.convertToMap(_arguments)); + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), + false, false, false, _arguments); _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString()); } @@ -127,8 +127,11 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testCreateQueue() throws AMQException { _queue.stop(); - try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments)); + try + { + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null, + false, _owner.asString(), false, + false, false, _arguments); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -137,7 +140,8 @@ public class SimpleAMQQueueTest extends QpidTestCase e.getMessage().contains("name")); } - try { + try + { _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } @@ -147,8 +151,10 @@ public class SimpleAMQQueueTest extends QpidTestCase e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false, - false, _virtualHost, FieldTable.convertToMap(_arguments)); + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), + "differentName", false, + _owner.asString(), false, + false, false, _arguments); assertNotNull("Queue was not created", _queue); } @@ -1225,12 +1231,12 @@ public class SimpleAMQQueueTest extends QpidTestCase return _subscription; } - public FieldTable getArguments() + public Map<String,Object> getArguments() { return _arguments; } - public void setArguments(FieldTable arguments) + public void setArguments(Map<String,Object> arguments) { _arguments = arguments; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 4abb7233dc..c115af5a38 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -50,8 +50,8 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase try { - SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "test", false, - "owner", false, false, test, null); + SimpleAMQQueue queue = (SimpleAMQQueue) + test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null); assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 67cf0780da..e9ad4ba236 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -52,6 +52,9 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase { @@ -178,7 +181,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testBindQueue() throws Exception { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -197,7 +200,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUnbindQueue() throws Exception { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -212,8 +215,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueue() throws Exception { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, null); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); Map<String, Object> queueAttributes = new HashMap<String, Object>(); @@ -225,13 +228,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueueFieldTable() throws Exception { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); @@ -241,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -250,8 +252,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { Exchange alternateExchange = createTestAlternateExchange(); - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); @@ -275,16 +277,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUpdateQueueExclusivity() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // update the queue to have exclusive=false - queue = createTestQueue(getName(), getName() + "Owner", false); - when(queue.getArguments()).thenReturn(attributes); + queue = createTestQueue(getName(), getName() + "Owner", false, attributes); DurableConfigurationStoreHelper.updateQueue(_configStore, queue); @@ -295,7 +296,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -304,17 +305,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUpdateQueueAlternateExchange() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); - queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - when(queue.getArguments()).thenReturn(attributes); + queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes); DurableConfigurationStoreHelper.updateQueue(_configStore, queue); @@ -325,7 +324,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -334,12 +333,11 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testRemoveQueue() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // remove queue DurableConfigurationStoreHelper.removeQueue(_configStore,queue); @@ -349,12 +347,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest anyMap()); } - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException + private AMQQueue createTestQueue(String queueName, + String queueOwner, + boolean exclusive, + final Map<String, Object> arguments) throws AMQStoreException { - return createTestQueue(queueName, queueOwner, exclusive, null); + return createTestQueue(queueName, queueOwner, exclusive, null, arguments); } - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException + private AMQQueue createTestQueue(String queueName, + String queueOwner, + boolean exclusive, + Exchange alternateExchange, + final Map<String, Object> arguments) throws AMQStoreException { AMQQueue queue = mock(AMQQueue.class); when(queue.getName()).thenReturn(queueName); @@ -363,6 +368,23 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(queue.isExclusive()).thenReturn(exclusive); when(queue.getId()).thenReturn(_queueId); when(queue.getAlternateExchange()).thenReturn(alternateExchange); + if(arguments != null && !arguments.isEmpty()) + { + when(queue.getAvailableAttributes()).thenReturn(arguments.keySet()); + final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class); + when(queue.getAttribute(requestedAttribute.capture())).then( + new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + String attrName = requestedAttribute.getValue(); + return arguments.get(attrName); + } + }); + } + return queue; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index caf6acb4bb..cb1fc2737d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -181,9 +181,8 @@ public class BrokerTestHelper public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException { - SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null, - false, false, virtualHost, Collections.<String, Object>emptyMap()); - virtualHost.getQueueRegistry().registerQueue(queue); + SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null, + false, false, false, Collections.<String, Object>emptyMap()); return queue; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 2d3483f078..66a71c562f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -45,6 +45,8 @@ import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.*; import org.apache.qpid.server.security.SecurityManager; @@ -82,7 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private DurableConfigurationStore _store; private ExchangeFactory _exchangeFactory; private ExchangeRegistry _exchangeRegistry; - private QueueRegistry _queueRegistry; + private QueueFactory _queueFactory; @Override public void setUp() throws Exception @@ -105,6 +107,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); + when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue); final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class); doAnswer(new Answer() @@ -114,37 +117,68 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public Object answer(final InvocationOnMock invocation) throws Throwable { Exchange exchange = registeredExchange.getValue(); - when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange); - when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange); + when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange); + when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange); return null; } }).when(_exchangeRegistry).registerExchange(registeredExchange.capture()); - _queueRegistry = mock(QueueRegistry.class); - when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry); + final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class); - when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + _queueFactory = mock(QueueFactory.class); - final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class); - doAnswer(new Answer() - { + when(_queueFactory.createAMQQueueImpl(idArg.capture(), queueArg.capture(), + anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then( + new Answer() + { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - AMQQueue queue = registeredQueue.getValue(); - when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue); - when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue); - return null; - } - }).when(_queueRegistry).registerQueue(registeredQueue.capture()); + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + final AMQQueue queue = mock(AMQQueue.class); + + final String queueName = queueArg.getValue(); + final UUID queueId = idArg.getValue(); + + when(queue.getName()).thenReturn(queueName); + when(queue.getId()).thenReturn(queueId); + when(_vhost.getQueue(eq(queueName))).thenReturn(queue); + when(_vhost.getQueue(eq(queueId))).thenReturn(queue); + + final ArgumentCaptor<Exchange> altExchangeArg = ArgumentCaptor.forClass(Exchange.class); + doAnswer( + new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + final Exchange value = altExchangeArg.getValue(); + when(queue.getAlternateExchange()).thenReturn(value); + return null; + } + } + ).when(queue).setAlternateExchange(altExchangeArg.capture()); + + Map args = argsArg.getValue(); + if(args.containsKey(Queue.ALTERNATE_EXCHANGE)) + { + final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); + final Exchange exchange = _exchangeRegistry.getExchange(exchangeId); + queue.setAlternateExchange(exchange); + } + return queue; + } + }); _exchangeFactory = mock(ExchangeFactory.class); + DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(_vhost, _exchangeRegistry), + new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory), new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory), new BindingRecoverer(_vhost, _exchangeRegistry) }; @@ -356,24 +390,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final UUID queueId = new UUID(1, 0); final UUID exchangeId = new UUID(2, 0); - /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than - queue creation being on vhost - yuck! */ - SecurityManager securityManager = mock(SecurityManager.class); - when(_vhost.getSecurityManager()).thenReturn(securityManager); - when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(), - any(AMQShortString.class),anyString())).thenReturn(true); - VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class); - when(_vhost.getConfiguration()).thenReturn(configuration); - QueueConfiguration queueConfiguration = mock(QueueConfiguration.class); - when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration); - LogActor logActor = mock(LogActor.class); - CurrentActor.set(logActor); - RootMessageLogger rootLogger = mock(RootMessageLogger.class); - when(logActor.getRootMessageLogger()).thenReturn(rootLogger); - /* end of queue creation mock hackery */ - final Exchange customExchange = mock(Exchange.class); + when(customExchange.getId()).thenReturn(exchangeId); + when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME); + when(_exchangeFactory.createExchange(eq(exchangeId), eq(CUSTOM_EXCHANGE_NAME), eq(HeadersExchange.TYPE.getType()), @@ -390,7 +411,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.completeConfigurationRecovery(); - assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange); + assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange()); } private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 6769c1c2fc..1ca7ff1b65 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ScheduledFuture; import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -28,6 +29,7 @@ import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; @@ -119,6 +121,43 @@ public class MockVirtualHost implements VirtualHost } @Override + public AMQQueue getQueue(String name) + { + return null; + } + + @Override + public AMQQueue getQueue(UUID id) + { + return null; + } + + @Override + public Collection<AMQQueue> getQueues() + { + return null; + } + + @Override + public int removeQueue(AMQQueue queue) throws AMQException + { + return 0; + } + + @Override + public AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQException + { + return null; + } + + @Override public Exchange createExchange(UUID id, String exchange, String type, @@ -141,6 +180,12 @@ public class MockVirtualHost implements VirtualHost } @Override + public Exchange getExchange(UUID id) + { + return null; + } + + @Override public Exchange getDefaultExchange() { return null; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index e72196c383..03cb483e40 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -244,7 +244,7 @@ public class StandardVirtualHostTest extends QpidTestCase VirtualHost vhost = createVirtualHost(vhostName, config); assertNotNull("virtualhost should exist", vhost); - AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); + AMQQueue queue = vhost.getQueue(queueName); assertNotNull("queue should exist", queue); Exchange defaultExch = vhost.getDefaultExchange(); |
