summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-10-27 22:32:29 +0000
committerAlex Rudyy <orudyy@apache.org>2014-10-27 22:32:29 +0000
commit3dba41c0af250e905e1b37d53d9447aa77a2a0fd (patch)
tree3fada5a4dedd094b98a761e52118dcf1d3d55a3a /qpid/java/broker-core/src
parent4d7e5969c61eb33aed35e5beeea314efa2e954dd (diff)
downloadqpid-python-3dba41c0af250e905e1b37d53d9447aa77a2a0fd.tar.gz
QPID-5650: Preserve alternate exchange on upgrade of queue with 'dead letter queue'
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634713 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java71
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java239
2 files changed, 309 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index f1372882a8..458216a020 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -28,6 +29,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
@@ -36,6 +39,7 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class VirtualHostStoreUpgraderAndRecoverer
{
@@ -346,6 +350,11 @@ public class VirtualHostStoreUpgraderAndRecoverer
private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
{
+ private static final String ARGUMENTS = "arguments";
+ private static final String DLQ_ENABLED_ARGUMENT = "x-qpid-dlq-enabled";
+ private static final String ALTERNATE_EXCHANGE = "alternateExchange";
+ private static final String VIRTUAL_HOST_DLQ_ENABLED = "queue.deadLetterQueueEnabled";
+
private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
private ConfiguredObjectRecord _virtualHostRecord;
@@ -372,12 +381,24 @@ public class VirtualHostStoreUpgraderAndRecoverer
String name = (String)attributes.get("name");
_missingAmqpExchanges.remove(name);
}
- getNextUpgrader().configuredObject(record);
+ getUpdateMap().put(record.getId(), record);
}
@Override
public void complete()
{
+ boolean virtualHostDLQEnabled = Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED)));
+ for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+ {
+ Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
+ ConfiguredObjectRecord record = entry.getValue();
+ if ("Queue".equals(record.getType()))
+ {
+ record = upgradeQueueRecordIfNecessary(record, virtualHostDLQEnabled);
+ }
+ getNextUpgrader().configuredObject(record);
+ }
+
for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
{
String name = entry.getKey();
@@ -399,6 +420,54 @@ public class VirtualHostStoreUpgraderAndRecoverer
getNextUpgrader().complete();
}
+ private ConfiguredObjectRecord upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean _virtualHostDLQEnabled)
+ {
+ Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes());
+ boolean queueArgumentDQLEnabledSet = false;
+ boolean queueDLQEnabled = false;
+
+ if (attributes.get(ARGUMENTS) instanceof Map)
+ {
+ Map<String,Object> arguments = (Map<String,Object>)attributes.get(ARGUMENTS);
+ queueArgumentDQLEnabledSet = arguments.containsKey(DLQ_ENABLED_ARGUMENT);
+ queueDLQEnabled = queueArgumentDQLEnabledSet ? Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) : false;
+ }
+
+ if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) || (!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) && attributes.get("alternateExchange") == null)
+ {
+ Object queueName = attributes.get("name");
+
+ if (queueName == null || "".equals(queueName))
+ {
+ throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes);
+ }
+
+ String dleSuffix = System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
+ ConfiguredObjectRecord alternateExchange = findConfiguredObjectRecord("Exchange", queueName + dleSuffix);
+
+ if (alternateExchange != null)
+ {
+ attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId());
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ }
+ }
+ return record;
+ }
+
+ private ConfiguredObjectRecord findConfiguredObjectRecord(String type, String name)
+ {
+ Collection<ConfiguredObjectRecord> records = getUpdatedRecords().values();
+ for(ConfiguredObjectRecord record: records)
+ {
+ if (type.equals(record.getType()) && name.equals(record.getAttributes().get("name")))
+ {
+ return record;
+ }
+ }
+ return null;
+ }
+
}
private class Upgrader_2_0_to_2_1 extends StoreUpgraderPhase
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
new file mode 100644
index 0000000000..d5123f455b
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.server.virtualhostnode.TestVirtualHostNode;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
+{
+ private ConfiguredObjectRecord _hostRecord;
+ private CurrentThreadTaskExecutor _taskExecutor;
+ private UUID _hostId;
+ private VirtualHostNode _virtualHostNode;
+ private DurableConfigurationStore _durableConfigurationStore;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ UUID hostParentId = UUID.randomUUID();
+ _hostId = UUID.randomUUID();
+ Map<String, Object> hostAttributes = new HashMap<>();
+ hostAttributes.put("modelVersion", "0.0");
+ hostAttributes.put("name", "test");
+ hostAttributes.put("type", TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+
+ _hostRecord = mock(ConfiguredObjectRecord.class);
+ when(_hostRecord.getId()).thenReturn(_hostId);
+ when(_hostRecord.getAttributes()).thenReturn(hostAttributes);
+ when(_hostRecord.getType()).thenReturn("VirtualHost");
+ when(_hostRecord.toString()).thenReturn("VirtualHost[name='test',id='" + _hostId + "']");
+
+ _taskExecutor = new CurrentThreadTaskExecutor();
+ _taskExecutor.start();
+
+ SystemConfig<?> systemConfig = mock(SystemConfig.class);
+ when(systemConfig.getEventLogger()).thenReturn(new EventLogger());
+
+ Broker<?> broker = mock(Broker.class);
+ when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig);
+ when(broker.getTaskExecutor()).thenReturn(_taskExecutor);
+ when(broker.getModel()).thenReturn(BrokerModel.getInstance());
+
+ _durableConfigurationStore = mock(DurableConfigurationStore.class);
+ Map<String,Object> attributes = new HashMap<>();
+ attributes.put(VirtualHostNode.ID, hostParentId);
+ attributes.put(VirtualHostNode.NAME, "test");
+ _virtualHostNode = new TestVirtualHostNode(broker, attributes, _durableConfigurationStore);
+ }
+
+ @Override
+ public void tearDown()throws Exception
+ {
+ super.tearDown();
+ _taskExecutor.stopImmediately();
+ }
+
+ public void testRecoverQueueWithDLQEnabled() throws Exception
+ {
+ ConfiguredObjectRecord queue = mockQueue("test", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "true"));
+ ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
+ ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
+ ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
+ ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class);
+ when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test"));
+ ConfiguredObjectRecord queueBinding = mockBinding("test", queue, directExchange);
+ setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
+
+ VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+ upgraderAndRecoverer.perform(_durableConfigurationStore);
+
+ VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
+ host.open();
+
+ assertNotNull("Virtual host is not recovered", host);
+ Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test");
+ assertNotNull("Queue is not recovered", recoveredQueue);
+
+ Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ");
+ assertNotNull("DLQ queue is not recovered", recoveredDLQ);
+
+ Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE");
+ assertNotNull("DLE exchange is not recovered", recoveredDLE);
+
+ assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange());
+ }
+
+ public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception
+ {
+ _hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, "true");
+
+ ConfiguredObjectRecord queue = mockQueue("test", null);
+ ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
+ ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
+ ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
+ ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class);
+ when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test"));
+ ConfiguredObjectRecord queueBinding = mockBinding("test", queue, directExchange);
+ setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
+
+ VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+ upgraderAndRecoverer.perform(_durableConfigurationStore);
+
+ VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
+ host.open();
+
+ assertNotNull("Virtual host is not recovered", host);
+ Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test");
+ assertNotNull("Queue is not recovered", recoveredQueue);
+
+ Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ");
+ assertNotNull("DLQ queue is not recovered", recoveredDLQ);
+
+ Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE");
+ assertNotNull("DLE exchange is not recovered", recoveredDLE);
+
+ assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange());
+ }
+
+ private ConfiguredObjectRecord mockBinding(String bindingName, ConfiguredObjectRecord queue, ConfiguredObjectRecord exchange)
+ {
+ ConfiguredObjectRecord binding = mock(ConfiguredObjectRecord.class);
+ when(binding.getId()).thenReturn(UUID.randomUUID());
+ when(binding.getType()).thenReturn("org.apache.qpid.server.model.Binding");
+ Map<String,UUID> parents = new HashMap<>();
+ parents.put("Queue", queue.getId());
+ parents.put("Exchange", exchange.getId());
+ when(binding.getParents()).thenReturn(parents);
+ when(binding.toString()).thenReturn("Binding[" + bindingName + "]");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("durable", true);
+ attributes.put("name", bindingName);
+ when(binding.getAttributes()).thenReturn(attributes);
+ return binding;
+ }
+
+ private ConfiguredObjectRecord mockExchange(String exchangeName, String exchangeType)
+ {
+ ConfiguredObjectRecord exchange = mock(ConfiguredObjectRecord.class);
+ when(exchange.getId()).thenReturn(UUID.randomUUID());
+ when(exchange.getType()).thenReturn("org.apache.qpid.server.model.Exchange");
+ when(exchange.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId));
+ when(exchange.toString()).thenReturn("Exchange[" + exchangeName + "]");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("type", exchangeType);
+ attributes.put("durable", true);
+ attributes.put("name", exchangeName);
+ when(exchange.getAttributes()).thenReturn(attributes);
+ return exchange;
+ }
+
+ private ConfiguredObjectRecord mockQueue(String queueName, Map<String, Object> arguments)
+ {
+ ConfiguredObjectRecord queue = mock(ConfiguredObjectRecord.class);
+ when(queue.getId()).thenReturn(UUID.randomUUID());
+ when(queue.getType()).thenReturn("org.apache.qpid.server.model.Queue");
+ when(queue.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId));
+ when(queue.toString()).thenReturn("Queue[" + queueName + "]");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("durable", true);
+ attributes.put("name", queueName);
+ if (arguments != null)
+ {
+ attributes.put("arguments", arguments);
+ }
+ when(queue.getAttributes()).thenReturn(attributes);
+ return queue;
+ }
+
+
+ private void setUpVisit(final ConfiguredObjectRecord... records)
+ {
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ Iterator<ConfiguredObjectRecord> iterator = asList(records).iterator();
+ ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
+ handler.begin();
+ boolean handlerContinue = true;
+ while(iterator.hasNext() && handlerContinue)
+ {
+ handlerContinue = handler.handle(iterator.next());
+ }
+ handler.end();
+ return null;
+ }
+ }).when(_durableConfigurationStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class));
+ }
+}