diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-10-27 22:32:29 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-10-27 22:32:29 +0000 |
| commit | 3dba41c0af250e905e1b37d53d9447aa77a2a0fd (patch) | |
| tree | 3fada5a4dedd094b98a761e52118dcf1d3d55a3a /qpid/java/broker-core/src | |
| parent | 4d7e5969c61eb33aed35e5beeea314efa2e954dd (diff) | |
| download | qpid-python-3dba41c0af250e905e1b37d53d9447aa77a2a0fd.tar.gz | |
QPID-5650: Preserve alternate exchange on upgrade of queue with 'dead letter queue'
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634713 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
2 files changed, 309 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index f1372882a8..458216a020 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -28,6 +29,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.UUID; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; @@ -36,6 +39,7 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class VirtualHostStoreUpgraderAndRecoverer { @@ -346,6 +350,11 @@ public class VirtualHostStoreUpgraderAndRecoverer private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase { + private static final String ARGUMENTS = "arguments"; + private static final String DLQ_ENABLED_ARGUMENT = "x-qpid-dlq-enabled"; + private static final String ALTERNATE_EXCHANGE = "alternateExchange"; + private static final String VIRTUAL_HOST_DLQ_ENABLED = "queue.deadLetterQueueEnabled"; + private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES); private ConfiguredObjectRecord _virtualHostRecord; @@ -372,12 +381,24 @@ public class VirtualHostStoreUpgraderAndRecoverer String name = (String)attributes.get("name"); _missingAmqpExchanges.remove(name); } - getNextUpgrader().configuredObject(record); + getUpdateMap().put(record.getId(), record); } @Override public void complete() { + boolean virtualHostDLQEnabled = Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED))); + for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();) + { + Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next(); + ConfiguredObjectRecord record = entry.getValue(); + if ("Queue".equals(record.getType())) + { + record = upgradeQueueRecordIfNecessary(record, virtualHostDLQEnabled); + } + getNextUpgrader().configuredObject(record); + } + for (Entry<String, String> entry : _missingAmqpExchanges.entrySet()) { String name = entry.getKey(); @@ -399,6 +420,54 @@ public class VirtualHostStoreUpgraderAndRecoverer getNextUpgrader().complete(); } + private ConfiguredObjectRecord upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean _virtualHostDLQEnabled) + { + Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes()); + boolean queueArgumentDQLEnabledSet = false; + boolean queueDLQEnabled = false; + + if (attributes.get(ARGUMENTS) instanceof Map) + { + Map<String,Object> arguments = (Map<String,Object>)attributes.get(ARGUMENTS); + queueArgumentDQLEnabledSet = arguments.containsKey(DLQ_ENABLED_ARGUMENT); + queueDLQEnabled = queueArgumentDQLEnabledSet ? Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) : false; + } + + if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) || (!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) && attributes.get("alternateExchange") == null) + { + Object queueName = attributes.get("name"); + + if (queueName == null || "".equals(queueName)) + { + throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes); + } + + String dleSuffix = System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); + ConfiguredObjectRecord alternateExchange = findConfiguredObjectRecord("Exchange", queueName + dleSuffix); + + if (alternateExchange != null) + { + attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId()); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + } + } + return record; + } + + private ConfiguredObjectRecord findConfiguredObjectRecord(String type, String name) + { + Collection<ConfiguredObjectRecord> records = getUpdatedRecords().values(); + for(ConfiguredObjectRecord record: records) + { + if (type.equals(record.getType()) && name.equals(record.getAttributes().get("name"))) + { + return record; + } + } + return null; + } + } private class Upgrader_2_0_to_2_1 extends StoreUpgraderPhase diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java new file mode 100644 index 0000000000..d5123f455b --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java @@ -0,0 +1,239 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static java.util.Arrays.asList; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.SystemConfig; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; +import org.apache.qpid.server.virtualhostnode.TestVirtualHostNode; +import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase +{ + private ConfiguredObjectRecord _hostRecord; + private CurrentThreadTaskExecutor _taskExecutor; + private UUID _hostId; + private VirtualHostNode _virtualHostNode; + private DurableConfigurationStore _durableConfigurationStore; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + UUID hostParentId = UUID.randomUUID(); + _hostId = UUID.randomUUID(); + Map<String, Object> hostAttributes = new HashMap<>(); + hostAttributes.put("modelVersion", "0.0"); + hostAttributes.put("name", "test"); + hostAttributes.put("type", TestMemoryVirtualHost.VIRTUAL_HOST_TYPE); + + _hostRecord = mock(ConfiguredObjectRecord.class); + when(_hostRecord.getId()).thenReturn(_hostId); + when(_hostRecord.getAttributes()).thenReturn(hostAttributes); + when(_hostRecord.getType()).thenReturn("VirtualHost"); + when(_hostRecord.toString()).thenReturn("VirtualHost[name='test',id='" + _hostId + "']"); + + _taskExecutor = new CurrentThreadTaskExecutor(); + _taskExecutor.start(); + + SystemConfig<?> systemConfig = mock(SystemConfig.class); + when(systemConfig.getEventLogger()).thenReturn(new EventLogger()); + + Broker<?> broker = mock(Broker.class); + when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig); + when(broker.getTaskExecutor()).thenReturn(_taskExecutor); + when(broker.getModel()).thenReturn(BrokerModel.getInstance()); + + _durableConfigurationStore = mock(DurableConfigurationStore.class); + Map<String,Object> attributes = new HashMap<>(); + attributes.put(VirtualHostNode.ID, hostParentId); + attributes.put(VirtualHostNode.NAME, "test"); + _virtualHostNode = new TestVirtualHostNode(broker, attributes, _durableConfigurationStore); + } + + @Override + public void tearDown()throws Exception + { + super.tearDown(); + _taskExecutor.stopImmediately(); + } + + public void testRecoverQueueWithDLQEnabled() throws Exception + { + ConfiguredObjectRecord queue = mockQueue("test", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "true")); + ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false")); + ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout"); + ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle); + ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class); + when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test")); + ConfiguredObjectRecord queueBinding = mockBinding("test", queue, directExchange); + setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding); + + VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode); + upgraderAndRecoverer.perform(_durableConfigurationStore); + + VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost(); + host.open(); + + assertNotNull("Virtual host is not recovered", host); + Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test"); + assertNotNull("Queue is not recovered", recoveredQueue); + + Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ"); + assertNotNull("DLQ queue is not recovered", recoveredDLQ); + + Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE"); + assertNotNull("DLE exchange is not recovered", recoveredDLE); + + assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange()); + } + + public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception + { + _hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, "true"); + + ConfiguredObjectRecord queue = mockQueue("test", null); + ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false")); + ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout"); + ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle); + ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class); + when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test")); + ConfiguredObjectRecord queueBinding = mockBinding("test", queue, directExchange); + setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding); + + VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode); + upgraderAndRecoverer.perform(_durableConfigurationStore); + + VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost(); + host.open(); + + assertNotNull("Virtual host is not recovered", host); + Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test"); + assertNotNull("Queue is not recovered", recoveredQueue); + + Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ"); + assertNotNull("DLQ queue is not recovered", recoveredDLQ); + + Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE"); + assertNotNull("DLE exchange is not recovered", recoveredDLE); + + assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange()); + } + + private ConfiguredObjectRecord mockBinding(String bindingName, ConfiguredObjectRecord queue, ConfiguredObjectRecord exchange) + { + ConfiguredObjectRecord binding = mock(ConfiguredObjectRecord.class); + when(binding.getId()).thenReturn(UUID.randomUUID()); + when(binding.getType()).thenReturn("org.apache.qpid.server.model.Binding"); + Map<String,UUID> parents = new HashMap<>(); + parents.put("Queue", queue.getId()); + parents.put("Exchange", exchange.getId()); + when(binding.getParents()).thenReturn(parents); + when(binding.toString()).thenReturn("Binding[" + bindingName + "]"); + + Map<String, Object> attributes = new HashMap<>(); + attributes.put("durable", true); + attributes.put("name", bindingName); + when(binding.getAttributes()).thenReturn(attributes); + return binding; + } + + private ConfiguredObjectRecord mockExchange(String exchangeName, String exchangeType) + { + ConfiguredObjectRecord exchange = mock(ConfiguredObjectRecord.class); + when(exchange.getId()).thenReturn(UUID.randomUUID()); + when(exchange.getType()).thenReturn("org.apache.qpid.server.model.Exchange"); + when(exchange.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId)); + when(exchange.toString()).thenReturn("Exchange[" + exchangeName + "]"); + + Map<String, Object> attributes = new HashMap<>(); + attributes.put("type", exchangeType); + attributes.put("durable", true); + attributes.put("name", exchangeName); + when(exchange.getAttributes()).thenReturn(attributes); + return exchange; + } + + private ConfiguredObjectRecord mockQueue(String queueName, Map<String, Object> arguments) + { + ConfiguredObjectRecord queue = mock(ConfiguredObjectRecord.class); + when(queue.getId()).thenReturn(UUID.randomUUID()); + when(queue.getType()).thenReturn("org.apache.qpid.server.model.Queue"); + when(queue.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId)); + when(queue.toString()).thenReturn("Queue[" + queueName + "]"); + + Map<String, Object> attributes = new HashMap<>(); + attributes.put("durable", true); + attributes.put("name", queueName); + if (arguments != null) + { + attributes.put("arguments", arguments); + } + when(queue.getAttributes()).thenReturn(attributes); + return queue; + } + + + private void setUpVisit(final ConfiguredObjectRecord... records) + { + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + Iterator<ConfiguredObjectRecord> iterator = asList(records).iterator(); + ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0]; + handler.begin(); + boolean handlerContinue = true; + while(iterator.hasNext() && handlerContinue) + { + handlerContinue = handler.handle(iterator.next()); + } + handler.end(); + return null; + } + }).when(_durableConfigurationStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class)); + } +} |
