From ecc7f3fc1c56cde10e9960afd1e4fbd4bcb07abf Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 22 Apr 2014 12:32:23 +0000 Subject: QPID-5715: [Java Broker] Refactor VirtualHostStoreUpgraderAndRecoverer/BrokerStoreUpgrader to avoid duplicated code git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1589112 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/server/Broker.java | 4 +- .../configuration/startup/BrokerStoreUpgrader.java | 722 -------------------- .../configuration/startup/StoreUpgraderPhase.java | 49 -- .../startup/UpgraderPhaseFactory.java | 45 -- .../apache/qpid/server/model/SystemContext.java | 3 - .../qpid/server/model/SystemContextImpl.java | 135 ---- .../qpid/server/registry/ApplicationRegistry.java | 7 +- .../store/BrokerStoreUpgraderAndRecoverer.java | 571 ++++++++++++++++ .../apache/qpid/server/store/GenericRecoverer.java | 218 ++++++ .../qpid/server/store/GenericStoreUpgrader.java | 170 +++++ .../qpid/server/store/StoreUpgraderPhase.java | 58 ++ .../VirtualHostStoreUpgraderAndRecoverer.java | 741 ++++++--------------- .../configuration/startup/BrokerRecovererTest.java | 355 ---------- .../qpid/server/store/BrokerRecovererTest.java | 338 ++++++++++ 14 files changed, 1583 insertions(+), 1833 deletions(-) delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index b2559f8668..3ccfc276fa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.Set; @@ -34,7 +35,6 @@ import javax.security.auth.Subject; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; - import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator; import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -152,6 +152,8 @@ public class Broker store = new ManagementModeStoreHandler(store, options); } + store.openConfigurationStore(systemContext, Collections.emptyMap()); + _applicationRegistry = new ApplicationRegistry(store,systemContext); try { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java deleted file mode 100644 index 1a942a2e4a..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java +++ /dev/null @@ -1,722 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.startup; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.SystemContext; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; -import org.apache.qpid.server.store.NullUpgrader; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.util.Action; - -public class BrokerStoreUpgrader -{ - private final SystemContext _systemContext; - private Map _upgraders = new HashMap(); - - - // Note: don't use externally defined constants in upgraders in case they change, the values here MUST stay the same - // no matter what changes are made to the code in the future - - public BrokerStoreUpgrader(SystemContext systemContext) - { - _systemContext = systemContext; - - register(new Upgrader_1_0_to_1_1()); - register(new Upgrader_1_1_to_1_2()); - register(new Upgrader_1_2_to_1_3()); - register(new Upgrader_1_3_to_1_4()); - } - - private void register(UpgraderPhaseFactory factory) - { - _upgraders.put(factory.getFromVersion(), factory); - } - - private final class Upgrader_1_0_to_1_1 extends UpgraderPhaseFactory - { - private Upgrader_1_0_to_1_1() - { - super("1.0", "1.1"); - } - - @Override - public StoreUpgraderPhase newInstance() - { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) - { - @Override - public void configuredObject(ConfiguredObjectRecord record) - { - if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } - else if (record.getType().equals("VirtualHost") && record.getAttributes().containsKey("storeType")) - { - Map updatedAttributes = new HashMap(record.getAttributes()); - updatedAttributes.put("type", "STANDARD"); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - } - - getNextUpgrader().configuredObject(record); - } - - - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; - } - } - - private static final class Upgrader_1_1_to_1_2 extends UpgraderPhaseFactory - { - private Upgrader_1_1_to_1_2() - { - super("1.1", "1.2"); - } - - @Override - public StoreUpgraderPhase newInstance() - { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) - { - - @Override - public void configuredObject(ConfiguredObjectRecord record) - { - if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } - - getNextUpgrader().configuredObject(record); - - } - - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; - } - } - - private static final class Upgrader_1_2_to_1_3 extends UpgraderPhaseFactory - { - private Upgrader_1_2_to_1_3() - { - super("1.2", "1.3"); - } - - @Override - public StoreUpgraderPhase newInstance() - { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) - { - - @Override - public void configuredObject(ConfiguredObjectRecord record) - { - if (record.getType().equals("TrustStore") && record.getAttributes().containsKey("type")) - { - Map updatedAttributes = new HashMap(record.getAttributes()); - updatedAttributes.put("trustStoreType", updatedAttributes.remove("type")); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - - } - else if (record.getType().equals("KeyStore") && record.getAttributes().containsKey("type")) - { - Map updatedAttributes = new HashMap(record.getAttributes()); - updatedAttributes.put("keyStoreType", updatedAttributes.remove("type")); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - - } - else if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } - - getNextUpgrader().configuredObject(record); - - } - - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; - } - } - - private static final class Upgrader_1_3_to_1_4 extends UpgraderPhaseFactory - { - private Upgrader_1_3_to_1_4() - { - super("1.3", "1.4"); - } - - @Override - public StoreUpgraderPhase newInstance() - { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) - { - - @SuppressWarnings("serial") - private Map _vhostUpgraderMap = new HashMap() - {{ - put("BDB_HA", new BdbHaVirtualHostUpgrader()); - put("STANDARD", new StandardVirtualHostUpgrader()); - }}; - - @Override - public void configuredObject(ConfiguredObjectRecord record) - { - if (record.getType().equals("VirtualHost")) - { - Map attributes = record.getAttributes(); - if (attributes.containsKey("configPath")) - { - throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath")); - } - - String type = (String) attributes.get("type"); - VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type); - if (vhostUpgrader == null) - { - throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type); - } - record = vhostUpgrader.upgrade(record); - getUpdateMap().put(record.getId(), record); - } - else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType")) - { - Map updatedAttributes = new HashMap(record.getAttributes()); - updatedAttributes.put("type", updatedAttributes.remove("pluginType")); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - - } - else if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } - - getNextUpgrader().configuredObject(record); - - } - - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; - } - } - - private static interface VirtualHostEntryUpgrader - { - ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost); - } - - private static class StandardVirtualHostUpgrader implements VirtualHostEntryUpgrader - { - @SuppressWarnings("serial") - Map _messageStoreAttributeTransformers = new HashMap() - {{ - put("DERBY", new AttributesTransformer(). - addAttributeTransformer("storePath", copyAttribute()). - addAttributeTransformer("storeUnderfullSize", copyAttribute()). - addAttributeTransformer("storeOverfullSize", copyAttribute()). - addAttributeTransformer("storeType", mutateAttributeValue("DERBY"))); - put("MEMORY", new AttributesTransformer(). - addAttributeTransformer("storeType", mutateAttributeValue("Memory"))); - put("BDB", new AttributesTransformer(). - addAttributeTransformer("storePath", copyAttribute()). - addAttributeTransformer("storeUnderfullSize", copyAttribute()). - addAttributeTransformer("storeOverfullSize", copyAttribute()). - addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). - addAttributeTransformer("storeType", mutateAttributeValue("BDB"))); - put("JDBC", new AttributesTransformer(). - addAttributeTransformer("storePath", mutateAttributeName("connectionURL")). - addAttributeTransformer("connectionURL", copyAttribute()). - addAttributeTransformer("connectionPool", copyAttribute()). - addAttributeTransformer("jdbcBigIntType", copyAttribute()). - addAttributeTransformer("jdbcBytesForBlob", copyAttribute()). - addAttributeTransformer("jdbcBlobType", copyAttribute()). - addAttributeTransformer("jdbcVarbinaryType", copyAttribute()). - addAttributeTransformer("partitionCount", copyAttribute()). - addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("minConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("storeType", mutateAttributeValue("JDBC"))); - }}; - - @SuppressWarnings("serial") - Map _configurationStoreAttributeTransformers = new HashMap() - {{ - put("DERBY", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("DERBY"))); - put("MEMORY", new AttributesTransformer(). - addAttributeTransformer("configStoreType", mutateAttributeValue("Memory"))); - put("JSON", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JSON"))); - put("BDB", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). - addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("BDB"))); - put("JDBC", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("connectionURL")). - addAttributeTransformer("configConnectionURL", mutateAttributeName("connectionURL")). - addAttributeTransformer("connectionPool", copyAttribute()). - addAttributeTransformer("jdbcBigIntType", copyAttribute()). - addAttributeTransformer("jdbcBytesForBlob", copyAttribute()). - addAttributeTransformer("jdbcBlobType", copyAttribute()). - addAttributeTransformer("jdbcVarbinaryType", copyAttribute()). - addAttributeTransformer("partitionCount", copyAttribute()). - addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("minConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JDBC"))); - }}; - - @Override - public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost) - { - Map attributes = vhost.getAttributes(); - Map newAttributes = new HashMap(attributes); - - String capitalisedStoreType = String.valueOf(attributes.get("storeType")).toUpperCase(); - AttributesTransformer vhAttrsToMessageStoreSettings = _messageStoreAttributeTransformers.get(capitalisedStoreType); - Map messageStoreSettings = null; - if (vhAttrsToMessageStoreSettings != null) - { - messageStoreSettings = vhAttrsToMessageStoreSettings.upgrade(attributes); - } - - if (attributes.containsKey("configStoreType")) - { - String capitaliseConfigStoreType = ((String) attributes.get("configStoreType")).toUpperCase(); - AttributesTransformer vhAttrsToConfigurationStoreSettings = _configurationStoreAttributeTransformers - .get(capitaliseConfigStoreType); - Map configurationStoreSettings = vhAttrsToConfigurationStoreSettings.upgrade(attributes); - newAttributes.keySet().removeAll(vhAttrsToConfigurationStoreSettings.getNamesToBeDeleted()); - newAttributes.put("configurationStoreSettings", configurationStoreSettings); - } - - if (vhAttrsToMessageStoreSettings != null) - { - newAttributes.keySet().removeAll(vhAttrsToMessageStoreSettings.getNamesToBeDeleted()); - newAttributes.put("messageStoreSettings", messageStoreSettings); - } - - return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents()); - } - } - - private static class BdbHaVirtualHostUpgrader implements VirtualHostEntryUpgrader - { - - private final AttributesTransformer haAttributesTransformer = new AttributesTransformer(). - addAttributeTransformer("storePath", copyAttribute()). - addAttributeTransformer("storeUnderfullSize", copyAttribute()). - addAttributeTransformer("storeOverfullSize", copyAttribute()). - addAttributeTransformer("haNodeName", copyAttribute()). - addAttributeTransformer("haGroupName", copyAttribute()). - addAttributeTransformer("haHelperAddress", copyAttribute()). - addAttributeTransformer("haCoalescingSync", copyAttribute()). - addAttributeTransformer("haNodeAddress", copyAttribute()). - addAttributeTransformer("haDurability", copyAttribute()). - addAttributeTransformer("haDesignatedPrimary", copyAttribute()). - addAttributeTransformer("haReplicationConfig", copyAttribute()). - addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). - addAttributeTransformer("storeType", removeAttribute()); - - @Override - public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost) - { - Map attributes = vhost.getAttributes(); - - Map messageStoreSettings = haAttributesTransformer.upgrade(attributes); - - Map newAttributes = new HashMap(attributes); - newAttributes.keySet().removeAll(haAttributesTransformer.getNamesToBeDeleted()); - newAttributes.put("messageStoreSettings", messageStoreSettings); - - return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents()); - } - } - - private static class AttributesTransformer - { - private final Map> _transformers = new HashMap>(); - private Set _namesToBeDeleted = new HashSet(); - - public AttributesTransformer addAttributeTransformer(String string, AttributeTransformer... attributeTransformers) - { - _transformers.put(string, Arrays.asList(attributeTransformers)); - return this; - } - - public Map upgrade(Map attributes) - { - Map settings = new HashMap(); - for (Map.Entry> entry : _transformers.entrySet()) - { - String attributeName = entry.getKey(); - if (attributes.containsKey(attributeName)) - { - Object attributeValue = attributes.get(attributeName); - MutableEntry newEntry = new MutableEntry(attributeName, attributeValue); - - List transformers = entry.getValue(); - for (AttributeTransformer attributeTransformer : transformers) - { - newEntry = attributeTransformer.transform(newEntry); - if (newEntry == null) - { - break; - } - } - if (newEntry != null) - { - settings.put(newEntry.getKey(), newEntry.getValue()); - } - - _namesToBeDeleted.add(attributeName); - } - } - return settings; - } - - public Set getNamesToBeDeleted() - { - return _namesToBeDeleted; - } - } - - private static AttributeTransformer copyAttribute() - { - return CopyAttribute.INSTANCE; - } - - private static AttributeTransformer removeAttribute() - { - return RemoveAttribute.INSTANCE; - } - - private static AttributeTransformer mutateAttributeValue(Object newValue) - { - return new MutateAttributeValue(newValue); - } - - private static AttributeTransformer mutateAttributeName(String newName) - { - return new MutateAttributeName(newName); - } - - private static interface AttributeTransformer - { - MutableEntry transform(MutableEntry entry); - } - - private static class CopyAttribute implements AttributeTransformer - { - private static final CopyAttribute INSTANCE = new CopyAttribute(); - - private CopyAttribute() - { - } - - @Override - public MutableEntry transform(MutableEntry entry) - { - return entry; - } - } - - private static class RemoveAttribute implements AttributeTransformer - { - private static final RemoveAttribute INSTANCE = new RemoveAttribute(); - - private RemoveAttribute() - { - } - - @Override - public MutableEntry transform(MutableEntry entry) - { - return null; - } - } - - private static class MutateAttributeName implements AttributeTransformer - { - private final String _newName; - - public MutateAttributeName(String newName) - { - _newName = newName; - } - - @Override - public MutableEntry transform(MutableEntry entry) - { - entry.setKey(_newName); - return entry; - } - } - - private static class MutateAttributeValue implements AttributeTransformer - { - private final Object _newValue; - - public MutateAttributeValue(Object newValue) - { - _newValue = newValue; - } - - @Override - public MutableEntry transform(MutableEntry entry) - { - entry.setValue(_newValue); - return entry; - } - } - - private static class MutableEntry - { - private String _key; - private Object _value; - - public MutableEntry(String key, Object value) - { - _key = key; - _value = value; - } - - public String getKey() - { - return _key; - } - - public void setKey(String key) - { - _key = key; - } - - public Object getValue() - { - return _value; - } - - public void setValue(Object value) - { - _value = value; - } - } - - public Broker upgrade(DurableConfigurationStore store) - { - final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext, store, _upgraders); - store.openConfigurationStore(_systemContext, Collections.emptyMap()); - store.visitConfiguredObjectRecords(recoveryHandler); - - return recoveryHandler.getBroker(); - } - - - private static class BrokerStoreRecoveryHandler implements ConfiguredObjectRecordHandler - { - private static Logger LOGGER = Logger.getLogger(BrokerStoreRecoveryHandler.class); - - private DurableConfigurationStoreUpgrader _upgrader; - private DurableConfigurationStore _store; - private final Map _records = new HashMap(); - private final SystemContext _systemContext; - private Map _upgraders; - - private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store, Map upgraders) - { - _systemContext = systemContext; - _store = store; - _upgraders = upgraders; - } - - - @Override - public void begin() - { - } - - @Override - public boolean handle(final ConfiguredObjectRecord object) - { - _records.put(object.getId(), object); - return true; - } - - @Override - public void end() - { - String version = getCurrentVersion(); - - while(!BrokerModel.MODEL_VERSION.equals(version)) - { - LOGGER.debug("Adding broker store upgrader from model version: " + version); - final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version); - StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance(); - if(_upgrader == null) - { - _upgrader = upgrader; - } - else - { - _upgrader.setNextUpgrader(upgrader); - } - version = upgraderPhaseFactory.getToVersion(); - } - - if(_upgrader == null) - { - _upgrader = new NullUpgrader(); - } - else - { - _upgrader.setNextUpgrader(new NullUpgrader()); - } - - for(ConfiguredObjectRecord record : _records.values()) - { - _upgrader.configuredObject(record); - } - - Map deletedRecords = _upgrader.getDeletedRecords(); - Map updatedRecords = _upgrader.getUpdatedRecords(); - - LOGGER.debug("Broker store upgrade: " + deletedRecords.size() + " records deleted"); - LOGGER.debug("Broker store upgrade: " + updatedRecords.size() + " records updated"); - LOGGER.debug("Broker store upgrade: " + _records.size() + " total records"); - - _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()])); - _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()])); - - - - - _records.keySet().removeAll(deletedRecords.keySet()); - _records.putAll(updatedRecords); - - _systemContext.resolveObjects(_records.values().toArray(new ConfiguredObjectRecord[_records.size()])); - - final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(_store); - applyRecursively(_systemContext.getBroker(), - new Action>() - { - @Override - public void performAction(final ConfiguredObject object) - { - object.addChangeListener(configChangeListener); - } - - - }); - - } - - private void applyRecursively(final ConfiguredObject object, final Action> action) - { - applyRecursively(object, action, new HashSet>()); - } - - private void applyRecursively(final ConfiguredObject object, - final Action> action, - final HashSet> visited) - { - if(!visited.contains(object)) - { - visited.add(object); - action.performAction(object); - for(Class childClass : object.getModel().getChildTypes(object.getCategoryClass())) - { - Collection children = object.getChildren(childClass); - if(children != null) - { - for(ConfiguredObject child : children) - { - applyRecursively(child, action, visited); - } - } - } - } - } - - private String getCurrentVersion() - { - for(ConfiguredObjectRecord record : _records.values()) - { - if(record.getType().equals("Broker")) - { - String version = (String) record.getAttributes().get(Broker.MODEL_VERSION); - if(version == null) - { - version = "1.0"; - } - return version; - } - } - return BrokerModel.MODEL_VERSION; - } - - public Broker getBroker() - { - return _systemContext.getBroker(); - } - } - - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java deleted file mode 100644 index 5762a8f8e6..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.startup; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.NonNullUpgrader; - -public abstract class StoreUpgraderPhase extends NonNullUpgrader -{ - private final String _toVersion; - private final String _versionAttributeName; - - protected StoreUpgraderPhase(String versionAttributeName, String toVersion) - { - _toVersion = toVersion; - _versionAttributeName = versionAttributeName; - } - - protected ConfiguredObjectRecord upgradeRootRecord(ConfiguredObjectRecord record) - { - Map updatedAttributes = new HashMap(record.getAttributes()); - updatedAttributes.put(_versionAttributeName, _toVersion); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - return record; - } -} \ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java deleted file mode 100644 index 26686c67cd..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.startup; - -public abstract class UpgraderPhaseFactory -{ - private final String _toVersion; - private final String _fromVersion; - - protected UpgraderPhaseFactory(String fromVersion, String toVersion) - { - _toVersion = toVersion; - _fromVersion = fromVersion; - } - - public String getToVersion() - { - return _toVersion; - } - - public String getFromVersion() - { - return _fromVersion; - } - - public abstract StoreUpgraderPhase newInstance(); -} \ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java index 03032ad1ed..04634ea2a6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java @@ -23,13 +23,10 @@ package org.apache.qpid.server.model; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; -import org.apache.qpid.server.store.ConfiguredObjectRecord; @ManagedObject (creatable = false) public interface SystemContext> extends ConfiguredObject { - void resolveObjects(ConfiguredObjectRecord... records); - EventLogger getEventLogger(); BrokerOptions getBrokerOptions(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java index f0f5e96081..d9e9950972 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java @@ -83,141 +83,6 @@ public class SystemContextImpl extends AbstractConfiguredObject> resolvedObjects = new HashMap>(); - resolvedObjects.put(getId(), SystemContextImpl.this); - - Collection recordsWithUnresolvedParents = - new ArrayList(Arrays.asList(records)); - Collection> recordsWithUnresolvedDependencies = - new ArrayList>(); - - boolean updatesMade; - - do - { - updatesMade = false; - Iterator iter = recordsWithUnresolvedParents.iterator(); - while (iter.hasNext()) - { - ConfiguredObjectRecord record = iter.next(); - Collection> parents = new ArrayList>(); - boolean foundParents = true; - for (ConfiguredObjectRecord parent : record.getParents().values()) - { - if (!resolvedObjects.containsKey(parent.getId())) - { - foundParents = false; - break; - } - else - { - parents.add(resolvedObjects.get(parent.getId())); - } - } - if (foundParents) - { - iter.remove(); - UnresolvedConfiguredObject recovered = - factory.recover(record, parents.toArray(new ConfiguredObject[parents.size()])); - Collection> dependencies = - recovered.getUnresolvedDependencies(); - if (dependencies.isEmpty()) - { - updatesMade = true; - ConfiguredObject resolved = recovered.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - else - { - recordsWithUnresolvedDependencies.add(recovered); - } - } - - } - - Iterator> unresolvedIter = - recordsWithUnresolvedDependencies.iterator(); - - while (unresolvedIter.hasNext()) - { - UnresolvedConfiguredObject unresolvedObject = unresolvedIter.next(); - Collection> dependencies = - new ArrayList>(unresolvedObject.getUnresolvedDependencies()); - - for (ConfiguredObjectDependency dependency : dependencies) - { - if (dependency instanceof ConfiguredObjectIdDependency) - { - UUID id = ((ConfiguredObjectIdDependency) dependency).getId(); - if (resolvedObjects.containsKey(id)) - { - dependency.resolve(resolvedObjects.get(id)); - } - } - else if (dependency instanceof ConfiguredObjectNameDependency) - { - ConfiguredObject dependentObject = null; - for (ConfiguredObject parent : unresolvedObject.getParents()) - { - dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), - ((ConfiguredObjectNameDependency) dependency) - .getName() - ); - if (dependentObject != null) - { - break; - } - } - if (dependentObject != null) - { - dependency.resolve(dependentObject); - } - } - else - { - throw new ServerScopedRuntimeException("Unknown dependency type " - + dependency.getClass() - .getSimpleName()); - } - } - if (unresolvedObject.getUnresolvedDependencies().isEmpty()) - { - updatesMade = true; - unresolvedIter.remove(); - ConfiguredObject resolved = unresolvedObject.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - } - - } while (updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() - && recordsWithUnresolvedParents.isEmpty())); - - if (!recordsWithUnresolvedDependencies.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve some objects: " - + recordsWithUnresolvedDependencies); - } - if (!recordsWithUnresolvedParents.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" - + recordsWithUnresolvedParents); - } - } - }); - } - @Override protected boolean setState(final State currentState, final State desiredState) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 6ddf5f83dd..d3e80ecee7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -21,11 +21,9 @@ package org.apache.qpid.server.registry; import org.apache.log4j.Logger; - import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.configuration.startup.BrokerStoreUpgrader; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; @@ -35,6 +33,7 @@ import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemContext; +import org.apache.qpid.server.store.BrokerStoreUpgraderAndRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.util.SystemUtils; @@ -77,8 +76,8 @@ public class ApplicationRegistry implements IApplicationRegistry logStartupMessages(startupLogger); - BrokerStoreUpgrader upgrader = new BrokerStoreUpgrader(_systemContext); - _broker = upgrader.upgrade(_store); + BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(_systemContext); + _broker = upgrader.perform(_store); _broker.setEventLogger(startupLogger); _broker.open(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java new file mode 100644 index 0000000000..7e49e0f6d3 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java @@ -0,0 +1,571 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.SystemContext; +import org.apache.qpid.server.util.Action; + +public class BrokerStoreUpgraderAndRecoverer +{ + private final SystemContext _systemContext; + private final Map _upgraders = new HashMap(); + + // Note: don't use externally defined constants in upgraders in case they change, the values here MUST stay the same + // no matter what changes are made to the code in the future + public BrokerStoreUpgraderAndRecoverer(SystemContext systemContext) + { + _systemContext = systemContext; + + register(new Upgrader_1_0_to_1_1()); + register(new Upgrader_1_1_to_1_2()); + register(new Upgrader_1_2_to_1_3()); + register(new Upgrader_1_3_to_1_4()); + } + + private void register(StoreUpgraderPhase upgrader) + { + _upgraders.put(upgrader.getFromVersion(), upgrader); + } + + private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase + { + private Upgrader_1_0_to_1_1() + { + super("modelVersion", "1.0", "1.1"); + } + + @Override + public void configuredObject(ConfiguredObjectRecord record) + { + if (record.getType().equals("Broker")) + { + record = upgradeRootRecord(record); + } + else if (record.getType().equals("VirtualHost") && record.getAttributes().containsKey("storeType")) + { + Map updatedAttributes = new HashMap(record.getAttributes()); + updatedAttributes.put("type", "STANDARD"); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + } + + getNextUpgrader().configuredObject(record); + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + + } + + private static final class Upgrader_1_1_to_1_2 extends StoreUpgraderPhase + { + private Upgrader_1_1_to_1_2() + { + super("modelVersion", "1.1", "1.2"); + } + + @Override + public void configuredObject(ConfiguredObjectRecord record) + { + if (record.getType().equals("Broker")) + { + record = upgradeRootRecord(record); + } + + getNextUpgrader().configuredObject(record); + + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + + } + + private static final class Upgrader_1_2_to_1_3 extends StoreUpgraderPhase + { + private Upgrader_1_2_to_1_3() + { + super("modelVersion", "1.2", "1.3"); + } + + @Override + public void configuredObject(ConfiguredObjectRecord record) + { + if (record.getType().equals("TrustStore") && record.getAttributes().containsKey("type")) + { + Map updatedAttributes = new HashMap(record.getAttributes()); + updatedAttributes.put("trustStoreType", updatedAttributes.remove("type")); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + + } + else if (record.getType().equals("KeyStore") && record.getAttributes().containsKey("type")) + { + Map updatedAttributes = new HashMap(record.getAttributes()); + updatedAttributes.put("keyStoreType", updatedAttributes.remove("type")); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + + } + else if (record.getType().equals("Broker")) + { + record = upgradeRootRecord(record); + } + + getNextUpgrader().configuredObject(record); + + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + + } + + private static final class Upgrader_1_3_to_1_4 extends StoreUpgraderPhase + { + private Upgrader_1_3_to_1_4() + { + super("modelVersion", "1.3", "1.4"); + } + + @SuppressWarnings("serial") + private Map _vhostUpgraderMap = new HashMap() + {{ + put("BDB_HA", new BdbHaVirtualHostUpgrader()); + put("STANDARD", new StandardVirtualHostUpgrader()); + }}; + + @Override + public void configuredObject(ConfiguredObjectRecord record) + { + if (record.getType().equals("VirtualHost")) + { + Map attributes = record.getAttributes(); + if (attributes.containsKey("configPath")) + { + throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath")); + } + + String type = (String) attributes.get("type"); + VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type); + if (vhostUpgrader == null) + { + throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type); + } + record = vhostUpgrader.upgrade(record); + getUpdateMap().put(record.getId(), record); + } + else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType")) + { + Map updatedAttributes = new HashMap(record.getAttributes()); + updatedAttributes.put("type", updatedAttributes.remove("pluginType")); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + + } + else if (record.getType().equals("Broker")) + { + record = upgradeRootRecord(record); + } + + getNextUpgrader().configuredObject(record); + + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + + } + + private static interface VirtualHostEntryUpgrader + { + ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost); + } + + private static class StandardVirtualHostUpgrader implements VirtualHostEntryUpgrader + { + @SuppressWarnings("serial") + Map _messageStoreAttributeTransformers = new HashMap() + {{ + put("DERBY", new AttributesTransformer(). + addAttributeTransformer("storePath", copyAttribute()). + addAttributeTransformer("storeUnderfullSize", copyAttribute()). + addAttributeTransformer("storeOverfullSize", copyAttribute()). + addAttributeTransformer("storeType", mutateAttributeValue("DERBY"))); + put("MEMORY", new AttributesTransformer(). + addAttributeTransformer("storeType", mutateAttributeValue("Memory"))); + put("BDB", new AttributesTransformer(). + addAttributeTransformer("storePath", copyAttribute()). + addAttributeTransformer("storeUnderfullSize", copyAttribute()). + addAttributeTransformer("storeOverfullSize", copyAttribute()). + addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). + addAttributeTransformer("storeType", mutateAttributeValue("BDB"))); + put("JDBC", new AttributesTransformer(). + addAttributeTransformer("storePath", mutateAttributeName("connectionURL")). + addAttributeTransformer("connectionURL", copyAttribute()). + addAttributeTransformer("connectionPool", copyAttribute()). + addAttributeTransformer("jdbcBigIntType", copyAttribute()). + addAttributeTransformer("jdbcBytesForBlob", copyAttribute()). + addAttributeTransformer("jdbcBlobType", copyAttribute()). + addAttributeTransformer("jdbcVarbinaryType", copyAttribute()). + addAttributeTransformer("partitionCount", copyAttribute()). + addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()). + addAttributeTransformer("minConnectionsPerPartition", copyAttribute()). + addAttributeTransformer("storeType", mutateAttributeValue("JDBC"))); + }}; + + @SuppressWarnings("serial") + Map _configurationStoreAttributeTransformers = new HashMap() + {{ + put("DERBY", new AttributesTransformer(). + addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). + addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("DERBY"))); + put("MEMORY", new AttributesTransformer(). + addAttributeTransformer("configStoreType", mutateAttributeValue("Memory"))); + put("JSON", new AttributesTransformer(). + addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). + addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JSON"))); + put("BDB", new AttributesTransformer(). + addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). + addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). + addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("BDB"))); + put("JDBC", new AttributesTransformer(). + addAttributeTransformer("configStorePath", mutateAttributeName("connectionURL")). + addAttributeTransformer("configConnectionURL", mutateAttributeName("connectionURL")). + addAttributeTransformer("connectionPool", copyAttribute()). + addAttributeTransformer("jdbcBigIntType", copyAttribute()). + addAttributeTransformer("jdbcBytesForBlob", copyAttribute()). + addAttributeTransformer("jdbcBlobType", copyAttribute()). + addAttributeTransformer("jdbcVarbinaryType", copyAttribute()). + addAttributeTransformer("partitionCount", copyAttribute()). + addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()). + addAttributeTransformer("minConnectionsPerPartition", copyAttribute()). + addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JDBC"))); + }}; + + @Override + public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost) + { + Map attributes = vhost.getAttributes(); + Map newAttributes = new HashMap(attributes); + + String capitalisedStoreType = String.valueOf(attributes.get("storeType")).toUpperCase(); + AttributesTransformer vhAttrsToMessageStoreSettings = _messageStoreAttributeTransformers.get(capitalisedStoreType); + Map messageStoreSettings = null; + if (vhAttrsToMessageStoreSettings != null) + { + messageStoreSettings = vhAttrsToMessageStoreSettings.upgrade(attributes); + } + + if (attributes.containsKey("configStoreType")) + { + String capitaliseConfigStoreType = ((String) attributes.get("configStoreType")).toUpperCase(); + AttributesTransformer vhAttrsToConfigurationStoreSettings = _configurationStoreAttributeTransformers + .get(capitaliseConfigStoreType); + Map configurationStoreSettings = vhAttrsToConfigurationStoreSettings.upgrade(attributes); + newAttributes.keySet().removeAll(vhAttrsToConfigurationStoreSettings.getNamesToBeDeleted()); + newAttributes.put("configurationStoreSettings", configurationStoreSettings); + } + + if (vhAttrsToMessageStoreSettings != null) + { + newAttributes.keySet().removeAll(vhAttrsToMessageStoreSettings.getNamesToBeDeleted()); + newAttributes.put("messageStoreSettings", messageStoreSettings); + } + + return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents()); + } + } + + private static class BdbHaVirtualHostUpgrader implements VirtualHostEntryUpgrader + { + + private final AttributesTransformer haAttributesTransformer = new AttributesTransformer(). + addAttributeTransformer("storePath", copyAttribute()). + addAttributeTransformer("storeUnderfullSize", copyAttribute()). + addAttributeTransformer("storeOverfullSize", copyAttribute()). + addAttributeTransformer("haNodeName", copyAttribute()). + addAttributeTransformer("haGroupName", copyAttribute()). + addAttributeTransformer("haHelperAddress", copyAttribute()). + addAttributeTransformer("haCoalescingSync", copyAttribute()). + addAttributeTransformer("haNodeAddress", copyAttribute()). + addAttributeTransformer("haDurability", copyAttribute()). + addAttributeTransformer("haDesignatedPrimary", copyAttribute()). + addAttributeTransformer("haReplicationConfig", copyAttribute()). + addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). + addAttributeTransformer("storeType", removeAttribute()); + + @Override + public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost) + { + Map attributes = vhost.getAttributes(); + + Map messageStoreSettings = haAttributesTransformer.upgrade(attributes); + + Map newAttributes = new HashMap(attributes); + newAttributes.keySet().removeAll(haAttributesTransformer.getNamesToBeDeleted()); + newAttributes.put("messageStoreSettings", messageStoreSettings); + + return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents()); + } + } + + private static class AttributesTransformer + { + private final Map> _transformers = new HashMap>(); + private Set _namesToBeDeleted = new HashSet(); + + public AttributesTransformer addAttributeTransformer(String string, AttributeTransformer... attributeTransformers) + { + _transformers.put(string, Arrays.asList(attributeTransformers)); + return this; + } + + public Map upgrade(Map attributes) + { + Map settings = new HashMap(); + for (Map.Entry> entry : _transformers.entrySet()) + { + String attributeName = entry.getKey(); + if (attributes.containsKey(attributeName)) + { + Object attributeValue = attributes.get(attributeName); + MutableEntry newEntry = new MutableEntry(attributeName, attributeValue); + + List transformers = entry.getValue(); + for (AttributeTransformer attributeTransformer : transformers) + { + newEntry = attributeTransformer.transform(newEntry); + if (newEntry == null) + { + break; + } + } + if (newEntry != null) + { + settings.put(newEntry.getKey(), newEntry.getValue()); + } + + _namesToBeDeleted.add(attributeName); + } + } + return settings; + } + + public Set getNamesToBeDeleted() + { + return _namesToBeDeleted; + } + } + + private static AttributeTransformer copyAttribute() + { + return CopyAttribute.INSTANCE; + } + + private static AttributeTransformer removeAttribute() + { + return RemoveAttribute.INSTANCE; + } + + private static AttributeTransformer mutateAttributeValue(Object newValue) + { + return new MutateAttributeValue(newValue); + } + + private static AttributeTransformer mutateAttributeName(String newName) + { + return new MutateAttributeName(newName); + } + + private static interface AttributeTransformer + { + MutableEntry transform(MutableEntry entry); + } + + private static class CopyAttribute implements AttributeTransformer + { + private static final CopyAttribute INSTANCE = new CopyAttribute(); + + private CopyAttribute() + { + } + + @Override + public MutableEntry transform(MutableEntry entry) + { + return entry; + } + } + + private static class RemoveAttribute implements AttributeTransformer + { + private static final RemoveAttribute INSTANCE = new RemoveAttribute(); + + private RemoveAttribute() + { + } + + @Override + public MutableEntry transform(MutableEntry entry) + { + return null; + } + } + + private static class MutateAttributeName implements AttributeTransformer + { + private final String _newName; + + public MutateAttributeName(String newName) + { + _newName = newName; + } + + @Override + public MutableEntry transform(MutableEntry entry) + { + entry.setKey(_newName); + return entry; + } + } + + private static class MutateAttributeValue implements AttributeTransformer + { + private final Object _newValue; + + public MutateAttributeValue(Object newValue) + { + _newValue = newValue; + } + + @Override + public MutableEntry transform(MutableEntry entry) + { + entry.setValue(_newValue); + return entry; + } + } + + private static class MutableEntry + { + private String _key; + private Object _value; + + public MutableEntry(String key, Object value) + { + _key = key; + _value = value; + } + + public String getKey() + { + return _key; + } + + public void setKey(String key) + { + _key = key; + } + + public Object getValue() + { + return _value; + } + + public void setValue(Object value) + { + _value = value; + } + } + + public Broker perform(final DurableConfigurationStore store) + { + final String brokerCategory = Broker.class.getSimpleName(); + final GenericStoreUpgrader upgrader = new GenericStoreUpgrader(brokerCategory, Broker.MODEL_VERSION, store, _upgraders); + upgrader.upgrade(); + + new GenericRecoverer(_systemContext, brokerCategory).recover(upgrader.getRecords()); + + final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store); + applyRecursively(_systemContext.getBroker(), new Action>() + { + @Override + public void performAction(final ConfiguredObject object) + { + object.addChangeListener(configChangeListener); + } + }); + + return _systemContext.getBroker(); + } + + private void applyRecursively(final ConfiguredObject object, final Action> action) + { + applyRecursively(object, action, new HashSet>()); + } + + private void applyRecursively(final ConfiguredObject object, + final Action> action, + final HashSet> visited) + { + if(!visited.contains(object)) + { + visited.add(object); + action.performAction(object); + for(Class childClass : object.getModel().getChildTypes(object.getCategoryClass())) + { + Collection children = object.getChildren(childClass); + if(children != null) + { + for(ConfiguredObject child : children) + { + applyRecursively(child, action, visited); + } + } + } + } + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java new file mode 100644 index 0000000000..01be6cf556 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java @@ -0,0 +1,218 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.util.ServerScopedRuntimeException; + +public class GenericRecoverer +{ + private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class); + + private final ConfiguredObject _parentOfRoot; + private final String _rootCategory; + + public GenericRecoverer(ConfiguredObject parentOfRoot, String rootCategory) + { + _parentOfRoot = parentOfRoot; + _rootCategory = rootCategory; + } + + public void recover(final List records) + { + _parentOfRoot.getTaskExecutor().run(new TaskExecutor.VoidTask() + { + @Override + public void execute() + { + performRecover(records); + } + + @Override + public String toString() + { + return _rootCategory + " recovery"; + } + }); + + } + + private void performRecover(List records) + { + ConfiguredObjectRecord rootRecord = null; + for (ConfiguredObjectRecord record : records) + { + if (_rootCategory.equals(record.getType())) + { + rootRecord = record; + break; + } + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Root record " + rootRecord); + } + + if (rootRecord != null) + { + + if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty()) + { + records = new ArrayList(records); + + String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName(); + ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parentOfRoot.getId(), parentOfRootCategory, Collections.emptyMap()); + Map rootParents = Collections.singletonMap(parentOfRootCategory, parentRecord); + records.remove(rootRecord); + records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents)); + } + + resolveObjects(_parentOfRoot, records); + } + } + + private void resolveObjects(ConfiguredObject parentObject, List records) + { + ConfiguredObjectFactory factory = parentObject.getObjectFactory(); + Map> resolvedObjects = new HashMap>(); + resolvedObjects.put(parentObject.getId(), parentObject); + + Collection recordsWithUnresolvedParents = new ArrayList(records); + Collection> recordsWithUnresolvedDependencies = + new ArrayList>(); + + boolean updatesMade; + + do + { + updatesMade = false; + Iterator iter = recordsWithUnresolvedParents.iterator(); + while (iter.hasNext()) + { + ConfiguredObjectRecord record = iter.next(); + Collection> parents = new ArrayList>(); + boolean foundParents = true; + for (ConfiguredObjectRecord parent : record.getParents().values()) + { + if (!resolvedObjects.containsKey(parent.getId())) + { + foundParents = false; + break; + } + else + { + parents.add(resolvedObjects.get(parent.getId())); + } + } + if (foundParents) + { + iter.remove(); + ConfiguredObject[] parentArray = parents.toArray(new ConfiguredObject[parents.size()]); + UnresolvedConfiguredObject recovered = factory.recover(record, parentArray); + Collection> dependencies = recovered.getUnresolvedDependencies(); + if (dependencies.isEmpty()) + { + updatesMade = true; + ConfiguredObject resolved = recovered.resolve(); + resolvedObjects.put(resolved.getId(), resolved); + } + else + { + recordsWithUnresolvedDependencies.add(recovered); + } + } + + } + + Iterator> unresolvedIter = recordsWithUnresolvedDependencies.iterator(); + + while(unresolvedIter.hasNext()) + { + UnresolvedConfiguredObject unresolvedObject = unresolvedIter.next(); + Collection> dependencies = + new ArrayList>(unresolvedObject.getUnresolvedDependencies()); + + for(ConfiguredObjectDependency dependency : dependencies) + { + if(dependency instanceof ConfiguredObjectIdDependency) + { + UUID id = ((ConfiguredObjectIdDependency)dependency).getId(); + if(resolvedObjects.containsKey(id)) + { + dependency.resolve(resolvedObjects.get(id)); + } + } + else if(dependency instanceof ConfiguredObjectNameDependency) + { + ConfiguredObject dependentObject = null; + for(ConfiguredObject parent : unresolvedObject.getParents()) + { + dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName()); + if(dependentObject != null) + { + break; + } + } + if(dependentObject != null) + { + dependency.resolve(dependentObject); + } + } + else + { + throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName()); + } + } + if(unresolvedObject.getUnresolvedDependencies().isEmpty()) + { + updatesMade = true; + unresolvedIter.remove(); + ConfiguredObject resolved = unresolvedObject.resolve(); + resolvedObjects.put(resolved.getId(), resolved); + } + } + + } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty())); + + if(!recordsWithUnresolvedDependencies.isEmpty()) + { + throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies); + } + if(!recordsWithUnresolvedParents.isEmpty()) + { + throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents); + } + } + +} \ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java new file mode 100644 index 0000000000..26b60f765f --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + +public class GenericStoreUpgrader +{ + private static final Logger LOGGER = Logger.getLogger(GenericStoreUpgrader.class); + + private final Map _records = new HashMap(); + private final Map _upgraders; + private final DurableConfigurationStore _store; + private final String _rootCategory; + private final String _modelVersionAttributeName; + + public GenericStoreUpgrader(String rootCategory, String rootModelVersionAttributeName, DurableConfigurationStore configurationStore, Map upgraders) + { + super(); + _upgraders = upgraders; + _store = configurationStore; + _rootCategory = rootCategory; + _modelVersionAttributeName = rootModelVersionAttributeName; + } + + + public List getRecords() + { + return new ArrayList(_records.values()); + } + + public void upgrade() + { + ConfiguredObjectRecordHandler handler = new ConfiguredObjectRecordHandler() + { + @Override + public void begin() + { + } + + @Override + public boolean handle(final ConfiguredObjectRecord record) + { + _records.put(record.getId(), record); + return true; + } + + @Override + public void end() + { + performUpgrade(); + } + }; + + _store.visitConfiguredObjectRecords(handler); + } + + private void performUpgrade() + { + String version = getCurrentVersion(); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info(_rootCategory + " store has model version " + version + ". Number of record(s) " + _records.size()); + } + + DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version); + + for(ConfiguredObjectRecord record : _records.values()) + { + upgrader.configuredObject(record); + } + + upgrader.complete(); + + Map deletedRecords = upgrader.getDeletedRecords(); + Map updatedRecords = upgrader.getUpdatedRecords(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(_rootCategory + " store upgrade is about to complete. " + _records.size() + " total record(s)." + + " Records to update " + updatedRecords.size() + + " Records to delete " + deletedRecords.size()); + } + + _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()])); + _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()])); + + _records.keySet().removeAll(deletedRecords.keySet()); + _records.putAll(updatedRecords); + } + + private DurableConfigurationStoreUpgrader buildUpgraderChain(String version) + { + DurableConfigurationStoreUpgrader head = null; + while(!BrokerModel.MODEL_VERSION.equals(version)) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Adding " + _rootCategory + " store upgrader from model version: " + version); + } + + StoreUpgraderPhase upgrader = _upgraders.get(version); + if (upgrader == null) + { + throw new IllegalConfigurationException("No phase upgrader for version " + version); + } + + if(head == null) + { + head = upgrader; + } + else + { + head.setNextUpgrader(upgrader); + } + version = upgrader.getToVersion(); + } + + if(head == null) + { + head = new NullUpgrader(); + } + else + { + head.setNextUpgrader(new NullUpgrader()); + } + + return head; + } + + private String getCurrentVersion() + { + for(ConfiguredObjectRecord record : _records.values()) + { + if(_rootCategory.equals(record.getType())) + { + return (String) record.getAttributes().get(_modelVersionAttributeName); + } + } + return BrokerModel.MODEL_VERSION; + } +} \ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java new file mode 100644 index 0000000000..b40dc73186 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.HashMap; +import java.util.Map; + +public abstract class StoreUpgraderPhase extends NonNullUpgrader +{ + private final String _fromVersion; + private final String _toVersion; + private final String _versionAttributeName; + + public StoreUpgraderPhase(String versionAttributeName, String fromVersion, String toVersion) + { + _toVersion = toVersion; + _fromVersion = fromVersion; + _versionAttributeName = versionAttributeName; + } + + protected ConfiguredObjectRecord upgradeRootRecord(ConfiguredObjectRecord record) + { + Map updatedAttributes = new HashMap(record.getAttributes()); + updatedAttributes.put(_versionAttributeName, _toVersion); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + return record; + } + + public String getFromVersion() + { + return _fromVersion; + } + + public String getToVersion() + { + return _toVersion; + } + +} \ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index 226371a065..e1cffeb942 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.server.store; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -31,13 +28,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.UUID; -import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.startup.StoreUpgraderPhase; -import org.apache.qpid.server.configuration.startup.UpgraderPhaseFactory; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; @@ -45,14 +37,11 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.util.ServerScopedRuntimeException; public class VirtualHostStoreUpgraderAndRecoverer { - private final ConfiguredObjectFactory _objectFactory; private final VirtualHostNode _virtualHostNode; - private Map _upgraders = new HashMap(); + private Map _upgraders = new HashMap(); @SuppressWarnings("serial") private static final Map DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap() @@ -68,12 +57,11 @@ public class VirtualHostStoreUpgraderAndRecoverer public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode virtualHostNode, ConfiguredObjectFactory objectFactory) { _virtualHostNode = virtualHostNode; - _objectFactory = objectFactory; - register(new UpgraderFactory_0_0()); - register(new UpgraderFactory_0_1()); - register(new UpgraderFactory_0_2()); - register(new UpgraderFactory_0_3()); - register(new UpgraderFactory_0_4()); + register(new Upgrader_0_0_to_0_1()); + register(new Upgrader_0_1_to_0_2()); + register(new Upgrader_0_2_to_0_3()); + register(new Upgrader_0_3_to_0_4()); + register(new Upgrader_0_4_to_0_5()); Map defaultExchangeIds = new HashMap(); for (String exchangeName : DEFAULT_EXCHANGES.keySet()) @@ -84,9 +72,9 @@ public class VirtualHostStoreUpgraderAndRecoverer _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds); } - private void register(UpgraderPhaseFactory factory) + private void register(StoreUpgraderPhase upgrader) { - _upgraders.put(factory.getFromVersion(), factory); + _upgraders.put(upgrader.getFromVersion(), upgrader); } /* @@ -94,100 +82,91 @@ public class VirtualHostStoreUpgraderAndRecoverer * such bindings would have been ignored, starting from the point at which the config version changed, these * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue. */ - private class UpgraderFactory_0_0 extends UpgraderPhaseFactory + private class Upgrader_0_0_to_0_1 extends StoreUpgraderPhase { private final Map _records = new HashMap(); - public UpgraderFactory_0_0() + public Upgrader_0_0_to_0_1() { - super("0.0", "0.1"); + super("modelVersion", "0.0", "0.1"); } - @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(final ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) - { + _records.put(record.getId(), record); + } - @Override - public void configuredObject(final ConfiguredObjectRecord record) - { - _records.put(record.getId(), record); - } + private void removeSelectorArguments(Map binding) + { + @SuppressWarnings("unchecked") + Map arguments = new LinkedHashMap((Map)binding.get(Binding.ARGUMENTS)); - private void removeSelectorArguments(Map binding) - { - @SuppressWarnings("unchecked") - Map arguments = new LinkedHashMap((Map)binding.get(Binding.ARGUMENTS)); + FilterSupport.removeFilters(arguments); + binding.put(Binding.ARGUMENTS, arguments); + } - FilterSupport.removeFilters(arguments); - binding.put(Binding.ARGUMENTS, arguments); - } + private boolean isTopicExchange(ConfiguredObjectRecord entry) + { + ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange"); + if (exchangeRecord == null) + { + return false; + } + UUID exchangeId = exchangeRecord.getId(); - private boolean isTopicExchange(ConfiguredObjectRecord entry) + if(_records.containsKey(exchangeId)) + { + return "topic".equals(_records.get(exchangeId) + .getAttributes() + .get(org.apache.qpid.server.model.Exchange.TYPE)); + } + else + { + if (_defaultExchangeIds.get("amq.topic").equals(exchangeId)) { - ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange"); - if (exchangeRecord == null) - { - return false; - } - UUID exchangeId = exchangeRecord.getId(); + return true; + } - if(_records.containsKey(exchangeId)) - { - return "topic".equals(_records.get(exchangeId) - .getAttributes() - .get(org.apache.qpid.server.model.Exchange.TYPE)); - } - else - { - if (_defaultExchangeIds.get("amq.topic").equals(exchangeId)) - { - return true; - } + return false; + } - return false; - } + } - } + private boolean hasSelectorArguments(Map binding) + { + @SuppressWarnings("unchecked") + Map arguments = (Map) binding.get(Binding.ARGUMENTS); + return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); + } - private boolean hasSelectorArguments(Map binding) + @Override + public void complete() + { + for(Map.Entry entry : _records.entrySet()) + { + ConfiguredObjectRecord record = entry.getValue(); + String type = record.getType(); + Map attributes = record.getAttributes(); + UUID id = record.getId(); + if ("org.apache.qpid.server.model.VirtualHost".equals(type)) { - @SuppressWarnings("unchecked") - Map arguments = (Map) binding.get(Binding.ARGUMENTS); - return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); + record = upgradeRootRecord(record); } - - @Override - public void complete() + else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record)) { - for(Map.Entry entry : _records.entrySet()) - { - ConfiguredObjectRecord record = entry.getValue(); - String type = record.getType(); - Map attributes = record.getAttributes(); - UUID id = record.getId(); - if ("org.apache.qpid.server.model.VirtualHost".equals(type)) - { - record = upgradeRootRecord(record); - } - else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record)) - { - attributes = new LinkedHashMap(attributes); - removeSelectorArguments(attributes); - - record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents()); - getUpdateMap().put(id, record); - entry.setValue(record); - - } - getNextUpgrader().configuredObject(record); - } + attributes = new LinkedHashMap(attributes); + removeSelectorArguments(attributes); + + record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents()); + getUpdateMap().put(id, record); + entry.setValue(record); - getNextUpgrader().complete(); } - }; + getNextUpgrader().configuredObject(record); + } + + getNextUpgrader().complete(); } } @@ -196,76 +175,68 @@ public class VirtualHostStoreUpgraderAndRecoverer * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker * configuration store). Also remove bindings which reference nonexistent queues or exchanges. */ - private class UpgraderFactory_0_1 extends UpgraderPhaseFactory + private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase { - protected UpgraderFactory_0_1() + public Upgrader_0_1_to_0_2() { - super("0.1", "0.2"); + super("modelVersion", "0.1", "0.2"); } @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(final ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) + String type = record.getType().substring(1 + record.getType().lastIndexOf('.')); + ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents()); + getUpdateMap().put(record.getId(), newRecord); + + if ("VirtualHost".equals(type)) { + newRecord = upgradeRootRecord(newRecord); + } + } - @Override - public void configuredObject(final ConfiguredObjectRecord record) + @Override + public void complete() + { + for (Iterator> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();) + { + Map.Entry entry = iterator.next(); + final ConfiguredObjectRecord record = entry.getValue(); + final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName()); + final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName()); + if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId()) + || queueParent == null || unknownQueue(queueParent.getId()))) { - String type = record.getType().substring(1 + record.getType().lastIndexOf('.')); - ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents()); - getUpdateMap().put(record.getId(), newRecord); - - if ("VirtualHost".equals(type)) - { - newRecord = upgradeRootRecord(newRecord); - } + getDeleteMap().put(entry.getKey(), entry.getValue()); + iterator.remove(); } - - @Override - public void complete() + else { - for (Iterator> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();) - { - Map.Entry entry = iterator.next(); - final ConfiguredObjectRecord record = entry.getValue(); - final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName()); - final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName()); - if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId()) - || queueParent == null || unknownQueue(queueParent.getId()))) - { - getDeleteMap().put(entry.getKey(), entry.getValue()); - iterator.remove(); - } - else - { - getNextUpgrader().configuredObject(record); - } - } - getNextUpgrader().complete(); + getNextUpgrader().configuredObject(record); } + } + getNextUpgrader().complete(); + } - private boolean unknownExchange(final UUID exchangeId) - { - if (_defaultExchangeIds.containsValue(exchangeId)) - { - return false; - } - ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); - return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())); - } + private boolean unknownExchange(final UUID exchangeId) + { + if (_defaultExchangeIds.containsValue(exchangeId)) + { + return false; + } + ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); + return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())); + } - private boolean unknownQueue(final UUID queueId) - { - ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); - return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())); - } + private boolean unknownQueue(final UUID queueId) + { + ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); + return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())); + } - private boolean isBinding(final String type) - { - return Binding.class.getSimpleName().equals(type); - } - }; + private boolean isBinding(final String type) + { + return Binding.class.getSimpleName().equals(type); } } @@ -274,52 +245,46 @@ public class VirtualHostStoreUpgraderAndRecoverer * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the * attributes into the map using the model attribute names rather than the wire attribute names */ - private class UpgraderFactory_0_2 extends UpgraderPhaseFactory + private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase { - protected UpgraderFactory_0_2() + private static final String ARGUMENTS = "arguments"; + + public Upgrader_0_2_to_0_3() { - super("0.2", "0.3"); + super("modelVersion", "0.2", "0.3"); } + @SuppressWarnings("unchecked") @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) + if("VirtualHost".equals(record.getType())) { - private static final String ARGUMENTS = "arguments"; - - @SuppressWarnings("unchecked") - @Override - public void configuredObject(ConfiguredObjectRecord record) + record = upgradeRootRecord(record); + } + else if("Queue".equals(record.getType())) + { + Map newAttributes = new LinkedHashMap(); + if(record.getAttributes().get(ARGUMENTS) instanceof Map) { - if("VirtualHost".equals(record.getType())) - { - record = upgradeRootRecord(record); - } - else if("Queue".equals(record.getType())) - { - Map newAttributes = new LinkedHashMap(); - if(record.getAttributes().get(ARGUMENTS) instanceof Map) - { - newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map) record.getAttributes() - .get(ARGUMENTS))); - } - newAttributes.putAll(record.getAttributes()); - - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - } - - getNextUpgrader().configuredObject(record); + newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map) record.getAttributes() + .get(ARGUMENTS))); } + newAttributes.putAll(record.getAttributes()); - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + } + + getNextUpgrader().configuredObject(record); } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + } /* @@ -327,387 +292,125 @@ public class VirtualHostStoreUpgraderAndRecoverer * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER" * ensure OWNER is null unless the exclusivity policy is CONTAINER */ - private class UpgraderFactory_0_3 extends UpgraderPhaseFactory + private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase { - protected UpgraderFactory_0_3() + private static final String EXCLUSIVE = "exclusive"; + + public Upgrader_0_3_to_0_4() { - super("0.3", "0.4"); + super("modelVersion", "0.3", "0.4"); } + @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) + if("VirtualHost".equals(record.getType())) { - private static final String EXCLUSIVE = "exclusive"; - - @Override - public void configuredObject(ConfiguredObjectRecord record) + record = upgradeRootRecord(record); + } + else if(Queue.class.getSimpleName().equals(record.getType())) + { + Map newAttributes = new LinkedHashMap(record.getAttributes()); + if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean) { - if("VirtualHost".equals(record.getType())) - { - record = upgradeRootRecord(record); - } - else if(Queue.class.getSimpleName().equals(record.getType())) + boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE); + newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); + if(!isExclusive && record.getAttributes().containsKey("owner")) { - Map newAttributes = new LinkedHashMap(record.getAttributes()); - if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean) - { - boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE); - newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); - if(!isExclusive && record.getAttributes().containsKey("owner")) - { - newAttributes.remove("owner"); - } - } - else - { - newAttributes.remove("owner"); - } - if(!record.getAttributes().containsKey("durable")) - { - newAttributes.put("durable","true"); - } - - record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); + newAttributes.remove("owner"); } - - getNextUpgrader().configuredObject(record); } - - @Override - public void complete() + else { - getNextUpgrader().complete(); + newAttributes.remove("owner"); } - }; - } - } - - private class UpgraderFactory_0_4 extends UpgraderPhaseFactory - { - protected UpgraderFactory_0_4() - { - super("0.4", "2.0"); - } - - @Override - public StoreUpgraderPhase newInstance() - { - return new StoreUpgraderPhase("modelVersion", getToVersion()) - { - private Map _missingAmqpExchanges = new HashMap(DEFAULT_EXCHANGES); - private static final String EXCHANGE_NAME = "name"; - private static final String EXCHANGE_TYPE = "type"; - private static final String EXCHANGE_DURABLE = "durable"; - private ConfiguredObjectRecord _virtualHostRecord; - - @Override - public void configuredObject(ConfiguredObjectRecord record) + if(!record.getAttributes().containsKey("durable")) { - if("VirtualHost".equals(record.getType())) - { - record = upgradeRootRecord(record); - Map virtualHostAttributes = new HashMap(record.getAttributes()); - virtualHostAttributes.put("name", _virtualHostNode.getName()); - virtualHostAttributes.put("modelVersion", getToVersion()); - record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.emptyMap()); - _virtualHostRecord = record; - } - else if("Exchange".equals(record.getType())) - { - Map attributes = record.getAttributes(); - String name = (String)attributes.get(EXCHANGE_NAME); - _missingAmqpExchanges.remove(name); - } - getNextUpgrader().configuredObject(record); + newAttributes.put("durable","true"); } - @Override - public void complete() - { - for (Entry entry : _missingAmqpExchanges.entrySet()) - { - String name = entry.getKey(); - String type = entry.getValue(); - UUID id = _defaultExchangeIds.get(name); - - Map attributes = new HashMap(); - attributes.put(EXCHANGE_NAME, name); - attributes.put(EXCHANGE_TYPE, type); - attributes.put(EXCHANGE_DURABLE, true); - - ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord)); - getUpdateMap().put(id, record); - - getNextUpgrader().configuredObject(record); - - } + record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + } - getNextUpgrader().complete(); - } - }; + getNextUpgrader().configuredObject(record); } - } + @Override + public void complete() + { + getNextUpgrader().complete(); + } - public void perform(DurableConfigurationStore durableConfigurationStore) - { - UpgradeAndRecoveryHandler vhrh = new UpgradeAndRecoveryHandler(_virtualHostNode, _objectFactory, durableConfigurationStore, _upgraders); - durableConfigurationStore.visitConfiguredObjectRecords(vhrh); } - //TODO: generalize this class - private static class UpgradeAndRecoveryHandler implements ConfiguredObjectRecordHandler + private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase { - private static Logger LOGGER = Logger.getLogger(UpgradeAndRecoveryHandler.class); + private Map _missingAmqpExchanges = new HashMap(DEFAULT_EXCHANGES); + private static final String EXCHANGE_NAME = "name"; + private static final String EXCHANGE_TYPE = "type"; + private static final String EXCHANGE_DURABLE = "durable"; + private ConfiguredObjectRecord _virtualHostRecord; - private final Map _records = new LinkedHashMap(); - private Map _upgraders; - - private final VirtualHostNode _parent; - private final ConfiguredObjectFactory _configuredObjectFactory; - private final DurableConfigurationStore _store; - - public UpgradeAndRecoveryHandler(VirtualHostNode parent, ConfiguredObjectFactory configuredObjectFactory, DurableConfigurationStore durableConfigurationStore, Map upgraders) + public Upgrader_0_4_to_0_5() { - super(); - _parent = parent; - _configuredObjectFactory = configuredObjectFactory; - _upgraders = upgraders; - _store = durableConfigurationStore; + super("modelVersion", "0.4", "2.0"); } @Override - public void begin() + public void configuredObject(ConfiguredObjectRecord record) { - } - - @Override - public boolean handle(final ConfiguredObjectRecord record) - { - _records.put(record.getId(), record); - return true; - } - - @Override - public void end() - { - String version = getCurrentVersion(); - - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Store has model version " + version + ". Number of record(s) " + _records.size()); - } - - DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version); - - for(ConfiguredObjectRecord record : _records.values()) + if("VirtualHost".equals(record.getType())) { - upgrader.configuredObject(record); + record = upgradeRootRecord(record); + Map virtualHostAttributes = new HashMap(record.getAttributes()); + virtualHostAttributes.put("name", _virtualHostNode.getName()); + virtualHostAttributes.put("modelVersion", getToVersion()); + record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.emptyMap()); + _virtualHostRecord = record; } - - upgrader.complete(); - - Map deletedRecords = upgrader.getDeletedRecords(); - Map updatedRecords = upgrader.getUpdatedRecords(); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("VirtualHost store upgrade: " + deletedRecords.size() + " record(s) deleted"); - LOGGER.debug("VirtualHost store upgrade: " + updatedRecords.size() + " record(s) updated"); - LOGGER.debug("VirtualHost store upgrade: " + _records.size() + " total record(s)"); - } - - _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()])); - _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()])); - - _records.keySet().removeAll(deletedRecords.keySet()); - _records.putAll(updatedRecords); - - ConfiguredObjectRecord virtualHostRecord = null; - for (ConfiguredObjectRecord record : _records.values()) - { - LOGGER.debug("Found type " + record.getType()); - if ("VirtualHost".equals(record.getType())) - { - virtualHostRecord = record; - break; - } - } - - if (virtualHostRecord != null) + else if("Exchange".equals(record.getType())) { - String parentCategory = _parent.getCategoryClass().getSimpleName(); - ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parent.getId(), parentCategory, Collections.emptyMap()); - Map rootParents = Collections.singletonMap(parentCategory, parentRecord); - _records.put(virtualHostRecord.getId(), new ConfiguredObjectRecordImpl(virtualHostRecord.getId(), VirtualHost.class.getSimpleName(), virtualHostRecord.getAttributes(), rootParents)); - Collection records = _records.values(); - resolveObjects(_configuredObjectFactory, _parent, records.toArray(new ConfiguredObjectRecord[records.size()])); + Map attributes = record.getAttributes(); + String name = (String)attributes.get(EXCHANGE_NAME); + _missingAmqpExchanges.remove(name); } + getNextUpgrader().configuredObject(record); } - private DurableConfigurationStoreUpgrader buildUpgraderChain(String version) - { - DurableConfigurationStoreUpgrader head = null; - while(!BrokerModel.MODEL_VERSION.equals(version)) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Adding virtual host store upgrader from model version: " + version); - } - final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version); - StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance(); - if(head == null) - { - head = upgrader; - } - else - { - head.setNextUpgrader(upgrader); - } - version = upgraderPhaseFactory.getToVersion(); - } - - if(head == null) - { - head = new NullUpgrader(); - } - else - { - head.setNextUpgrader(new NullUpgrader()); - } - - return head; - } - - private String getCurrentVersion() + @Override + public void complete() { - for(ConfiguredObjectRecord record : _records.values()) + for (Entry entry : _missingAmqpExchanges.entrySet()) { - if(record.getType().equals("VirtualHost")) - { - return (String) record.getAttributes().get(VirtualHost.MODEL_VERSION); - } - } - return BrokerModel.MODEL_VERSION; - } - - - public void resolveObjects(ConfiguredObjectFactory factory, ConfiguredObject root, ConfiguredObjectRecord... records) - { - Map> resolvedObjects = new HashMap>(); - resolvedObjects.put(root.getId(), root); + String name = entry.getKey(); + String type = entry.getValue(); + UUID id = _defaultExchangeIds.get(name); - Collection recordsWithUnresolvedParents = new ArrayList(Arrays.asList(records)); - Collection> recordsWithUnresolvedDependencies = - new ArrayList>(); + Map attributes = new HashMap(); + attributes.put(EXCHANGE_NAME, name); + attributes.put(EXCHANGE_TYPE, type); + attributes.put(EXCHANGE_DURABLE, true); - boolean updatesMade; + ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord)); + getUpdateMap().put(id, record); - do - { - updatesMade = false; - Iterator iter = recordsWithUnresolvedParents.iterator(); - while (iter.hasNext()) - { - ConfiguredObjectRecord record = iter.next(); - Collection> parents = new ArrayList>(); - boolean foundParents = true; - for (ConfiguredObjectRecord parent : record.getParents().values()) - { - if (!resolvedObjects.containsKey(parent.getId())) - { - foundParents = false; - break; - } - else - { - parents.add(resolvedObjects.get(parent.getId())); - } - } - if (foundParents) - { - iter.remove(); - UnresolvedConfiguredObject recovered = - factory.recover(record, parents.toArray(new ConfiguredObject[parents.size()])); - Collection> dependencies = - recovered.getUnresolvedDependencies(); - if (dependencies.isEmpty()) - { - updatesMade = true; - ConfiguredObject resolved = recovered.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - else - { - recordsWithUnresolvedDependencies.add(recovered); - } - } + getNextUpgrader().configuredObject(record); - } - - Iterator> unresolvedIter = - recordsWithUnresolvedDependencies.iterator(); - - while(unresolvedIter.hasNext()) - { - UnresolvedConfiguredObject unresolvedObject = unresolvedIter.next(); - Collection> dependencies = - new ArrayList>(unresolvedObject.getUnresolvedDependencies()); + } - for(ConfiguredObjectDependency dependency : dependencies) - { - if(dependency instanceof ConfiguredObjectIdDependency) - { - UUID id = ((ConfiguredObjectIdDependency)dependency).getId(); - if(resolvedObjects.containsKey(id)) - { - dependency.resolve(resolvedObjects.get(id)); - } - } - else if(dependency instanceof ConfiguredObjectNameDependency) - { - ConfiguredObject dependentObject = null; - for(ConfiguredObject parent : unresolvedObject.getParents()) - { - dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName()); - if(dependentObject != null) - { - break; - } - } - if(dependentObject != null) - { - dependency.resolve(dependentObject); - } - } - else - { - throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName()); - } - } - if(unresolvedObject.getUnresolvedDependencies().isEmpty()) - { - updatesMade = true; - unresolvedIter.remove(); - ConfiguredObject resolved = unresolvedObject.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - } + getNextUpgrader().complete(); + } - } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty())); + } - if(!recordsWithUnresolvedDependencies.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies); - } - if(!recordsWithUnresolvedParents.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents); - } - } + public void perform(DurableConfigurationStore durableConfigurationStore) + { + String virtualHostCategory = VirtualHost.class.getSimpleName(); + GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders); + upgraderHandler.upgrade(); + new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords()); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java deleted file mode 100644 index 28d7830290..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.startup; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import junit.framework.TestCase; - -import org.apache.qpid.server.BrokerOptions; -import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer; -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.RecovererProvider; -import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.logging.LogRecorder; -import org.apache.qpid.server.model.AuthenticationProvider; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.ConfiguredObjectFactory; -import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; -import org.apache.qpid.server.model.GroupProvider; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.SystemContext; -import org.apache.qpid.server.model.SystemContextImpl; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.UnresolvedConfiguredObject; - -public class BrokerRecovererTest extends TestCase -{ - private ConfiguredObjectRecord _brokerEntry = mock(ConfiguredObjectRecord.class); - - private UUID _brokerId = UUID.randomUUID(); - private AuthenticationProvider _authenticationProvider1; - private UUID _authenticationProvider1Id = UUID.randomUUID(); - private SystemContext _systemContext; - private ConfiguredObjectFactory _configuredObjectFactory; - private TaskExecutor _taskExecutor; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - - _configuredObjectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); - _taskExecutor = new TaskExecutor(); - _taskExecutor.start(); - _systemContext = new SystemContextImpl(_taskExecutor, - _configuredObjectFactory, mock(EventLogger.class), mock(LogRecorder.class), mock(BrokerOptions.class)); - - when(_brokerEntry.getId()).thenReturn(_brokerId); - when(_brokerEntry.getType()).thenReturn(Broker.class.getSimpleName()); - Map attributesMap = new HashMap(); - attributesMap.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION); - attributesMap.put(Broker.NAME, getName()); - - when(_brokerEntry.getAttributes()).thenReturn(attributesMap); - when(_brokerEntry.getParents()).thenReturn(Collections.singletonMap(SystemContext.class.getSimpleName(), _systemContext.asObjectRecord())); - - //Add a base AuthenticationProvider for all tests - _authenticationProvider1 = mock(AuthenticationProvider.class); - when(_authenticationProvider1.getName()).thenReturn("authenticationProvider1"); - when(_authenticationProvider1.getId()).thenReturn(_authenticationProvider1Id); - } - - @Override - protected void tearDown() throws Exception - { - super.tearDown(); - _taskExecutor.stop(); - } - - public void testCreateBrokerAttributes() - { - Map attributes = new HashMap(); - attributes.put(Broker.NAME, getName()); - attributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test"); - attributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 1000); - attributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 2000); - attributes.put(Broker.STATISTICS_REPORTING_PERIOD, 4000); - attributes.put(Broker.STATISTICS_REPORTING_RESET_ENABLED, true); - attributes.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION); - - Map entryAttributes = new HashMap(); - for (Map.Entry attribute : attributes.entrySet()) - { - String value = convertToString(attribute.getValue()); - entryAttributes.put(attribute.getKey(), value); - } - - when(_brokerEntry.getAttributes()).thenReturn(entryAttributes); - - _systemContext.resolveObjects(_brokerEntry); - Broker broker = _systemContext.getBroker(); - - assertNotNull(broker); - - broker.open(); - - assertEquals(_brokerId, broker.getId()); - - for (Map.Entry attribute : attributes.entrySet()) - { - Object attributeValue = broker.getAttribute(attribute.getKey()); - assertEquals("Unexpected value of attribute '" + attribute.getKey() + "'", attribute.getValue(), attributeValue); - } - } - - public ConfiguredObjectRecord createAuthProviderRecord(UUID id, String name) - { - final Map authProviderAttrs = new HashMap(); - authProviderAttrs.put(AuthenticationProvider.NAME, name); - authProviderAttrs.put(AuthenticationProvider.TYPE, "Anonymous"); - - return new ConfiguredObjectRecordImpl(id, AuthenticationProvider.class.getSimpleName(), authProviderAttrs, Collections - .singletonMap(Broker.class.getSimpleName(), _brokerEntry)); - } - - - public ConfiguredObjectRecord createGroupProviderRecord(UUID id, String name) - { - final Map groupProviderAttrs = new HashMap(); - groupProviderAttrs.put(GroupProvider.NAME, name); - groupProviderAttrs.put(GroupProvider.TYPE, "GroupFile"); - groupProviderAttrs.put("path", "/no-such-path"); - - return new ConfiguredObjectRecordImpl(id, GroupProvider.class.getSimpleName(), groupProviderAttrs, Collections - .singletonMap(Broker.class.getSimpleName(), _brokerEntry)); - } - - public ConfiguredObjectRecord createPortRecord(UUID id, int port, Object authProviderRef) - { - final Map portAttrs = new HashMap(); - portAttrs.put(Port.NAME, "port-"+port); - portAttrs.put(Port.TYPE, "HTTP"); - portAttrs.put(Port.PORT, port); - portAttrs.put(Port.AUTHENTICATION_PROVIDER, authProviderRef); - - return new ConfiguredObjectRecordImpl(id, Port.class.getSimpleName(), portAttrs, Collections - .singletonMap(Broker.class.getSimpleName(), _brokerEntry)); - } - - - public void testCreateBrokerWithPorts() - { - UUID authProviderId = UUID.randomUUID(); - UUID portId = UUID.randomUUID(); - - _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord( - portId, - 5672, - "authProvider")); - Broker broker = _systemContext.getBroker(); - - - assertNotNull(broker); - broker.open(); - assertEquals(_brokerId, broker.getId()); - assertEquals(1, broker.getPorts().size()); - } - - public void testCreateBrokerWithOneAuthenticationProvider() - { - UUID authProviderId = UUID.randomUUID(); - - _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider")); - Broker broker = _systemContext.getBroker(); - - - assertNotNull(broker); - broker.open(); - assertEquals(_brokerId, broker.getId()); - assertEquals(1, broker.getAuthenticationProviders().size()); - - } - - public void testCreateBrokerWithMultipleAuthenticationProvidersAndPorts() - { - UUID authProviderId = UUID.randomUUID(); - UUID portId = UUID.randomUUID(); - UUID authProvider2Id = UUID.randomUUID(); - UUID port2Id = UUID.randomUUID(); - - _systemContext.resolveObjects(_brokerEntry, - createAuthProviderRecord(authProviderId, "authProvider"), - createPortRecord(portId, 5672, "authProvider"), - createAuthProviderRecord(authProvider2Id, "authProvider2"), - createPortRecord(port2Id, 5673, "authProvider2")); - Broker broker = _systemContext.getBroker(); - - - assertNotNull(broker); - broker.open(); - assertEquals(_brokerId, broker.getId()); - assertEquals(2, broker.getPorts().size()); - - assertEquals("Unexpected number of authentication providers", 2, broker.getAuthenticationProviders().size()); - - } - - public void testCreateBrokerWithGroupProvider() - { - - UUID authProviderId = UUID.randomUUID(); - - _systemContext.resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider")); - Broker broker = _systemContext.getBroker(); - - - assertNotNull(broker); - broker.open(); - assertEquals(_brokerId, broker.getId()); - assertEquals(1, broker.getGroupProviders().size()); - - } - - public void testModelVersionValidationForIncompatibleMajorVersion() throws Exception - { - Map brokerAttributes = new HashMap(); - String[] incompatibleVersions = {Integer.MAX_VALUE + "." + 0, "0.0"}; - for (String incompatibleVersion : incompatibleVersions) - { - // need to reset all the shared objects for every iteration of the test - setUp(); - brokerAttributes.put(Broker.MODEL_VERSION, incompatibleVersion); - brokerAttributes.put(Broker.NAME, getName()); - when(_brokerEntry.getAttributes()).thenReturn(brokerAttributes); - - try - { - _systemContext.resolveObjects(_brokerEntry); - Broker broker = _systemContext.getBroker(); - broker.open(); - fail("The broker creation should fail due to unsupported model version"); - } - catch (IllegalConfigurationException e) - { - assertEquals("The model version '" + incompatibleVersion - + "' in configuration is incompatible with the broker model version '" + BrokerModel.MODEL_VERSION + "'", e.getMessage()); - } - } - } - - - public void testModelVersionValidationForIncompatibleMinorVersion() throws Exception - { - Map brokerAttributes = new HashMap(); - String incompatibleVersion = BrokerModel.MODEL_MAJOR_VERSION + "." + Integer.MAX_VALUE; - brokerAttributes.put(Broker.MODEL_VERSION, incompatibleVersion); - brokerAttributes.put(Broker.NAME, getName()); - - when(_brokerEntry.getAttributes()).thenReturn(brokerAttributes); - - try - { - UnresolvedConfiguredObject recover = - _configuredObjectFactory.recover(_brokerEntry, _systemContext); - - Broker broker = (Broker) recover.resolve(); - broker.open(); - fail("The broker creation should fail due to unsupported model version"); - } - catch (IllegalConfigurationException e) - { - assertEquals("The model version '" + incompatibleVersion - + "' in configuration is incompatible with the broker model version '" + BrokerModel.MODEL_VERSION + "'", e.getMessage()); - } - } - - public void testIncorrectModelVersion() throws Exception - { - Map brokerAttributes = new HashMap(); - brokerAttributes.put(Broker.NAME, getName()); - - String[] versions = { Integer.MAX_VALUE + "_" + 0, "", null }; - for (String modelVersion : versions) - { - brokerAttributes.put(Broker.MODEL_VERSION, modelVersion); - when(_brokerEntry.getAttributes()).thenReturn(brokerAttributes); - - try - { - UnresolvedConfiguredObject recover = - _configuredObjectFactory.recover(_brokerEntry, _systemContext); - Broker broker = (Broker) recover.resolve(); - broker.open(); - fail("The broker creation should fail due to unsupported model version"); - } - catch (IllegalConfigurationException e) - { - // pass - } - } - } - - private String convertToString(Object attributeValue) - { - return String.valueOf(attributeValue); - } - - private RecovererProvider createRecoveryProvider(final ConfiguredObjectRecord[] entries, final ConfiguredObject[] objectsToRecoverer) - { - RecovererProvider recovererProvider = new RecovererProvider() - { - @Override - public ConfiguredObjectRecoverer getRecoverer(String type) - { - @SuppressWarnings({ "unchecked", "rawtypes" }) - final ConfiguredObjectRecoverer recoverer = new ConfiguredObjectRecoverer() - { - public ConfiguredObject create(RecovererProvider recovererProvider, ConfiguredObjectRecord entry, ConfiguredObject... parents) - { - for (int i = 0; i < entries.length; i++) - { - ConfiguredObjectRecord e = entries[i]; - if (entry == e) - { - return objectsToRecoverer[i]; - } - } - return null; - } - }; - - return recoverer; - } - }; - return recovererProvider; - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java new file mode 100644 index 0000000000..26aa99a481 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java @@ -0,0 +1,338 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import junit.framework.TestCase; + +import org.apache.qpid.server.BrokerOptions; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.LogRecorder; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; +import org.apache.qpid.server.model.GroupProvider; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.SystemContext; +import org.apache.qpid.server.model.SystemContextImpl; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; +import org.apache.qpid.server.store.GenericRecoverer; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; + +public class BrokerRecovererTest extends TestCase +{ + private ConfiguredObjectRecord _brokerEntry = mock(ConfiguredObjectRecord.class); + + private UUID _brokerId = UUID.randomUUID(); + private AuthenticationProvider _authenticationProvider1; + private UUID _authenticationProvider1Id = UUID.randomUUID(); + private SystemContext _systemContext; + private ConfiguredObjectFactory _configuredObjectFactory; + private TaskExecutor _taskExecutor; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _configuredObjectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + _systemContext = new SystemContextImpl(_taskExecutor, + _configuredObjectFactory, mock(EventLogger.class), mock(LogRecorder.class), mock(BrokerOptions.class)); + + when(_brokerEntry.getId()).thenReturn(_brokerId); + when(_brokerEntry.getType()).thenReturn(Broker.class.getSimpleName()); + Map attributesMap = new HashMap(); + attributesMap.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION); + attributesMap.put(Broker.NAME, getName()); + + when(_brokerEntry.getAttributes()).thenReturn(attributesMap); + when(_brokerEntry.getParents()).thenReturn(Collections.singletonMap(SystemContext.class.getSimpleName(), _systemContext.asObjectRecord())); + + //Add a base AuthenticationProvider for all tests + _authenticationProvider1 = mock(AuthenticationProvider.class); + when(_authenticationProvider1.getName()).thenReturn("authenticationProvider1"); + when(_authenticationProvider1.getId()).thenReturn(_authenticationProvider1Id); + } + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + _taskExecutor.stop(); + } + } + + public void testCreateBrokerAttributes() + { + Map attributes = new HashMap(); + attributes.put(Broker.NAME, getName()); + attributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test"); + attributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 1000); + attributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 2000); + attributes.put(Broker.STATISTICS_REPORTING_PERIOD, 4000); + attributes.put(Broker.STATISTICS_REPORTING_RESET_ENABLED, true); + attributes.put(Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION); + + Map entryAttributes = new HashMap(); + for (Map.Entry attribute : attributes.entrySet()) + { + String value = convertToString(attribute.getValue()); + entryAttributes.put(attribute.getKey(), value); + } + + when(_brokerEntry.getAttributes()).thenReturn(entryAttributes); + + resolveObjects(_brokerEntry); + Broker broker = _systemContext.getBroker(); + + assertNotNull(broker); + + broker.open(); + + assertEquals(_brokerId, broker.getId()); + + for (Map.Entry attribute : attributes.entrySet()) + { + Object attributeValue = broker.getAttribute(attribute.getKey()); + assertEquals("Unexpected value of attribute '" + attribute.getKey() + "'", attribute.getValue(), attributeValue); + } + } + + public ConfiguredObjectRecord createAuthProviderRecord(UUID id, String name) + { + final Map authProviderAttrs = new HashMap(); + authProviderAttrs.put(AuthenticationProvider.NAME, name); + authProviderAttrs.put(AuthenticationProvider.TYPE, "Anonymous"); + + return new ConfiguredObjectRecordImpl(id, AuthenticationProvider.class.getSimpleName(), authProviderAttrs, Collections + .singletonMap(Broker.class.getSimpleName(), _brokerEntry)); + } + + + public ConfiguredObjectRecord createGroupProviderRecord(UUID id, String name) + { + final Map groupProviderAttrs = new HashMap(); + groupProviderAttrs.put(GroupProvider.NAME, name); + groupProviderAttrs.put(GroupProvider.TYPE, "GroupFile"); + groupProviderAttrs.put("path", "/no-such-path"); + + return new ConfiguredObjectRecordImpl(id, GroupProvider.class.getSimpleName(), groupProviderAttrs, Collections + .singletonMap(Broker.class.getSimpleName(), _brokerEntry)); + } + + public ConfiguredObjectRecord createPortRecord(UUID id, int port, Object authProviderRef) + { + final Map portAttrs = new HashMap(); + portAttrs.put(Port.NAME, "port-"+port); + portAttrs.put(Port.TYPE, "HTTP"); + portAttrs.put(Port.PORT, port); + portAttrs.put(Port.AUTHENTICATION_PROVIDER, authProviderRef); + + return new ConfiguredObjectRecordImpl(id, Port.class.getSimpleName(), portAttrs, Collections + .singletonMap(Broker.class.getSimpleName(), _brokerEntry)); + } + + + public void testCreateBrokerWithPorts() + { + UUID authProviderId = UUID.randomUUID(); + UUID portId = UUID.randomUUID(); + + resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord( + portId, + 5672, + "authProvider")); + Broker broker = _systemContext.getBroker(); + + + assertNotNull(broker); + broker.open(); + assertEquals(_brokerId, broker.getId()); + assertEquals(1, broker.getPorts().size()); + } + + public void testCreateBrokerWithOneAuthenticationProvider() + { + UUID authProviderId = UUID.randomUUID(); + + resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider")); + Broker broker = _systemContext.getBroker(); + + + assertNotNull(broker); + broker.open(); + assertEquals(_brokerId, broker.getId()); + assertEquals(1, broker.getAuthenticationProviders().size()); + + } + + public void testCreateBrokerWithMultipleAuthenticationProvidersAndPorts() + { + UUID authProviderId = UUID.randomUUID(); + UUID portId = UUID.randomUUID(); + UUID authProvider2Id = UUID.randomUUID(); + UUID port2Id = UUID.randomUUID(); + + resolveObjects(_brokerEntry, + createAuthProviderRecord(authProviderId, "authProvider"), + createPortRecord(portId, 5672, "authProvider"), + createAuthProviderRecord(authProvider2Id, "authProvider2"), + createPortRecord(port2Id, 5673, "authProvider2")); + Broker broker = _systemContext.getBroker(); + + + assertNotNull(broker); + broker.open(); + assertEquals(_brokerId, broker.getId()); + assertEquals(2, broker.getPorts().size()); + + assertEquals("Unexpected number of authentication providers", 2, broker.getAuthenticationProviders().size()); + + } + + public void testCreateBrokerWithGroupProvider() + { + + UUID authProviderId = UUID.randomUUID(); + + resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider")); + Broker broker = _systemContext.getBroker(); + + + assertNotNull(broker); + broker.open(); + assertEquals(_brokerId, broker.getId()); + assertEquals(1, broker.getGroupProviders().size()); + + } + + public void testModelVersionValidationForIncompatibleMajorVersion() throws Exception + { + Map brokerAttributes = new HashMap(); + String[] incompatibleVersions = {Integer.MAX_VALUE + "." + 0, "0.0"}; + for (String incompatibleVersion : incompatibleVersions) + { + // need to reset all the shared objects for every iteration of the test + setUp(); + brokerAttributes.put(Broker.MODEL_VERSION, incompatibleVersion); + brokerAttributes.put(Broker.NAME, getName()); + when(_brokerEntry.getAttributes()).thenReturn(brokerAttributes); + + try + { + resolveObjects(_brokerEntry); + Broker broker = _systemContext.getBroker(); + broker.open(); + fail("The broker creation should fail due to unsupported model version"); + } + catch (IllegalConfigurationException e) + { + assertEquals("The model version '" + incompatibleVersion + + "' in configuration is incompatible with the broker model version '" + BrokerModel.MODEL_VERSION + "'", e.getMessage()); + } + } + } + + + public void testModelVersionValidationForIncompatibleMinorVersion() throws Exception + { + Map brokerAttributes = new HashMap(); + String incompatibleVersion = BrokerModel.MODEL_MAJOR_VERSION + "." + Integer.MAX_VALUE; + brokerAttributes.put(Broker.MODEL_VERSION, incompatibleVersion); + brokerAttributes.put(Broker.NAME, getName()); + + when(_brokerEntry.getAttributes()).thenReturn(brokerAttributes); + + try + { + UnresolvedConfiguredObject recover = + _configuredObjectFactory.recover(_brokerEntry, _systemContext); + + Broker broker = (Broker) recover.resolve(); + broker.open(); + fail("The broker creation should fail due to unsupported model version"); + } + catch (IllegalConfigurationException e) + { + assertEquals("The model version '" + incompatibleVersion + + "' in configuration is incompatible with the broker model version '" + BrokerModel.MODEL_VERSION + "'", e.getMessage()); + } + } + + public void testIncorrectModelVersion() throws Exception + { + Map brokerAttributes = new HashMap(); + brokerAttributes.put(Broker.NAME, getName()); + + String[] versions = { Integer.MAX_VALUE + "_" + 0, "", null }; + for (String modelVersion : versions) + { + brokerAttributes.put(Broker.MODEL_VERSION, modelVersion); + when(_brokerEntry.getAttributes()).thenReturn(brokerAttributes); + + try + { + UnresolvedConfiguredObject recover = + _configuredObjectFactory.recover(_brokerEntry, _systemContext); + Broker broker = (Broker) recover.resolve(); + broker.open(); + fail("The broker creation should fail due to unsupported model version"); + } + catch (IllegalConfigurationException e) + { + // pass + } + } + } + + private String convertToString(Object attributeValue) + { + return String.valueOf(attributeValue); + } + + private void resolveObjects(ConfiguredObjectRecord... records) + { + GenericRecoverer recoverer = new GenericRecoverer(_systemContext, Broker.class.getSimpleName()); + recoverer.recover(Arrays.asList(records)); + } + +} -- cgit v1.2.1