summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-10-28 19:37:46 +0000
committerAlex Rudyy <orudyy@apache.org>2014-10-28 19:37:46 +0000
commit3e89910f94daefaa565f7b21e4989e44e735b25b (patch)
tree48973d444377305331aa9f05d9d8e92e5adf432d
parentc1edd56bd54dbcb6cb9a1fb1cafe04549682faf0 (diff)
downloadqpid-python-3e89910f94daefaa565f7b21e4989e44e735b25b.tar.gz
QPID-5650: Set alternate exchange only when queue creation argument 'x-qpid-dlq-enabled' is set
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634957 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java81
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java48
2 files changed, 61 insertions, 68 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 458216a020..313e8d1d0c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -39,6 +38,7 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class VirtualHostStoreUpgraderAndRecoverer
@@ -350,14 +350,14 @@ public class VirtualHostStoreUpgraderAndRecoverer
private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
{
- private static final String ARGUMENTS = "arguments";
- private static final String DLQ_ENABLED_ARGUMENT = "x-qpid-dlq-enabled";
private static final String ALTERNATE_EXCHANGE = "alternateExchange";
- private static final String VIRTUAL_HOST_DLQ_ENABLED = "queue.deadLetterQueueEnabled";
private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
private ConfiguredObjectRecord _virtualHostRecord;
+ private Map<UUID, String> _queuesMissingAlternateExchange = new HashMap<>();
+ private Map<String, ConfiguredObjectRecord> _exchanges = new HashMap<>();
+
public Upgrader_0_4_to_2_0()
{
super("modelVersion", "0.4", "2.0");
@@ -380,23 +380,30 @@ public class VirtualHostStoreUpgraderAndRecoverer
Map<String, Object> attributes = record.getAttributes();
String name = (String)attributes.get("name");
_missingAmqpExchanges.remove(name);
+ _exchanges.put(name, record);
}
- getUpdateMap().put(record.getId(), record);
+ else if("Queue".equals(record.getType()))
+ {
+ record = updateQueueRecordIfNecessary(record);
+ }
+ getNextUpgrader().configuredObject(record);
}
@Override
public void complete()
{
- boolean virtualHostDLQEnabled = Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED)));
- for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+ for (UUID queueId : _queuesMissingAlternateExchange.keySet())
{
- Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
- ConfiguredObjectRecord record = entry.getValue();
- if ("Queue".equals(record.getType()))
+ ConfiguredObjectRecord record = getUpdateMap().get(queueId);
+ if (record != null)
{
- record = upgradeQueueRecordIfNecessary(record, virtualHostDLQEnabled);
+ String dleExchangeName = _queuesMissingAlternateExchange.get(queueId);
+ ConfiguredObjectRecord alternateExchange = _exchanges.get(dleExchangeName);
+ if (alternateExchange != null)
+ {
+ setAlternateExchangeAttribute(record, alternateExchange);
+ }
}
- getNextUpgrader().configuredObject(record);
}
for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
@@ -420,45 +427,51 @@ public class VirtualHostStoreUpgraderAndRecoverer
getNextUpgrader().complete();
}
- private ConfiguredObjectRecord upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean _virtualHostDLQEnabled)
+ private ConfiguredObjectRecord updateQueueRecordIfNecessary(ConfiguredObjectRecord record)
{
- Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes());
- boolean queueArgumentDQLEnabledSet = false;
- boolean queueDLQEnabled = false;
-
- if (attributes.get(ARGUMENTS) instanceof Map)
- {
- Map<String,Object> arguments = (Map<String,Object>)attributes.get(ARGUMENTS);
- queueArgumentDQLEnabledSet = arguments.containsKey(DLQ_ENABLED_ARGUMENT);
- queueDLQEnabled = queueArgumentDQLEnabledSet ? Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) : false;
- }
-
- if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) || (!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) && attributes.get("alternateExchange") == null)
+ Map<String, Object> attributes = record.getAttributes();
+ boolean queueDLQEnabled = Boolean.parseBoolean(String.valueOf(attributes.get(AbstractVirtualHost.CREATE_DLQ_ON_CREATION)));
+ if(queueDLQEnabled && attributes.get(ALTERNATE_EXCHANGE) == null)
{
Object queueName = attributes.get("name");
-
if (queueName == null || "".equals(queueName))
{
throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes);
}
String dleSuffix = System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
- ConfiguredObjectRecord alternateExchange = findConfiguredObjectRecord("Exchange", queueName + dleSuffix);
+ String dleExchangeName = queueName + dleSuffix;
- if (alternateExchange != null)
+ ConfiguredObjectRecord exchangeRecord = findConfiguredObjectRecordInUpdateMap("Exchange", dleExchangeName);
+ if (exchangeRecord == null)
{
- attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId());
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
+ // add record to update Map if it is not there
+ if (!getUpdateMap().containsKey(record.getId()))
+ {
+ getUpdateMap().put(record.getId(), record);
+ }
+ _queuesMissingAlternateExchange.put(record.getId(), dleExchangeName);
+ }
+ else
+ {
+ record = setAlternateExchangeAttribute(record, exchangeRecord);
}
}
return record;
}
- private ConfiguredObjectRecord findConfiguredObjectRecord(String type, String name)
+ private ConfiguredObjectRecord setAlternateExchangeAttribute(ConfiguredObjectRecord record, ConfiguredObjectRecord alternateExchange)
+ {
+ Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes());
+ attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId());
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ return record;
+ }
+
+ private ConfiguredObjectRecord findConfiguredObjectRecordInUpdateMap(String type, String name)
{
- Collection<ConfiguredObjectRecord> records = getUpdatedRecords().values();
- for(ConfiguredObjectRecord record: records)
+ for(ConfiguredObjectRecord record: getUpdateMap().values())
{
if (type.equals(record.getType()) && name.equals(record.getAttributes().get("name")))
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
index d5123f455b..1b13bcbdc6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -49,6 +50,8 @@ import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import javax.security.auth.Subject;
+
public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
{
private ConfiguredObjectRecord _hostRecord;
@@ -114,40 +117,17 @@ public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
upgraderAndRecoverer.perform(_durableConfigurationStore);
- VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
- host.open();
-
- assertNotNull("Virtual host is not recovered", host);
- Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test");
- assertNotNull("Queue is not recovered", recoveredQueue);
-
- Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ");
- assertNotNull("DLQ queue is not recovered", recoveredDLQ);
-
- Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE");
- assertNotNull("DLE exchange is not recovered", recoveredDLE);
-
- assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange());
- }
-
- public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception
- {
- _hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, "true");
-
- ConfiguredObjectRecord queue = mockQueue("test", null);
- ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
- ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
- ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
- ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class);
- when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test"));
- ConfiguredObjectRecord queueBinding = mockBinding("test", queue, directExchange);
- setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
-
- VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
- upgraderAndRecoverer.perform(_durableConfigurationStore);
-
- VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
- host.open();
+ final VirtualHost<?,?,?> host = _virtualHostNode.getVirtualHost();
+ Subject.doAs(org.apache.qpid.server.security.SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ host.open();
+ return null;
+ }
+ }
+ );
assertNotNull("Virtual host is not recovered", host);
Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test");