summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-15 16:42:39 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-15 16:42:39 +0000
commit7057688d9214cffd217781db3c51abef5e227c93 (patch)
treeaf52519ecd8844b7061ae442c84dec1f83bd45ae /qpid/java/broker/src/test
parentf203ee690d73b8f6ff19ba8b4f3f39808a1eddde (diff)
downloadqpid-python-7057688d9214cffd217781db3c51abef5e227c93.tar.gz
QPID-5073 : [Java Broker] Refactor DurableConfigurationStore recovery to allow for additional configured object children other than just Exchange/Binding/Queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1514360 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java376
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java266
4 files changed, 391 insertions, 275 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
index 65fd249d03..3d43ef0f44 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
@@ -37,7 +37,8 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName());
+ _subject = new MessageStoreLogSubject(_testVhost.getName(),
+ _testVhost.getMessageStore().getClass().getSimpleName());
}
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index ab1a0f7d0c..67cf0780da 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -56,6 +56,11 @@ import org.apache.qpid.util.FileUtils;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
private static final String EXCHANGE_NAME = "exchangeName";
+
+ private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName();
+ private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName();
+ private static final String QUEUE = Queue.class.getSimpleName();
+
private String _storePath;
private String _storeName;
private MessageStore _messageStore;
@@ -134,7 +139,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()),
+ verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE),
eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString())));
@@ -186,7 +191,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY);
map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs));
- verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()),
+ verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING),
eq(map));
}
@@ -201,7 +206,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
reopenStore();
verify(_recoveryHandler, never()).configuredObject(any(UUID.class),
- eq(org.apache.qpid.server.model.Binding.class.getName()),
+ eq(BINDING),
anyMap());
}
@@ -215,7 +220,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -238,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
queueAttributes.put(Queue.ARGUMENTS, attributes);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -256,7 +261,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
private Exchange createTestAlternateExchange()
@@ -292,7 +297,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
queueAttributes.put(Queue.ARGUMENTS, attributes);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
@@ -323,7 +328,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.ARGUMENTS, attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
public void testRemoveQueue() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
new file mode 100644
index 0000000000..ccc7f6a697
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -0,0 +1,376 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
+public class DurableConfigurationRecovererTest extends QpidTestCase
+{
+ private static final UUID QUEUE_ID = new UUID(0,0);
+ private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
+ private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
+ private static final String CUSTOM_EXCHANGE_NAME = "customExchange";
+
+ private DurableConfigurationRecoverer _durableConfigurationRecoverer;
+ private Exchange _directExchange;
+ private Exchange _topicExchange;
+ private VirtualHost _vhost;
+ private DurableConfigurationStore _store;
+ private ExchangeFactory _exchangeFactory;
+ private ExchangeRegistry _exchangeRegistry;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+
+ _directExchange = mock(Exchange.class);
+ when(_directExchange.getType()).thenReturn(DirectExchange.TYPE);
+
+
+ _topicExchange = mock(Exchange.class);
+ when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE);
+
+ AMQQueue queue = mock(AMQQueue.class);
+
+ _vhost = mock(VirtualHost.class);
+
+ _exchangeRegistry = mock(ExchangeRegistry.class);
+ when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
+ when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
+
+ QueueRegistry queueRegistry = mock(QueueRegistry.class);
+ when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
+
+ when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+
+ _exchangeFactory = mock(ExchangeFactory.class);
+
+ DurableConfiguredObjectRecoverer[] recoverers = {
+ new QueueRecoverer(_vhost, _exchangeRegistry),
+ new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory),
+ new BindingRecoverer(_vhost, _exchangeRegistry)
+ };
+
+ final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>();
+ for(DurableConfiguredObjectRecoverer recoverer : recoverers)
+ {
+ recovererMap.put(recoverer.getType(), recoverer);
+ }
+ _durableConfigurationRecoverer =
+ new DurableConfigurationRecoverer(_vhost.getName(), recovererMap,
+ new DefaultUpgraderProvider(_vhost, _exchangeRegistry));
+
+ _store = mock(DurableConfigurationStore.class);
+
+ CurrentActor.set(mock(LogActor.class));
+ }
+
+ public void testUpgradeEmptyStore() throws Exception
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+ assertEquals("Did not upgrade to the expected version",
+ CURRENT_CONFIG_VERSION,
+ _durableConfigurationRecoverer.completeConfigurationRecovery());
+ }
+
+ public void testUpgradeNewerStoreFails() throws Exception
+ {
+ try
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION + 1);
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ fail("Should not be able to start when config model is newer than current");
+ }
+ catch (IllegalStateException e)
+ {
+ // pass
+ }
+ }
+
+ public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ DIRECT_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+
+
+ public void testUpgradeOnlyRemovesSelectorBindings() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ DIRECT_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble",
+ "not-a-selector",
+ "moo"));
+
+
+ final UUID customExchangeId = new UUID(3,0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(2, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ customExchangeId,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble",
+ "not-a-selector",
+ "moo"));
+
+ _durableConfigurationRecoverer.configuredObject(customExchangeId,
+ "org.apache.qpid.server.model.Exchange",
+ createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
+
+ final Exchange customExchange = mock(Exchange.class);
+
+ when(_exchangeFactory.createExchange(eq(customExchangeId),
+ eq(CUSTOM_EXCHANGE_NAME),
+ eq(HeadersExchange.TYPE.getType()),
+ anyBoolean(),
+ anyBoolean())).thenReturn(customExchange);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange);
+ return null;
+ }
+ }).when(_exchangeRegistry).registerExchange(customExchange);
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")),
+ new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo"))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+
+ public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ TOPIC_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
+ createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+ public void testUpgradeDoesNotRecur() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "Binding",
+ createBinding("key",
+ DIRECT_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class));
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+ public void testFailsWithUnresolvedObjects()
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "Binding",
+ createBinding("key",
+ new UUID(3,0),
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ try
+ {
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ fail("Expected resolution to fail due to unknown object");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ assertEquals("Durable configuration has unresolved dependencies", e.getMessage());
+ }
+
+ }
+
+ public void testFailsWithUnknownObjectType()
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+
+ try
+ {
+ final Map<String, Object> emptyArguments = Collections.emptyMap();
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "Wibble", emptyArguments);
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ fail("Expected resolution to fail due to unknown object type");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ assertEquals("Unkown type for configured object: Wibble", e.getMessage());
+ }
+
+
+ }
+
+ private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
+ {
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ Object[] args = invocation.getArguments();
+ assertEquals("Updated records are not as expected", new HashSet(Arrays.asList(
+ expected)), new HashSet(Arrays.asList(args)));
+
+ return null;
+ }
+ }).when(_store).update(any(ConfiguredObjectRecord[].class));
+ }
+
+ private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args)
+ {
+ Map<String, Object> binding = new LinkedHashMap<String, Object>();
+
+ binding.put("name", bindingKey);
+ binding.put(Binding.EXCHANGE, exchangeId.toString());
+ binding.put(Binding.QUEUE, queueId.toString());
+ Map<String,String> argumentMap = new LinkedHashMap<String, String>();
+ if(args != null && args.length != 0)
+ {
+ String key = null;
+ for(String arg : args)
+ {
+ if(key == null)
+ {
+ key = arg;
+ }
+ else
+ {
+ argumentMap.put(key, arg);
+ key = null;
+ }
+ }
+ }
+ binding.put(Binding.ARGUMENTS, argumentMap);
+ return binding;
+ }
+
+
+ private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type)
+ {
+ Map<String, Object> exchange = new LinkedHashMap<String, Object>();
+
+ exchange.put(org.apache.qpid.server.model.Exchange.NAME, name);
+ exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
+
+ return exchange;
+
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
deleted file mode 100644
index ac81f5d625..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.HeadersExchange;
-import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
-
-public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase
-{
- private Exchange _directExchange;
- private Exchange _topicExchange;
- private VirtualHost _vhost;
- private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler;
- private DurableConfigurationStore _store;
-
- private static final UUID QUEUE_ID = new UUID(0,0);
- private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
- private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
-
- _directExchange = mock(Exchange.class);
- when(_directExchange.getType()).thenReturn(DirectExchange.TYPE);
-
-
- _topicExchange = mock(Exchange.class);
- when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE);
-
- AMQQueue queue = mock(AMQQueue.class);
-
- _vhost = mock(VirtualHost.class);
-
- ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class);
- when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
- when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
-
- QueueRegistry queueRegistry = mock(QueueRegistry.class);
- when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
-
- when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
-
- ExchangeFactory exchangeFactory = mock(ExchangeFactory.class);
- _virtualHostConfigRecoveryHandler = new VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory);
-
- _store = mock(DurableConfigurationStore.class);
-
- CurrentActor.set(mock(LogActor.class));
- }
-
- public void testUpgradeEmptyStore() throws Exception
- {
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
- assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery());
- }
-
- public void testUpgradeNewerStoreFails() throws Exception
- {
- try
- {
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1);
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- fail("Should not be able to start when config model is newer than current");
- }
- catch (IllegalStateException e)
- {
- // pass
- }
- }
-
- public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
-
- final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID))
- };
-
- verifyCorrectUpdates(expected);
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
-
-
- public void testUpgradeOnlyRemovesSelectorBindings() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
-
-
- UUID customExchangeId = new UUID(3,0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
-
- _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId,
- "org.apache.qpid.server.model.Exchange",
- createExchange("customExchange", HeadersExchange.TYPE));
-
-
-
- final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")),
- new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo"))
- };
-
- verifyCorrectUpdates(expected);
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
-
- public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
-
- final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"))
- };
-
- verifyCorrectUpdates(expected);
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
- public void testUpgradeDoesNotRecur() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
-
- doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class));
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
- private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
- {
- doAnswer(new Answer()
- {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable
- {
- Object[] args = invocation.getArguments();
- assertEquals("Updated records are not as expected", new HashSet(Arrays.asList(
- expected)), new HashSet(Arrays.asList(args)));
-
- return null;
- }
- }).when(_store).update(any(ConfiguredObjectRecord[].class));
- }
-
- private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args)
- {
- Map<String, Object> binding = new LinkedHashMap<String, Object>();
-
- binding.put("name", bindingKey);
- binding.put(Binding.EXCHANGE, exchangeId.toString());
- binding.put(Binding.QUEUE, queueId.toString());
- Map<String,String> argumentMap = new LinkedHashMap<String, String>();
- if(args != null && args.length != 0)
- {
- String key = null;
- for(String arg : args)
- {
- if(key == null)
- {
- key = arg;
- }
- else
- {
- argumentMap.put(key, arg);
- key = null;
- }
- }
- }
- binding.put(Binding.ARGUMENTS, argumentMap);
- return binding;
- }
-
-
- private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type)
- {
- Map<String, Object> exchange = new LinkedHashMap<String, Object>();
-
- exchange.put(org.apache.qpid.server.model.Exchange.NAME, name);
- exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
-
- return exchange;
-
- }
-}