diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-15 16:42:39 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-15 16:42:39 +0000 |
| commit | 7057688d9214cffd217781db3c51abef5e227c93 (patch) | |
| tree | af52519ecd8844b7061ae442c84dec1f83bd45ae /qpid/java/broker/src/test | |
| parent | f203ee690d73b8f6ff19ba8b4f3f39808a1eddde (diff) | |
| download | qpid-python-7057688d9214cffd217781db3c51abef5e227c93.tar.gz | |
QPID-5073 : [Java Broker] Refactor DurableConfigurationStore recovery to allow for additional configured object children other than just Exchange/Binding/Queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1514360 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
4 files changed, 391 insertions, 275 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java index 65fd249d03..3d43ef0f44 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java @@ -37,7 +37,8 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); - _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName()); + _subject = new MessageStoreLogSubject(_testVhost.getName(), + _testVhost.getMessageStore().getClass().getSimpleName()); } @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index ab1a0f7d0c..67cf0780da 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -56,6 +56,11 @@ import org.apache.qpid.util.FileUtils; public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase { private static final String EXCHANGE_NAME = "exchangeName"; + + private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName(); + private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName(); + private static final String QUEUE = Queue.class.getSimpleName(); + private String _storePath; private String _storeName; private MessageStore _messageStore; @@ -134,7 +139,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()), + verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE), eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); @@ -186,7 +191,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs)); - verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()), + verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING), eq(map)); } @@ -201,7 +206,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest reopenStore(); verify(_recoveryHandler, never()).configuredObject(any(UUID.class), - eq(org.apache.qpid.server.model.Binding.class.getName()), + eq(BINDING), anyMap()); } @@ -215,7 +220,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -238,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); queueAttributes.put(Queue.ARGUMENTS, attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -256,7 +261,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } private Exchange createTestAlternateExchange() @@ -292,7 +297,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); queueAttributes.put(Queue.ARGUMENTS, attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -323,7 +328,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.ARGUMENTS, attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testRemoveQueue() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java new file mode 100644 index 0000000000..ccc7f6a697 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -0,0 +1,376 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationRecoverer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; +import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; + +public class DurableConfigurationRecovererTest extends QpidTestCase +{ + private static final UUID QUEUE_ID = new UUID(0,0); + private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); + private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); + private static final String CUSTOM_EXCHANGE_NAME = "customExchange"; + + private DurableConfigurationRecoverer _durableConfigurationRecoverer; + private Exchange _directExchange; + private Exchange _topicExchange; + private VirtualHost _vhost; + private DurableConfigurationStore _store; + private ExchangeFactory _exchangeFactory; + private ExchangeRegistry _exchangeRegistry; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + + _directExchange = mock(Exchange.class); + when(_directExchange.getType()).thenReturn(DirectExchange.TYPE); + + + _topicExchange = mock(Exchange.class); + when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE); + + AMQQueue queue = mock(AMQQueue.class); + + _vhost = mock(VirtualHost.class); + + _exchangeRegistry = mock(ExchangeRegistry.class); + when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); + when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); + + QueueRegistry queueRegistry = mock(QueueRegistry.class); + when(_vhost.getQueueRegistry()).thenReturn(queueRegistry); + + when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + + _exchangeFactory = mock(ExchangeFactory.class); + + DurableConfiguredObjectRecoverer[] recoverers = { + new QueueRecoverer(_vhost, _exchangeRegistry), + new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory), + new BindingRecoverer(_vhost, _exchangeRegistry) + }; + + final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>(); + for(DurableConfiguredObjectRecoverer recoverer : recoverers) + { + recovererMap.put(recoverer.getType(), recoverer); + } + _durableConfigurationRecoverer = + new DurableConfigurationRecoverer(_vhost.getName(), recovererMap, + new DefaultUpgraderProvider(_vhost, _exchangeRegistry)); + + _store = mock(DurableConfigurationStore.class); + + CurrentActor.set(mock(LogActor.class)); + } + + public void testUpgradeEmptyStore() throws Exception + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + assertEquals("Did not upgrade to the expected version", + CURRENT_CONFIG_VERSION, + _durableConfigurationRecoverer.completeConfigurationRecovery()); + } + + public void testUpgradeNewerStoreFails() throws Exception + { + try + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION + 1); + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Should not be able to start when config model is newer than current"); + } + catch (IllegalStateException e) + { + // pass + } + } + + public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID)) + }; + + verifyCorrectUpdates(expected); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + + + public void testUpgradeOnlyRemovesSelectorBindings() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble", + "not-a-selector", + "moo")); + + + final UUID customExchangeId = new UUID(3,0); + + _durableConfigurationRecoverer.configuredObject(new UUID(2, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + customExchangeId, + QUEUE_ID, + "x-filter-jms-selector", + "wibble", + "not-a-selector", + "moo")); + + _durableConfigurationRecoverer.configuredObject(customExchangeId, + "org.apache.qpid.server.model.Exchange", + createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); + + final Exchange customExchange = mock(Exchange.class); + + when(_exchangeFactory.createExchange(eq(customExchangeId), + eq(CUSTOM_EXCHANGE_NAME), + eq(HeadersExchange.TYPE.getType()), + anyBoolean(), + anyBoolean())).thenReturn(customExchange); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange); + return null; + } + }).when(_exchangeRegistry).registerExchange(customExchange); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")), + new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo")) + }; + + verifyCorrectUpdates(expected); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + + public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + TOPIC_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "Binding", + createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")) + }; + + verifyCorrectUpdates(expected); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + public void testUpgradeDoesNotRecur() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class)); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + public void testFailsWithUnresolvedObjects() + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Binding", + createBinding("key", + new UUID(3,0), + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + try + { + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Expected resolution to fail due to unknown object"); + } + catch(IllegalConfigurationException e) + { + assertEquals("Durable configuration has unresolved dependencies", e.getMessage()); + } + + } + + public void testFailsWithUnknownObjectType() + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + + try + { + final Map<String, Object> emptyArguments = Collections.emptyMap(); + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Wibble", emptyArguments); + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Expected resolution to fail due to unknown object type"); + } + catch(IllegalConfigurationException e) + { + assertEquals("Unkown type for configured object: Wibble", e.getMessage()); + } + + + } + + private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException + { + doAnswer(new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + Object[] args = invocation.getArguments(); + assertEquals("Updated records are not as expected", new HashSet(Arrays.asList( + expected)), new HashSet(Arrays.asList(args))); + + return null; + } + }).when(_store).update(any(ConfiguredObjectRecord[].class)); + } + + private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args) + { + Map<String, Object> binding = new LinkedHashMap<String, Object>(); + + binding.put("name", bindingKey); + binding.put(Binding.EXCHANGE, exchangeId.toString()); + binding.put(Binding.QUEUE, queueId.toString()); + Map<String,String> argumentMap = new LinkedHashMap<String, String>(); + if(args != null && args.length != 0) + { + String key = null; + for(String arg : args) + { + if(key == null) + { + key = arg; + } + else + { + argumentMap.put(key, arg); + key = null; + } + } + } + binding.put(Binding.ARGUMENTS, argumentMap); + return binding; + } + + + private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type) + { + Map<String, Object> exchange = new LinkedHashMap<String, Object>(); + + exchange.put(org.apache.qpid.server.model.Exchange.NAME, name); + exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); + + return exchange; + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java deleted file mode 100644 index ac81f5d625..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.HeadersExchange; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.test.utils.QpidTestCase; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; - -public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase -{ - private Exchange _directExchange; - private Exchange _topicExchange; - private VirtualHost _vhost; - private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler; - private DurableConfigurationStore _store; - - private static final UUID QUEUE_ID = new UUID(0,0); - private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); - private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); - - @Override - public void setUp() throws Exception - { - super.setUp(); - - - _directExchange = mock(Exchange.class); - when(_directExchange.getType()).thenReturn(DirectExchange.TYPE); - - - _topicExchange = mock(Exchange.class); - when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE); - - AMQQueue queue = mock(AMQQueue.class); - - _vhost = mock(VirtualHost.class); - - ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class); - when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); - when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); - - QueueRegistry queueRegistry = mock(QueueRegistry.class); - when(_vhost.getQueueRegistry()).thenReturn(queueRegistry); - - when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); - - ExchangeFactory exchangeFactory = mock(ExchangeFactory.class); - _virtualHostConfigRecoveryHandler = new VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory); - - _store = mock(DurableConfigurationStore.class); - - CurrentActor.set(mock(LogActor.class)); - } - - public void testUpgradeEmptyStore() throws Exception - { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery()); - } - - public void testUpgradeNewerStoreFails() throws Exception - { - try - { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1); - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - fail("Should not be able to start when config model is newer than current"); - } - catch (IllegalStateException e) - { - // pass - } - } - - public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); - - final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID)) - }; - - verifyCorrectUpdates(expected); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - - - public void testUpgradeOnlyRemovesSelectorBindings() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); - - - UUID customExchangeId = new UUID(3,0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); - - _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId, - "org.apache.qpid.server.model.Exchange", - createExchange("customExchange", HeadersExchange.TYPE)); - - - - final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")), - new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo")) - }; - - verifyCorrectUpdates(expected); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - - public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); - - final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")) - }; - - verifyCorrectUpdates(expected); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - public void testUpgradeDoesNotRecur() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); - - doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class)); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException - { - doAnswer(new Answer() - { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable - { - Object[] args = invocation.getArguments(); - assertEquals("Updated records are not as expected", new HashSet(Arrays.asList( - expected)), new HashSet(Arrays.asList(args))); - - return null; - } - }).when(_store).update(any(ConfiguredObjectRecord[].class)); - } - - private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args) - { - Map<String, Object> binding = new LinkedHashMap<String, Object>(); - - binding.put("name", bindingKey); - binding.put(Binding.EXCHANGE, exchangeId.toString()); - binding.put(Binding.QUEUE, queueId.toString()); - Map<String,String> argumentMap = new LinkedHashMap<String, String>(); - if(args != null && args.length != 0) - { - String key = null; - for(String arg : args) - { - if(key == null) - { - key = arg; - } - else - { - argumentMap.put(key, arg); - key = null; - } - } - } - binding.put(Binding.ARGUMENTS, argumentMap); - return binding; - } - - - private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type) - { - Map<String, Object> exchange = new LinkedHashMap<String, Object>(); - - exchange.put(org.apache.qpid.server.model.Exchange.NAME, name); - exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); - - return exchange; - - } -} |
