diff options
| author | Keith Wall <kwall@apache.org> | 2014-04-22 12:32:23 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-04-22 12:32:23 +0000 |
| commit | ecc7f3fc1c56cde10e9960afd1e4fbd4bcb07abf (patch) | |
| tree | 66f2f92568ee799b9991db441bd05b4c60b5b496 /qpid/java | |
| parent | a0c53a8a0a83bf67198717844e22cd122a2ab7c9 (diff) | |
| download | qpid-python-ecc7f3fc1c56cde10e9960afd1e4fbd4bcb07abf.tar.gz | |
QPID-5715: [Java Broker] Refactor VirtualHostStoreUpgraderAndRecoverer/BrokerStoreUpgrader to avoid duplicated code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1589112 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
11 files changed, 793 insertions, 1043 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index b2559f8668..3ccfc276fa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.Set; @@ -34,7 +35,6 @@ import javax.security.auth.Subject; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; - import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator; import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -152,6 +152,8 @@ public class Broker store = new ManagementModeStoreHandler(store, options); } + store.openConfigurationStore(systemContext, Collections.<String, Object>emptyMap()); + _applicationRegistry = new ApplicationRegistry(store,systemContext); try { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java deleted file mode 100644 index 26686c67cd..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.startup; - -public abstract class UpgraderPhaseFactory -{ - private final String _toVersion; - private final String _fromVersion; - - protected UpgraderPhaseFactory(String fromVersion, String toVersion) - { - _toVersion = toVersion; - _fromVersion = fromVersion; - } - - public String getToVersion() - { - return _toVersion; - } - - public String getFromVersion() - { - return _fromVersion; - } - - public abstract StoreUpgraderPhase newInstance(); -}
\ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java index 03032ad1ed..04634ea2a6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java @@ -23,13 +23,10 @@ package org.apache.qpid.server.model; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; -import org.apache.qpid.server.store.ConfiguredObjectRecord; @ManagedObject (creatable = false) public interface SystemContext<X extends SystemContext<X>> extends ConfiguredObject<X> { - void resolveObjects(ConfiguredObjectRecord... records); - EventLogger getEventLogger(); BrokerOptions getBrokerOptions(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java index f0f5e96081..d9e9950972 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java @@ -84,141 +84,6 @@ public class SystemContextImpl extends AbstractConfiguredObject<SystemContextImp } @Override - public void resolveObjects(final ConfiguredObjectRecord... records) - { - runTask(new TaskExecutor.VoidTask() - { - @Override - public void execute() - { - - - ConfiguredObjectFactory factory = getObjectFactory(); - - Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>(); - resolvedObjects.put(getId(), SystemContextImpl.this); - - Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = - new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records)); - Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies = - new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>(); - - boolean updatesMade; - - do - { - updatesMade = false; - Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator(); - while (iter.hasNext()) - { - ConfiguredObjectRecord record = iter.next(); - Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>(); - boolean foundParents = true; - for (ConfiguredObjectRecord parent : record.getParents().values()) - { - if (!resolvedObjects.containsKey(parent.getId())) - { - foundParents = false; - break; - } - else - { - parents.add(resolvedObjects.get(parent.getId())); - } - } - if (foundParents) - { - iter.remove(); - UnresolvedConfiguredObject<? extends ConfiguredObject> recovered = - factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()])); - Collection<ConfiguredObjectDependency<?>> dependencies = - recovered.getUnresolvedDependencies(); - if (dependencies.isEmpty()) - { - updatesMade = true; - ConfiguredObject<?> resolved = recovered.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - else - { - recordsWithUnresolvedDependencies.add(recovered); - } - } - - } - - Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter = - recordsWithUnresolvedDependencies.iterator(); - - while (unresolvedIter.hasNext()) - { - UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next(); - Collection<ConfiguredObjectDependency<?>> dependencies = - new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies()); - - for (ConfiguredObjectDependency dependency : dependencies) - { - if (dependency instanceof ConfiguredObjectIdDependency) - { - UUID id = ((ConfiguredObjectIdDependency) dependency).getId(); - if (resolvedObjects.containsKey(id)) - { - dependency.resolve(resolvedObjects.get(id)); - } - } - else if (dependency instanceof ConfiguredObjectNameDependency) - { - ConfiguredObject<?> dependentObject = null; - for (ConfiguredObject<?> parent : unresolvedObject.getParents()) - { - dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), - ((ConfiguredObjectNameDependency) dependency) - .getName() - ); - if (dependentObject != null) - { - break; - } - } - if (dependentObject != null) - { - dependency.resolve(dependentObject); - } - } - else - { - throw new ServerScopedRuntimeException("Unknown dependency type " - + dependency.getClass() - .getSimpleName()); - } - } - if (unresolvedObject.getUnresolvedDependencies().isEmpty()) - { - updatesMade = true; - unresolvedIter.remove(); - ConfiguredObject<?> resolved = unresolvedObject.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - } - - } while (updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() - && recordsWithUnresolvedParents.isEmpty())); - - if (!recordsWithUnresolvedDependencies.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve some objects: " - + recordsWithUnresolvedDependencies); - } - if (!recordsWithUnresolvedParents.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" - + recordsWithUnresolvedParents); - } - } - }); - } - - @Override protected boolean setState(final State currentState, final State desiredState) { throw new IllegalArgumentException("Cannot change the state of the SystemContext object"); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 6ddf5f83dd..d3e80ecee7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -21,11 +21,9 @@ package org.apache.qpid.server.registry; import org.apache.log4j.Logger; - import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.configuration.startup.BrokerStoreUpgrader; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; @@ -35,6 +33,7 @@ import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemContext; +import org.apache.qpid.server.store.BrokerStoreUpgraderAndRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.util.SystemUtils; @@ -77,8 +76,8 @@ public class ApplicationRegistry implements IApplicationRegistry logStartupMessages(startupLogger); - BrokerStoreUpgrader upgrader = new BrokerStoreUpgrader(_systemContext); - _broker = upgrader.upgrade(_store); + BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(_systemContext); + _broker = upgrader.perform(_store); _broker.setEventLogger(startupLogger); _broker.open(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java index 1a942a2e4a..7e49e0f6d3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java @@ -18,42 +18,30 @@ * under the License. * */ -package org.apache.qpid.server.configuration.startup; +package org.apache.qpid.server.store; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; -import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.SystemContext; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; -import org.apache.qpid.server.store.NullUpgrader; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.util.Action; -public class BrokerStoreUpgrader +public class BrokerStoreUpgraderAndRecoverer { - private final SystemContext _systemContext; - private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, UpgraderPhaseFactory>(); - + private final SystemContext<?> _systemContext; + private final Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>(); // Note: don't use externally defined constants in upgraders in case they change, the values here MUST stay the same // no matter what changes are made to the code in the future - - public BrokerStoreUpgrader(SystemContext systemContext) + public BrokerStoreUpgraderAndRecoverer(SystemContext<?> systemContext) { _systemContext = systemContext; @@ -63,199 +51,171 @@ public class BrokerStoreUpgrader register(new Upgrader_1_3_to_1_4()); } - private void register(UpgraderPhaseFactory factory) + private void register(StoreUpgraderPhase upgrader) { - _upgraders.put(factory.getFromVersion(), factory); + _upgraders.put(upgrader.getFromVersion(), upgrader); } - private final class Upgrader_1_0_to_1_1 extends UpgraderPhaseFactory + private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase { private Upgrader_1_0_to_1_1() { - super("1.0", "1.1"); + super("modelVersion", "1.0", "1.1"); } @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) + if (record.getType().equals("Broker")) { - @Override - public void configuredObject(ConfiguredObjectRecord record) - { - if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } - else if (record.getType().equals("VirtualHost") && record.getAttributes().containsKey("storeType")) - { - Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); - updatedAttributes.put("type", "STANDARD"); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - } - - getNextUpgrader().configuredObject(record); - } + record = upgradeRootRecord(record); + } + else if (record.getType().equals("VirtualHost") && record.getAttributes().containsKey("storeType")) + { + Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); + updatedAttributes.put("type", "STANDARD"); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + } + getNextUpgrader().configuredObject(record); + } - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; + @Override + public void complete() + { + getNextUpgrader().complete(); } + } - private static final class Upgrader_1_1_to_1_2 extends UpgraderPhaseFactory + private static final class Upgrader_1_1_to_1_2 extends StoreUpgraderPhase { private Upgrader_1_1_to_1_2() { - super("1.1", "1.2"); + super("modelVersion", "1.1", "1.2"); } @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) + if (record.getType().equals("Broker")) { + record = upgradeRootRecord(record); + } - @Override - public void configuredObject(ConfiguredObjectRecord record) - { - if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } - - getNextUpgrader().configuredObject(record); + getNextUpgrader().configuredObject(record); - } + } - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; + @Override + public void complete() + { + getNextUpgrader().complete(); } + } - private static final class Upgrader_1_2_to_1_3 extends UpgraderPhaseFactory + private static final class Upgrader_1_2_to_1_3 extends StoreUpgraderPhase { private Upgrader_1_2_to_1_3() { - super("1.2", "1.3"); + super("modelVersion", "1.2", "1.3"); } @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) + if (record.getType().equals("TrustStore") && record.getAttributes().containsKey("type")) { + Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); + updatedAttributes.put("trustStoreType", updatedAttributes.remove("type")); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); - @Override - public void configuredObject(ConfiguredObjectRecord record) - { - if (record.getType().equals("TrustStore") && record.getAttributes().containsKey("type")) - { - Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); - updatedAttributes.put("trustStoreType", updatedAttributes.remove("type")); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - - } - else if (record.getType().equals("KeyStore") && record.getAttributes().containsKey("type")) - { - Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); - updatedAttributes.put("keyStoreType", updatedAttributes.remove("type")); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); + } + else if (record.getType().equals("KeyStore") && record.getAttributes().containsKey("type")) + { + Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); + updatedAttributes.put("keyStoreType", updatedAttributes.remove("type")); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); - } - else if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } + } + else if (record.getType().equals("Broker")) + { + record = upgradeRootRecord(record); + } - getNextUpgrader().configuredObject(record); + getNextUpgrader().configuredObject(record); - } + } - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; + @Override + public void complete() + { + getNextUpgrader().complete(); } + } - private static final class Upgrader_1_3_to_1_4 extends UpgraderPhaseFactory + private static final class Upgrader_1_3_to_1_4 extends StoreUpgraderPhase { private Upgrader_1_3_to_1_4() { - super("1.3", "1.4"); + super("modelVersion", "1.3", "1.4"); } + @SuppressWarnings("serial") + private Map<String, VirtualHostEntryUpgrader> _vhostUpgraderMap = new HashMap<String, VirtualHostEntryUpgrader>() + {{ + put("BDB_HA", new BdbHaVirtualHostUpgrader()); + put("STANDARD", new StandardVirtualHostUpgrader()); + }}; + @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion()) + if (record.getType().equals("VirtualHost")) { - - @SuppressWarnings("serial") - private Map<String, VirtualHostEntryUpgrader> _vhostUpgraderMap = new HashMap<String, VirtualHostEntryUpgrader>() - {{ - put("BDB_HA", new BdbHaVirtualHostUpgrader()); - put("STANDARD", new StandardVirtualHostUpgrader()); - }}; - - @Override - public void configuredObject(ConfiguredObjectRecord record) + Map<String, Object> attributes = record.getAttributes(); + if (attributes.containsKey("configPath")) { - if (record.getType().equals("VirtualHost")) - { - Map<String, Object> attributes = record.getAttributes(); - if (attributes.containsKey("configPath")) - { - throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath")); - } + throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath")); + } - String type = (String) attributes.get("type"); - VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type); - if (vhostUpgrader == null) - { - throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type); - } - record = vhostUpgrader.upgrade(record); - getUpdateMap().put(record.getId(), record); - } - else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType")) - { - Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); - updatedAttributes.put("type", updatedAttributes.remove("pluginType")); - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); + String type = (String) attributes.get("type"); + VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type); + if (vhostUpgrader == null) + { + throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type); + } + record = vhostUpgrader.upgrade(record); + getUpdateMap().put(record.getId(), record); + } + else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType")) + { + Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes()); + updatedAttributes.put("type", updatedAttributes.remove("pluginType")); + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); - } - else if (record.getType().equals("Broker")) - { - record = upgradeRootRecord(record); - } + } + else if (record.getType().equals("Broker")) + { + record = upgradeRootRecord(record); + } - getNextUpgrader().configuredObject(record); + getNextUpgrader().configuredObject(record); - } + } - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; + @Override + public void complete() + { + getNextUpgrader().complete(); } + } private static interface VirtualHostEntryUpgrader @@ -560,163 +520,52 @@ public class BrokerStoreUpgrader } } - public Broker<?> upgrade(DurableConfigurationStore store) - { - final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext, store, _upgraders); - store.openConfigurationStore(_systemContext, Collections.<String,Object>emptyMap()); - store.visitConfiguredObjectRecords(recoveryHandler); - - return recoveryHandler.getBroker(); - } - - - private static class BrokerStoreRecoveryHandler implements ConfiguredObjectRecordHandler + public Broker<?> perform(final DurableConfigurationStore store) { - private static Logger LOGGER = Logger.getLogger(BrokerStoreRecoveryHandler.class); - - private DurableConfigurationStoreUpgrader _upgrader; - private DurableConfigurationStore _store; - private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>(); - private final SystemContext _systemContext; - private Map<String, UpgraderPhaseFactory> _upgraders; - - private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store, Map<String, UpgraderPhaseFactory> upgraders) - { - _systemContext = systemContext; - _store = store; - _upgraders = upgraders; - } - + final String brokerCategory = Broker.class.getSimpleName(); + final GenericStoreUpgrader upgrader = new GenericStoreUpgrader(brokerCategory, Broker.MODEL_VERSION, store, _upgraders); + upgrader.upgrade(); - @Override - public void begin() - { - } + new GenericRecoverer(_systemContext, brokerCategory).recover(upgrader.getRecords()); - @Override - public boolean handle(final ConfiguredObjectRecord object) + final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store); + applyRecursively(_systemContext.getBroker(), new Action<ConfiguredObject<?>>() { - _records.put(object.getId(), object); - return true; - } - - @Override - public void end() - { - String version = getCurrentVersion(); - - while(!BrokerModel.MODEL_VERSION.equals(version)) - { - LOGGER.debug("Adding broker store upgrader from model version: " + version); - final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version); - StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance(); - if(_upgrader == null) - { - _upgrader = upgrader; - } - else - { - _upgrader.setNextUpgrader(upgrader); - } - version = upgraderPhaseFactory.getToVersion(); - } - - if(_upgrader == null) - { - _upgrader = new NullUpgrader(); - } - else + @Override + public void performAction(final ConfiguredObject<?> object) { - _upgrader.setNextUpgrader(new NullUpgrader()); + object.addChangeListener(configChangeListener); } + }); - for(ConfiguredObjectRecord record : _records.values()) - { - _upgrader.configuredObject(record); - } - - Map<UUID, ConfiguredObjectRecord> deletedRecords = _upgrader.getDeletedRecords(); - Map<UUID, ConfiguredObjectRecord> updatedRecords = _upgrader.getUpdatedRecords(); - - LOGGER.debug("Broker store upgrade: " + deletedRecords.size() + " records deleted"); - LOGGER.debug("Broker store upgrade: " + updatedRecords.size() + " records updated"); - LOGGER.debug("Broker store upgrade: " + _records.size() + " total records"); - - _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()])); - _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()])); - - - - - _records.keySet().removeAll(deletedRecords.keySet()); - _records.putAll(updatedRecords); - - _systemContext.resolveObjects(_records.values().toArray(new ConfiguredObjectRecord[_records.size()])); - - final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(_store); - applyRecursively(_systemContext.getBroker(), - new Action<ConfiguredObject<?>>() - { - @Override - public void performAction(final ConfiguredObject<?> object) - { - object.addChangeListener(configChangeListener); - } - - - }); - - } - - private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action) - { - applyRecursively(object, action, new HashSet<ConfiguredObject<?>>()); - } + return _systemContext.getBroker(); + } - private void applyRecursively(final ConfiguredObject<?> object, - final Action<ConfiguredObject<?>> action, - final HashSet<ConfiguredObject<?>> visited) - { - if(!visited.contains(object)) - { - visited.add(object); - action.performAction(object); - for(Class<? extends ConfiguredObject> childClass : object.getModel().getChildTypes(object.getCategoryClass())) - { - Collection<? extends ConfiguredObject> children = object.getChildren(childClass); - if(children != null) - { - for(ConfiguredObject<?> child : children) - { - applyRecursively(child, action, visited); - } - } - } - } - } + private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action) + { + applyRecursively(object, action, new HashSet<ConfiguredObject<?>>()); + } - private String getCurrentVersion() + private void applyRecursively(final ConfiguredObject<?> object, + final Action<ConfiguredObject<?>> action, + final HashSet<ConfiguredObject<?>> visited) + { + if(!visited.contains(object)) { - for(ConfiguredObjectRecord record : _records.values()) + visited.add(object); + action.performAction(object); + for(Class<? extends ConfiguredObject> childClass : object.getModel().getChildTypes(object.getCategoryClass())) { - if(record.getType().equals("Broker")) + Collection<? extends ConfiguredObject> children = object.getChildren(childClass); + if(children != null) { - String version = (String) record.getAttributes().get(Broker.MODEL_VERSION); - if(version == null) + for(ConfiguredObject<?> child : children) { - version = "1.0"; + applyRecursively(child, action, visited); } - return version; } } - return BrokerModel.MODEL_VERSION; - } - - public Broker getBroker() - { - return _systemContext.getBroker(); } } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java new file mode 100644 index 0000000000..01be6cf556 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java @@ -0,0 +1,218 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.util.ServerScopedRuntimeException; + +public class GenericRecoverer +{ + private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class); + + private final ConfiguredObject<?> _parentOfRoot; + private final String _rootCategory; + + public GenericRecoverer(ConfiguredObject<?> parentOfRoot, String rootCategory) + { + _parentOfRoot = parentOfRoot; + _rootCategory = rootCategory; + } + + public void recover(final List<ConfiguredObjectRecord> records) + { + _parentOfRoot.getTaskExecutor().run(new TaskExecutor.VoidTask() + { + @Override + public void execute() + { + performRecover(records); + } + + @Override + public String toString() + { + return _rootCategory + " recovery"; + } + }); + + } + + private void performRecover(List<ConfiguredObjectRecord> records) + { + ConfiguredObjectRecord rootRecord = null; + for (ConfiguredObjectRecord record : records) + { + if (_rootCategory.equals(record.getType())) + { + rootRecord = record; + break; + } + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Root record " + rootRecord); + } + + if (rootRecord != null) + { + + if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty()) + { + records = new ArrayList<ConfiguredObjectRecord>(records); + + String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName(); + ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parentOfRoot.getId(), parentOfRootCategory, Collections.<String, Object>emptyMap()); + Map<String, ConfiguredObjectRecord> rootParents = Collections.<String, ConfiguredObjectRecord>singletonMap(parentOfRootCategory, parentRecord); + records.remove(rootRecord); + records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents)); + } + + resolveObjects(_parentOfRoot, records); + } + } + + private void resolveObjects(ConfiguredObject<?> parentObject, List<ConfiguredObjectRecord> records) + { + ConfiguredObjectFactory factory = parentObject.getObjectFactory(); + Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>(); + resolvedObjects.put(parentObject.getId(), parentObject); + + Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(records); + Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies = + new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>(); + + boolean updatesMade; + + do + { + updatesMade = false; + Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator(); + while (iter.hasNext()) + { + ConfiguredObjectRecord record = iter.next(); + Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>(); + boolean foundParents = true; + for (ConfiguredObjectRecord parent : record.getParents().values()) + { + if (!resolvedObjects.containsKey(parent.getId())) + { + foundParents = false; + break; + } + else + { + parents.add(resolvedObjects.get(parent.getId())); + } + } + if (foundParents) + { + iter.remove(); + ConfiguredObject<?>[] parentArray = parents.toArray(new ConfiguredObject<?>[parents.size()]); + UnresolvedConfiguredObject<? extends ConfiguredObject> recovered = factory.recover(record, parentArray); + Collection<ConfiguredObjectDependency<?>> dependencies = recovered.getUnresolvedDependencies(); + if (dependencies.isEmpty()) + { + updatesMade = true; + ConfiguredObject<?> resolved = recovered.resolve(); + resolvedObjects.put(resolved.getId(), resolved); + } + else + { + recordsWithUnresolvedDependencies.add(recovered); + } + } + + } + + Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter = recordsWithUnresolvedDependencies.iterator(); + + while(unresolvedIter.hasNext()) + { + UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next(); + Collection<ConfiguredObjectDependency<?>> dependencies = + new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies()); + + for(ConfiguredObjectDependency dependency : dependencies) + { + if(dependency instanceof ConfiguredObjectIdDependency) + { + UUID id = ((ConfiguredObjectIdDependency)dependency).getId(); + if(resolvedObjects.containsKey(id)) + { + dependency.resolve(resolvedObjects.get(id)); + } + } + else if(dependency instanceof ConfiguredObjectNameDependency) + { + ConfiguredObject<?> dependentObject = null; + for(ConfiguredObject<?> parent : unresolvedObject.getParents()) + { + dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName()); + if(dependentObject != null) + { + break; + } + } + if(dependentObject != null) + { + dependency.resolve(dependentObject); + } + } + else + { + throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName()); + } + } + if(unresolvedObject.getUnresolvedDependencies().isEmpty()) + { + updatesMade = true; + unresolvedIter.remove(); + ConfiguredObject<?> resolved = unresolvedObject.resolve(); + resolvedObjects.put(resolved.getId(), resolved); + } + } + + } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty())); + + if(!recordsWithUnresolvedDependencies.isEmpty()) + { + throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies); + } + if(!recordsWithUnresolvedParents.isEmpty()) + { + throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents); + } + } + +}
\ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java new file mode 100644 index 0000000000..26b60f765f --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + +public class GenericStoreUpgrader +{ + private static final Logger LOGGER = Logger.getLogger(GenericStoreUpgrader.class); + + private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>(); + private final Map<String, StoreUpgraderPhase> _upgraders; + private final DurableConfigurationStore _store; + private final String _rootCategory; + private final String _modelVersionAttributeName; + + public GenericStoreUpgrader(String rootCategory, String rootModelVersionAttributeName, DurableConfigurationStore configurationStore, Map<String, StoreUpgraderPhase> upgraders) + { + super(); + _upgraders = upgraders; + _store = configurationStore; + _rootCategory = rootCategory; + _modelVersionAttributeName = rootModelVersionAttributeName; + } + + + public List<ConfiguredObjectRecord> getRecords() + { + return new ArrayList<ConfiguredObjectRecord>(_records.values()); + } + + public void upgrade() + { + ConfiguredObjectRecordHandler handler = new ConfiguredObjectRecordHandler() + { + @Override + public void begin() + { + } + + @Override + public boolean handle(final ConfiguredObjectRecord record) + { + _records.put(record.getId(), record); + return true; + } + + @Override + public void end() + { + performUpgrade(); + } + }; + + _store.visitConfiguredObjectRecords(handler); + } + + private void performUpgrade() + { + String version = getCurrentVersion(); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info(_rootCategory + " store has model version " + version + ". Number of record(s) " + _records.size()); + } + + DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version); + + for(ConfiguredObjectRecord record : _records.values()) + { + upgrader.configuredObject(record); + } + + upgrader.complete(); + + Map<UUID, ConfiguredObjectRecord> deletedRecords = upgrader.getDeletedRecords(); + Map<UUID, ConfiguredObjectRecord> updatedRecords = upgrader.getUpdatedRecords(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(_rootCategory + " store upgrade is about to complete. " + _records.size() + " total record(s)." + + " Records to update " + updatedRecords.size() + + " Records to delete " + deletedRecords.size()); + } + + _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()])); + _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()])); + + _records.keySet().removeAll(deletedRecords.keySet()); + _records.putAll(updatedRecords); + } + + private DurableConfigurationStoreUpgrader buildUpgraderChain(String version) + { + DurableConfigurationStoreUpgrader head = null; + while(!BrokerModel.MODEL_VERSION.equals(version)) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Adding " + _rootCategory + " store upgrader from model version: " + version); + } + + StoreUpgraderPhase upgrader = _upgraders.get(version); + if (upgrader == null) + { + throw new IllegalConfigurationException("No phase upgrader for version " + version); + } + + if(head == null) + { + head = upgrader; + } + else + { + head.setNextUpgrader(upgrader); + } + version = upgrader.getToVersion(); + } + + if(head == null) + { + head = new NullUpgrader(); + } + else + { + head.setNextUpgrader(new NullUpgrader()); + } + + return head; + } + + private String getCurrentVersion() + { + for(ConfiguredObjectRecord record : _records.values()) + { + if(_rootCategory.equals(record.getType())) + { + return (String) record.getAttributes().get(_modelVersionAttributeName); + } + } + return BrokerModel.MODEL_VERSION; + } +}
\ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java index 5762a8f8e6..b40dc73186 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java @@ -18,23 +18,21 @@ * under the License. * */ -package org.apache.qpid.server.configuration.startup; +package org.apache.qpid.server.store; import java.util.HashMap; import java.util.Map; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.NonNullUpgrader; - public abstract class StoreUpgraderPhase extends NonNullUpgrader { + private final String _fromVersion; private final String _toVersion; private final String _versionAttributeName; - protected StoreUpgraderPhase(String versionAttributeName, String toVersion) + public StoreUpgraderPhase(String versionAttributeName, String fromVersion, String toVersion) { _toVersion = toVersion; + _fromVersion = fromVersion; _versionAttributeName = versionAttributeName; } @@ -46,4 +44,15 @@ public abstract class StoreUpgraderPhase extends NonNullUpgrader getUpdateMap().put(record.getId(), record); return record; } + + public String getFromVersion() + { + return _fromVersion; + } + + public String getToVersion() + { + return _toVersion; + } + }
\ No newline at end of file diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index 226371a065..e1cffeb942 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.server.store; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -31,13 +28,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.UUID; -import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.startup.StoreUpgraderPhase; -import org.apache.qpid.server.configuration.startup.UpgraderPhaseFactory; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; @@ -45,14 +37,11 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.util.ServerScopedRuntimeException; public class VirtualHostStoreUpgraderAndRecoverer { - private final ConfiguredObjectFactory _objectFactory; private final VirtualHostNode<?> _virtualHostNode; - private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, UpgraderPhaseFactory>(); + private Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>(); @SuppressWarnings("serial") private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>() @@ -68,12 +57,11 @@ public class VirtualHostStoreUpgraderAndRecoverer public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode, ConfiguredObjectFactory objectFactory) { _virtualHostNode = virtualHostNode; - _objectFactory = objectFactory; - register(new UpgraderFactory_0_0()); - register(new UpgraderFactory_0_1()); - register(new UpgraderFactory_0_2()); - register(new UpgraderFactory_0_3()); - register(new UpgraderFactory_0_4()); + register(new Upgrader_0_0_to_0_1()); + register(new Upgrader_0_1_to_0_2()); + register(new Upgrader_0_2_to_0_3()); + register(new Upgrader_0_3_to_0_4()); + register(new Upgrader_0_4_to_0_5()); Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>(); for (String exchangeName : DEFAULT_EXCHANGES.keySet()) @@ -84,9 +72,9 @@ public class VirtualHostStoreUpgraderAndRecoverer _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds); } - private void register(UpgraderPhaseFactory factory) + private void register(StoreUpgraderPhase upgrader) { - _upgraders.put(factory.getFromVersion(), factory); + _upgraders.put(upgrader.getFromVersion(), upgrader); } /* @@ -94,100 +82,91 @@ public class VirtualHostStoreUpgraderAndRecoverer * such bindings would have been ignored, starting from the point at which the config version changed, these * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue. */ - private class UpgraderFactory_0_0 extends UpgraderPhaseFactory + private class Upgrader_0_0_to_0_1 extends StoreUpgraderPhase { private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>(); - public UpgraderFactory_0_0() + public Upgrader_0_0_to_0_1() { - super("0.0", "0.1"); + super("modelVersion", "0.0", "0.1"); } - @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(final ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) - { + _records.put(record.getId(), record); + } - @Override - public void configuredObject(final ConfiguredObjectRecord record) - { - _records.put(record.getId(), record); - } + private void removeSelectorArguments(Map<String, Object> binding) + { + @SuppressWarnings("unchecked") + Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS)); - private void removeSelectorArguments(Map<String, Object> binding) - { - @SuppressWarnings("unchecked") - Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS)); + FilterSupport.removeFilters(arguments); + binding.put(Binding.ARGUMENTS, arguments); + } - FilterSupport.removeFilters(arguments); - binding.put(Binding.ARGUMENTS, arguments); - } + private boolean isTopicExchange(ConfiguredObjectRecord entry) + { + ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange"); + if (exchangeRecord == null) + { + return false; + } + UUID exchangeId = exchangeRecord.getId(); - private boolean isTopicExchange(ConfiguredObjectRecord entry) + if(_records.containsKey(exchangeId)) + { + return "topic".equals(_records.get(exchangeId) + .getAttributes() + .get(org.apache.qpid.server.model.Exchange.TYPE)); + } + else + { + if (_defaultExchangeIds.get("amq.topic").equals(exchangeId)) { - ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange"); - if (exchangeRecord == null) - { - return false; - } - UUID exchangeId = exchangeRecord.getId(); + return true; + } - if(_records.containsKey(exchangeId)) - { - return "topic".equals(_records.get(exchangeId) - .getAttributes() - .get(org.apache.qpid.server.model.Exchange.TYPE)); - } - else - { - if (_defaultExchangeIds.get("amq.topic").equals(exchangeId)) - { - return true; - } + return false; + } - return false; - } + } - } + private boolean hasSelectorArguments(Map<String, Object> binding) + { + @SuppressWarnings("unchecked") + Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS); + return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); + } - private boolean hasSelectorArguments(Map<String, Object> binding) + @Override + public void complete() + { + for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet()) + { + ConfiguredObjectRecord record = entry.getValue(); + String type = record.getType(); + Map<String, Object> attributes = record.getAttributes(); + UUID id = record.getId(); + if ("org.apache.qpid.server.model.VirtualHost".equals(type)) { - @SuppressWarnings("unchecked") - Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS); - return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); + record = upgradeRootRecord(record); } - - @Override - public void complete() + else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record)) { - for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet()) - { - ConfiguredObjectRecord record = entry.getValue(); - String type = record.getType(); - Map<String, Object> attributes = record.getAttributes(); - UUID id = record.getId(); - if ("org.apache.qpid.server.model.VirtualHost".equals(type)) - { - record = upgradeRootRecord(record); - } - else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record)) - { - attributes = new LinkedHashMap<String, Object>(attributes); - removeSelectorArguments(attributes); - - record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents()); - getUpdateMap().put(id, record); - entry.setValue(record); - - } - getNextUpgrader().configuredObject(record); - } + attributes = new LinkedHashMap<String, Object>(attributes); + removeSelectorArguments(attributes); + + record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents()); + getUpdateMap().put(id, record); + entry.setValue(record); - getNextUpgrader().complete(); } - }; + getNextUpgrader().configuredObject(record); + } + + getNextUpgrader().complete(); } } @@ -196,76 +175,68 @@ public class VirtualHostStoreUpgraderAndRecoverer * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker * configuration store). Also remove bindings which reference nonexistent queues or exchanges. */ - private class UpgraderFactory_0_1 extends UpgraderPhaseFactory + private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase { - protected UpgraderFactory_0_1() + public Upgrader_0_1_to_0_2() { - super("0.1", "0.2"); + super("modelVersion", "0.1", "0.2"); } @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(final ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) + String type = record.getType().substring(1 + record.getType().lastIndexOf('.')); + ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents()); + getUpdateMap().put(record.getId(), newRecord); + + if ("VirtualHost".equals(type)) { + newRecord = upgradeRootRecord(newRecord); + } + } - @Override - public void configuredObject(final ConfiguredObjectRecord record) + @Override + public void complete() + { + for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();) + { + Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next(); + final ConfiguredObjectRecord record = entry.getValue(); + final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName()); + final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName()); + if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId()) + || queueParent == null || unknownQueue(queueParent.getId()))) { - String type = record.getType().substring(1 + record.getType().lastIndexOf('.')); - ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents()); - getUpdateMap().put(record.getId(), newRecord); - - if ("VirtualHost".equals(type)) - { - newRecord = upgradeRootRecord(newRecord); - } + getDeleteMap().put(entry.getKey(), entry.getValue()); + iterator.remove(); } - - @Override - public void complete() + else { - for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();) - { - Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next(); - final ConfiguredObjectRecord record = entry.getValue(); - final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName()); - final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName()); - if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId()) - || queueParent == null || unknownQueue(queueParent.getId()))) - { - getDeleteMap().put(entry.getKey(), entry.getValue()); - iterator.remove(); - } - else - { - getNextUpgrader().configuredObject(record); - } - } - getNextUpgrader().complete(); + getNextUpgrader().configuredObject(record); } + } + getNextUpgrader().complete(); + } - private boolean unknownExchange(final UUID exchangeId) - { - if (_defaultExchangeIds.containsValue(exchangeId)) - { - return false; - } - ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); - return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())); - } + private boolean unknownExchange(final UUID exchangeId) + { + if (_defaultExchangeIds.containsValue(exchangeId)) + { + return false; + } + ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); + return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())); + } - private boolean unknownQueue(final UUID queueId) - { - ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); - return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())); - } + private boolean unknownQueue(final UUID queueId) + { + ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); + return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())); + } - private boolean isBinding(final String type) - { - return Binding.class.getSimpleName().equals(type); - } - }; + private boolean isBinding(final String type) + { + return Binding.class.getSimpleName().equals(type); } } @@ -274,52 +245,46 @@ public class VirtualHostStoreUpgraderAndRecoverer * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the * attributes into the map using the model attribute names rather than the wire attribute names */ - private class UpgraderFactory_0_2 extends UpgraderPhaseFactory + private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase { - protected UpgraderFactory_0_2() + private static final String ARGUMENTS = "arguments"; + + public Upgrader_0_2_to_0_3() { - super("0.2", "0.3"); + super("modelVersion", "0.2", "0.3"); } + @SuppressWarnings("unchecked") @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) + if("VirtualHost".equals(record.getType())) { - private static final String ARGUMENTS = "arguments"; - - @SuppressWarnings("unchecked") - @Override - public void configuredObject(ConfiguredObjectRecord record) + record = upgradeRootRecord(record); + } + else if("Queue".equals(record.getType())) + { + Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(); + if(record.getAttributes().get(ARGUMENTS) instanceof Map) { - if("VirtualHost".equals(record.getType())) - { - record = upgradeRootRecord(record); - } - else if("Queue".equals(record.getType())) - { - Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(); - if(record.getAttributes().get(ARGUMENTS) instanceof Map) - { - newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes() - .get(ARGUMENTS))); - } - newAttributes.putAll(record.getAttributes()); - - record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); - } - - getNextUpgrader().configuredObject(record); + newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes() + .get(ARGUMENTS))); } + newAttributes.putAll(record.getAttributes()); - @Override - public void complete() - { - getNextUpgrader().complete(); - } - }; + record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + } + + getNextUpgrader().configuredObject(record); } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + } /* @@ -327,387 +292,125 @@ public class VirtualHostStoreUpgraderAndRecoverer * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER" * ensure OWNER is null unless the exclusivity policy is CONTAINER */ - private class UpgraderFactory_0_3 extends UpgraderPhaseFactory + private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase { - protected UpgraderFactory_0_3() + private static final String EXCLUSIVE = "exclusive"; + + public Upgrader_0_3_to_0_4() { - super("0.3", "0.4"); + super("modelVersion", "0.3", "0.4"); } + @Override - public StoreUpgraderPhase newInstance() + public void configuredObject(ConfiguredObjectRecord record) { - return new StoreUpgraderPhase("modelVersion", getToVersion()) + if("VirtualHost".equals(record.getType())) { - private static final String EXCLUSIVE = "exclusive"; - - @Override - public void configuredObject(ConfiguredObjectRecord record) + record = upgradeRootRecord(record); + } + else if(Queue.class.getSimpleName().equals(record.getType())) + { + Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes()); + if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean) { - if("VirtualHost".equals(record.getType())) - { - record = upgradeRootRecord(record); - } - else if(Queue.class.getSimpleName().equals(record.getType())) + boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE); + newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); + if(!isExclusive && record.getAttributes().containsKey("owner")) { - Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes()); - if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean) - { - boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE); - newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); - if(!isExclusive && record.getAttributes().containsKey("owner")) - { - newAttributes.remove("owner"); - } - } - else - { - newAttributes.remove("owner"); - } - if(!record.getAttributes().containsKey("durable")) - { - newAttributes.put("durable","true"); - } - - record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents()); - getUpdateMap().put(record.getId(), record); + newAttributes.remove("owner"); } - - getNextUpgrader().configuredObject(record); } - - @Override - public void complete() + else { - getNextUpgrader().complete(); + newAttributes.remove("owner"); } - }; - } - } - - private class UpgraderFactory_0_4 extends UpgraderPhaseFactory - { - protected UpgraderFactory_0_4() - { - super("0.4", "2.0"); - } - - @Override - public StoreUpgraderPhase newInstance() - { - return new StoreUpgraderPhase("modelVersion", getToVersion()) - { - private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES); - private static final String EXCHANGE_NAME = "name"; - private static final String EXCHANGE_TYPE = "type"; - private static final String EXCHANGE_DURABLE = "durable"; - private ConfiguredObjectRecord _virtualHostRecord; - - @Override - public void configuredObject(ConfiguredObjectRecord record) + if(!record.getAttributes().containsKey("durable")) { - if("VirtualHost".equals(record.getType())) - { - record = upgradeRootRecord(record); - Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes()); - virtualHostAttributes.put("name", _virtualHostNode.getName()); - virtualHostAttributes.put("modelVersion", getToVersion()); - record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap()); - _virtualHostRecord = record; - } - else if("Exchange".equals(record.getType())) - { - Map<String, Object> attributes = record.getAttributes(); - String name = (String)attributes.get(EXCHANGE_NAME); - _missingAmqpExchanges.remove(name); - } - getNextUpgrader().configuredObject(record); + newAttributes.put("durable","true"); } - @Override - public void complete() - { - for (Entry<String, String> entry : _missingAmqpExchanges.entrySet()) - { - String name = entry.getKey(); - String type = entry.getValue(); - UUID id = _defaultExchangeIds.get(name); - - Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(EXCHANGE_NAME, name); - attributes.put(EXCHANGE_TYPE, type); - attributes.put(EXCHANGE_DURABLE, true); - - ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord)); - getUpdateMap().put(id, record); - - getNextUpgrader().configuredObject(record); - - } + record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents()); + getUpdateMap().put(record.getId(), record); + } - getNextUpgrader().complete(); - } - }; + getNextUpgrader().configuredObject(record); } - } + @Override + public void complete() + { + getNextUpgrader().complete(); + } - public void perform(DurableConfigurationStore durableConfigurationStore) - { - UpgradeAndRecoveryHandler vhrh = new UpgradeAndRecoveryHandler(_virtualHostNode, _objectFactory, durableConfigurationStore, _upgraders); - durableConfigurationStore.visitConfiguredObjectRecords(vhrh); } - //TODO: generalize this class - private static class UpgradeAndRecoveryHandler implements ConfiguredObjectRecordHandler + private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase { - private static Logger LOGGER = Logger.getLogger(UpgradeAndRecoveryHandler.class); + private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES); + private static final String EXCHANGE_NAME = "name"; + private static final String EXCHANGE_TYPE = "type"; + private static final String EXCHANGE_DURABLE = "durable"; + private ConfiguredObjectRecord _virtualHostRecord; - private final Map<UUID, ConfiguredObjectRecord> _records = new LinkedHashMap<UUID, ConfiguredObjectRecord>(); - private Map<String, UpgraderPhaseFactory> _upgraders; - - private final VirtualHostNode<?> _parent; - private final ConfiguredObjectFactory _configuredObjectFactory; - private final DurableConfigurationStore _store; - - public UpgradeAndRecoveryHandler(VirtualHostNode<?> parent, ConfiguredObjectFactory configuredObjectFactory, DurableConfigurationStore durableConfigurationStore, Map<String, UpgraderPhaseFactory> upgraders) + public Upgrader_0_4_to_0_5() { - super(); - _parent = parent; - _configuredObjectFactory = configuredObjectFactory; - _upgraders = upgraders; - _store = durableConfigurationStore; + super("modelVersion", "0.4", "2.0"); } @Override - public void begin() + public void configuredObject(ConfiguredObjectRecord record) { - } - - @Override - public boolean handle(final ConfiguredObjectRecord record) - { - _records.put(record.getId(), record); - return true; - } - - @Override - public void end() - { - String version = getCurrentVersion(); - - if (LOGGER.isInfoEnabled()) - { - LOGGER.info("Store has model version " + version + ". Number of record(s) " + _records.size()); - } - - DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version); - - for(ConfiguredObjectRecord record : _records.values()) + if("VirtualHost".equals(record.getType())) { - upgrader.configuredObject(record); + record = upgradeRootRecord(record); + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes()); + virtualHostAttributes.put("name", _virtualHostNode.getName()); + virtualHostAttributes.put("modelVersion", getToVersion()); + record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap()); + _virtualHostRecord = record; } - - upgrader.complete(); - - Map<UUID, ConfiguredObjectRecord> deletedRecords = upgrader.getDeletedRecords(); - Map<UUID, ConfiguredObjectRecord> updatedRecords = upgrader.getUpdatedRecords(); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("VirtualHost store upgrade: " + deletedRecords.size() + " record(s) deleted"); - LOGGER.debug("VirtualHost store upgrade: " + updatedRecords.size() + " record(s) updated"); - LOGGER.debug("VirtualHost store upgrade: " + _records.size() + " total record(s)"); - } - - _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()])); - _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()])); - - _records.keySet().removeAll(deletedRecords.keySet()); - _records.putAll(updatedRecords); - - ConfiguredObjectRecord virtualHostRecord = null; - for (ConfiguredObjectRecord record : _records.values()) - { - LOGGER.debug("Found type " + record.getType()); - if ("VirtualHost".equals(record.getType())) - { - virtualHostRecord = record; - break; - } - } - - if (virtualHostRecord != null) + else if("Exchange".equals(record.getType())) { - String parentCategory = _parent.getCategoryClass().getSimpleName(); - ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parent.getId(), parentCategory, Collections.<String, Object>emptyMap()); - Map<String, ConfiguredObjectRecord> rootParents = Collections.<String, ConfiguredObjectRecord>singletonMap(parentCategory, parentRecord); - _records.put(virtualHostRecord.getId(), new ConfiguredObjectRecordImpl(virtualHostRecord.getId(), VirtualHost.class.getSimpleName(), virtualHostRecord.getAttributes(), rootParents)); - Collection<ConfiguredObjectRecord> records = _records.values(); - resolveObjects(_configuredObjectFactory, _parent, records.toArray(new ConfiguredObjectRecord[records.size()])); + Map<String, Object> attributes = record.getAttributes(); + String name = (String)attributes.get(EXCHANGE_NAME); + _missingAmqpExchanges.remove(name); } + getNextUpgrader().configuredObject(record); } - private DurableConfigurationStoreUpgrader buildUpgraderChain(String version) - { - DurableConfigurationStoreUpgrader head = null; - while(!BrokerModel.MODEL_VERSION.equals(version)) - { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Adding virtual host store upgrader from model version: " + version); - } - final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version); - StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance(); - if(head == null) - { - head = upgrader; - } - else - { - head.setNextUpgrader(upgrader); - } - version = upgraderPhaseFactory.getToVersion(); - } - - if(head == null) - { - head = new NullUpgrader(); - } - else - { - head.setNextUpgrader(new NullUpgrader()); - } - - return head; - } - - private String getCurrentVersion() + @Override + public void complete() { - for(ConfiguredObjectRecord record : _records.values()) + for (Entry<String, String> entry : _missingAmqpExchanges.entrySet()) { - if(record.getType().equals("VirtualHost")) - { - return (String) record.getAttributes().get(VirtualHost.MODEL_VERSION); - } - } - return BrokerModel.MODEL_VERSION; - } - - - public void resolveObjects(ConfiguredObjectFactory factory, ConfiguredObject<?> root, ConfiguredObjectRecord... records) - { - Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>(); - resolvedObjects.put(root.getId(), root); + String name = entry.getKey(); + String type = entry.getValue(); + UUID id = _defaultExchangeIds.get(name); - Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records)); - Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies = - new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>(); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(EXCHANGE_NAME, name); + attributes.put(EXCHANGE_TYPE, type); + attributes.put(EXCHANGE_DURABLE, true); - boolean updatesMade; + ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord)); + getUpdateMap().put(id, record); - do - { - updatesMade = false; - Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator(); - while (iter.hasNext()) - { - ConfiguredObjectRecord record = iter.next(); - Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>(); - boolean foundParents = true; - for (ConfiguredObjectRecord parent : record.getParents().values()) - { - if (!resolvedObjects.containsKey(parent.getId())) - { - foundParents = false; - break; - } - else - { - parents.add(resolvedObjects.get(parent.getId())); - } - } - if (foundParents) - { - iter.remove(); - UnresolvedConfiguredObject<? extends ConfiguredObject> recovered = - factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()])); - Collection<ConfiguredObjectDependency<?>> dependencies = - recovered.getUnresolvedDependencies(); - if (dependencies.isEmpty()) - { - updatesMade = true; - ConfiguredObject<?> resolved = recovered.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - else - { - recordsWithUnresolvedDependencies.add(recovered); - } - } + getNextUpgrader().configuredObject(record); - } - - Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter = - recordsWithUnresolvedDependencies.iterator(); - - while(unresolvedIter.hasNext()) - { - UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next(); - Collection<ConfiguredObjectDependency<?>> dependencies = - new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies()); + } - for(ConfiguredObjectDependency dependency : dependencies) - { - if(dependency instanceof ConfiguredObjectIdDependency) - { - UUID id = ((ConfiguredObjectIdDependency)dependency).getId(); - if(resolvedObjects.containsKey(id)) - { - dependency.resolve(resolvedObjects.get(id)); - } - } - else if(dependency instanceof ConfiguredObjectNameDependency) - { - ConfiguredObject<?> dependentObject = null; - for(ConfiguredObject<?> parent : unresolvedObject.getParents()) - { - dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName()); - if(dependentObject != null) - { - break; - } - } - if(dependentObject != null) - { - dependency.resolve(dependentObject); - } - } - else - { - throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName()); - } - } - if(unresolvedObject.getUnresolvedDependencies().isEmpty()) - { - updatesMade = true; - unresolvedIter.remove(); - ConfiguredObject<?> resolved = unresolvedObject.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - } + getNextUpgrader().complete(); + } - } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty())); + } - if(!recordsWithUnresolvedDependencies.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies); - } - if(!recordsWithUnresolvedParents.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents); - } - } + public void perform(DurableConfigurationStore durableConfigurationStore) + { + String virtualHostCategory = VirtualHost.class.getSimpleName(); + GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders); + upgraderHandler.upgrade(); + new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords()); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java index 28d7830290..26aa99a481 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java @@ -18,11 +18,12 @@ * under the License. * */ -package org.apache.qpid.server.configuration.startup; +package org.apache.qpid.server.store; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -31,9 +32,7 @@ import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.server.BrokerOptions; -import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.RecovererProvider; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; @@ -49,6 +48,7 @@ import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.SystemContextImpl; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; +import org.apache.qpid.server.store.GenericRecoverer; import org.apache.qpid.server.store.UnresolvedConfiguredObject; public class BrokerRecovererTest extends TestCase @@ -56,9 +56,9 @@ public class BrokerRecovererTest extends TestCase private ConfiguredObjectRecord _brokerEntry = mock(ConfiguredObjectRecord.class); private UUID _brokerId = UUID.randomUUID(); - private AuthenticationProvider _authenticationProvider1; + private AuthenticationProvider<?> _authenticationProvider1; private UUID _authenticationProvider1Id = UUID.randomUUID(); - private SystemContext _systemContext; + private SystemContext<?> _systemContext; private ConfiguredObjectFactory _configuredObjectFactory; private TaskExecutor _taskExecutor; @@ -91,8 +91,14 @@ public class BrokerRecovererTest extends TestCase @Override protected void tearDown() throws Exception { - super.tearDown(); - _taskExecutor.stop(); + try + { + super.tearDown(); + } + finally + { + _taskExecutor.stop(); + } } public void testCreateBrokerAttributes() @@ -115,8 +121,8 @@ public class BrokerRecovererTest extends TestCase when(_brokerEntry.getAttributes()).thenReturn(entryAttributes); - _systemContext.resolveObjects(_brokerEntry); - Broker broker = _systemContext.getBroker(); + resolveObjects(_brokerEntry); + Broker<?> broker = _systemContext.getBroker(); assertNotNull(broker); @@ -171,7 +177,7 @@ public class BrokerRecovererTest extends TestCase UUID authProviderId = UUID.randomUUID(); UUID portId = UUID.randomUUID(); - _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord( + resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord( portId, 5672, "authProvider")); @@ -188,7 +194,7 @@ public class BrokerRecovererTest extends TestCase { UUID authProviderId = UUID.randomUUID(); - _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider")); + resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider")); Broker<?> broker = _systemContext.getBroker(); @@ -206,7 +212,7 @@ public class BrokerRecovererTest extends TestCase UUID authProvider2Id = UUID.randomUUID(); UUID port2Id = UUID.randomUUID(); - _systemContext.resolveObjects(_brokerEntry, + resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(portId, 5672, "authProvider"), createAuthProviderRecord(authProvider2Id, "authProvider2"), @@ -228,7 +234,7 @@ public class BrokerRecovererTest extends TestCase UUID authProviderId = UUID.randomUUID(); - _systemContext.resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider")); + resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider")); Broker<?> broker = _systemContext.getBroker(); @@ -253,7 +259,7 @@ public class BrokerRecovererTest extends TestCase try { - _systemContext.resolveObjects(_brokerEntry); + resolveObjects(_brokerEntry); Broker<?> broker = _systemContext.getBroker(); broker.open(); fail("The broker creation should fail due to unsupported model version"); @@ -323,33 +329,10 @@ public class BrokerRecovererTest extends TestCase return String.valueOf(attributeValue); } - private RecovererProvider createRecoveryProvider(final ConfiguredObjectRecord[] entries, final ConfiguredObject[] objectsToRecoverer) + private void resolveObjects(ConfiguredObjectRecord... records) { - RecovererProvider recovererProvider = new RecovererProvider() - { - @Override - public ConfiguredObjectRecoverer<? extends ConfiguredObject> getRecoverer(String type) - { - @SuppressWarnings({ "unchecked", "rawtypes" }) - final ConfiguredObjectRecoverer<? extends ConfiguredObject> recoverer = new ConfiguredObjectRecoverer() - { - public ConfiguredObject create(RecovererProvider recovererProvider, ConfiguredObjectRecord entry, ConfiguredObject... parents) - { - for (int i = 0; i < entries.length; i++) - { - ConfiguredObjectRecord e = entries[i]; - if (entry == e) - { - return objectsToRecoverer[i]; - } - } - return null; - } - }; - - return recoverer; - } - }; - return recovererProvider; + GenericRecoverer recoverer = new GenericRecoverer(_systemContext, Broker.class.getSimpleName()); + recoverer.recover(Arrays.asList(records)); } + } |
