diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-10-28 19:37:46 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-10-28 19:37:46 +0000 |
| commit | 3e89910f94daefaa565f7b21e4989e44e735b25b (patch) | |
| tree | 48973d444377305331aa9f05d9d8e92e5adf432d | |
| parent | c1edd56bd54dbcb6cb9a1fb1cafe04549682faf0 (diff) | |
| download | qpid-python-3e89910f94daefaa565f7b21e4989e44e735b25b.tar.gz | |
QPID-5650: Set alternate exchange only when queue creation argument 'x-qpid-dlq-enabled' is set
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634957 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 61 insertions, 68 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index 458216a020..313e8d1d0c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.store; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -39,6 +38,7 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class VirtualHostStoreUpgraderAndRecoverer @@ -350,14 +350,14 @@ public class VirtualHostStoreUpgraderAndRecoverer private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase { - private static final String ARGUMENTS = "arguments"; - private static final String DLQ_ENABLED_ARGUMENT = "x-qpid-dlq-enabled"; private static final String ALTERNATE_EXCHANGE = "alternateExchange"; - private static final String VIRTUAL_HOST_DLQ_ENABLED = "queue.deadLetterQueueEnabled"; private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES); private ConfiguredObjectRecord _virtualHostRecord; + private Map<UUID, String> _queuesMissingAlternateExchange = new HashMap<>(); + private Map<String, ConfiguredObjectRecord> _exchanges = new HashMap<>(); + public Upgrader_0_4_to_2_0() { super("modelVersion", "0.4", "2.0"); @@ -380,23 +380,30 @@ public class VirtualHostStoreUpgraderAndRecoverer Map<String, Object> attributes = record.getAttributes(); String name = (String)attributes.get("name"); _missingAmqpExchanges.remove(name); + _exchanges.put(name, record); } - getUpdateMap().put(record.getId(), record); + else if("Queue".equals(record.getType())) + { + record = updateQueueRecordIfNecessary(record); + } + getNextUpgrader().configuredObject(record); } @Override public void complete() { - boolean virtualHostDLQEnabled = Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED))); - for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();) + for (UUID queueId : _queuesMissingAlternateExchange.keySet()) { - Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next(); - ConfiguredObjectRecord record = entry.getValue(); - if ("Queue".equals(record.getType())) + ConfiguredObjectRecord record = getUpdateMap().get(queueId); + if (record != null) { - record = upgradeQueueRecordIfNecessary(record, virtualHostDLQEnabled); + String dleExchangeName = _queuesMissingAlternateExchange.get(queueId); + ConfiguredObjectRecord alternateExchange = _exchanges.get(dleExchangeName); + if (alternateExchange != null) + { + setAlternateExchangeAttribute(record, alternateExchange); + } } - getNextUpgrader().configuredObject(record); } for (Entry<String, String> entry : _missingAmqpExchanges.entrySet()) @@ -420,45 +427,51 @@ public class VirtualHostStoreUpgraderAndRecoverer getNextUpgrader().complete(); } - private ConfiguredObjectRecord upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean _virtualHostDLQEnabled) + private ConfiguredObjectRecord updateQueueRecordIfNecessary(ConfiguredObjectRecord record) { - Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes()); - boolean queueArgumentDQLEnabledSet = false; - boolean queueDLQEnabled = false; - - if (attributes.get(ARGUMENTS) instanceof Map) - { - Map<String,Object> arguments = (Map<String,Object>)attributes.get(ARGUMENTS); - queueArgumentDQLEnabledSet = arguments.containsKey(DLQ_ENABLED_ARGUMENT); - queueDLQEnabled = queueArgumentDQLEnabledSet ? Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) : false; - } - - if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) || (!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) && attributes.get("alternateExchange") == null) + Map<String, Object> attributes = record.getAttributes(); + boolean queueDLQEnabled = Boolean.parseBoolean(String.valueOf(attributes.get(AbstractVirtualHost.CREATE_DLQ_ON_CREATION))); + if(queueDLQEnabled && attributes.get(ALTERNATE_EXCHANGE) == null) { Object queueName = attributes.get("name"); - if (queueName == null || "".equals(queueName)) { throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes); } String dleSuffix = System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); - ConfiguredObjectRecord alternateExchange = findConfiguredObjectRecord("Exchange", queueName + dleSuffix); + String dleExchangeName = queueName + dleSuffix; - if (alternateExchange != null) + ConfiguredObjectRecord exchangeRecord = findConfiguredObjectRecordInUpdateMap("Exchange", dleExchangeName); + if (exchangeRecord == null) { - attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId()); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents()); - getUpdateMap().put(record.getId(), record); + // add record to update Map if it is not there + if (!getUpdateMap().containsKey(record.getId())) + { + getUpdateMap().put(record.getId(), record); + } + _queuesMissingAlternateExchange.put(record.getId(), dleExchangeName); + } + else + { + record = setAlternateExchangeAttribute(record, exchangeRecord); } } return record; } - private ConfiguredObjectRecord findConfiguredObjectRecord(String type, String name) + private ConfiguredObjectRecord setAlternateExchangeAttribute(ConfiguredObjectRecord record, ConfiguredObjectRecord alternateExchange) + { + Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes()); + attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId()); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + return record; + } + + private ConfiguredObjectRecord findConfiguredObjectRecordInUpdateMap(String type, String name) { - Collection<ConfiguredObjectRecord> records = getUpdatedRecords().values(); - for(ConfiguredObjectRecord record: records) + for(ConfiguredObjectRecord record: getUpdateMap().values()) { if (type.equals(record.getType()) && name.equals(record.getAttributes().get("name"))) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java index d5123f455b..1b13bcbdc6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.security.PrivilegedAction; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -49,6 +50,8 @@ import org.apache.qpid.test.utils.QpidTestCase; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.security.auth.Subject; + public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase { private ConfiguredObjectRecord _hostRecord; @@ -114,40 +117,17 @@ public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode); upgraderAndRecoverer.perform(_durableConfigurationStore); - VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost(); - host.open(); - - assertNotNull("Virtual host is not recovered", host); - Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test"); - assertNotNull("Queue is not recovered", recoveredQueue); - - Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ"); - assertNotNull("DLQ queue is not recovered", recoveredDLQ); - - Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE"); - assertNotNull("DLE exchange is not recovered", recoveredDLE); - - assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange()); - } - - public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception - { - _hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, "true"); - - ConfiguredObjectRecord queue = mockQueue("test", null); - ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false")); - ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout"); - ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle); - ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class); - when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test")); - ConfiguredObjectRecord queueBinding = mockBinding("test", queue, directExchange); - setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding); - - VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode); - upgraderAndRecoverer.perform(_durableConfigurationStore); - - VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost(); - host.open(); + final VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost(); + Subject.doAs(org.apache.qpid.server.security.SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + host.open(); + return null; + } + } + ); assertNotNull("Virtual host is not recovered", host); Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test"); |
