summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-04-22 12:32:23 +0000
committerKeith Wall <kwall@apache.org>2014-04-22 12:32:23 +0000
commitecc7f3fc1c56cde10e9960afd1e4fbd4bcb07abf (patch)
tree66f2f92568ee799b9991db441bd05b4c60b5b496 /qpid/java
parenta0c53a8a0a83bf67198717844e22cd122a2ab7c9 (diff)
downloadqpid-python-ecc7f3fc1c56cde10e9960afd1e4fbd4bcb07abf.tar.gz
QPID-5715: [Java Broker] Refactor VirtualHostStoreUpgraderAndRecoverer/BrokerStoreUpgrader to avoid duplicated code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1589112 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java45
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java135
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java)427
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java218
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java170
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java)21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java741
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java (renamed from qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java)65
11 files changed, 793 insertions, 1043 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index b2559f8668..3ccfc276fa 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -34,7 +35,6 @@ import javax.security.auth.Subject;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
-
import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -152,6 +152,8 @@ public class Broker
store = new ManagementModeStoreHandler(store, options);
}
+ store.openConfigurationStore(systemContext, Collections.<String, Object>emptyMap());
+
_applicationRegistry = new ApplicationRegistry(store,systemContext);
try
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java
deleted file mode 100644
index 26686c67cd..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UpgraderPhaseFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.startup;
-
-public abstract class UpgraderPhaseFactory
-{
- private final String _toVersion;
- private final String _fromVersion;
-
- protected UpgraderPhaseFactory(String fromVersion, String toVersion)
- {
- _toVersion = toVersion;
- _fromVersion = fromVersion;
- }
-
- public String getToVersion()
- {
- return _toVersion;
- }
-
- public String getFromVersion()
- {
- return _fromVersion;
- }
-
- public abstract StoreUpgraderPhase newInstance();
-} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java
index 03032ad1ed..04634ea2a6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java
@@ -23,13 +23,10 @@ package org.apache.qpid.server.model;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
@ManagedObject (creatable = false)
public interface SystemContext<X extends SystemContext<X>> extends ConfiguredObject<X>
{
- void resolveObjects(ConfiguredObjectRecord... records);
-
EventLogger getEventLogger();
BrokerOptions getBrokerOptions();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
index f0f5e96081..d9e9950972 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
@@ -84,141 +84,6 @@ public class SystemContextImpl extends AbstractConfiguredObject<SystemContextImp
}
@Override
- public void resolveObjects(final ConfiguredObjectRecord... records)
- {
- runTask(new TaskExecutor.VoidTask()
- {
- @Override
- public void execute()
- {
-
-
- ConfiguredObjectFactory factory = getObjectFactory();
-
- Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
- resolvedObjects.put(getId(), SystemContextImpl.this);
-
- Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents =
- new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
- Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
- new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
-
- boolean updatesMade;
-
- do
- {
- updatesMade = false;
- Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
- while (iter.hasNext())
- {
- ConfiguredObjectRecord record = iter.next();
- Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
- boolean foundParents = true;
- for (ConfiguredObjectRecord parent : record.getParents().values())
- {
- if (!resolvedObjects.containsKey(parent.getId()))
- {
- foundParents = false;
- break;
- }
- else
- {
- parents.add(resolvedObjects.get(parent.getId()));
- }
- }
- if (foundParents)
- {
- iter.remove();
- UnresolvedConfiguredObject<? extends ConfiguredObject> recovered =
- factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()]));
- Collection<ConfiguredObjectDependency<?>> dependencies =
- recovered.getUnresolvedDependencies();
- if (dependencies.isEmpty())
- {
- updatesMade = true;
- ConfiguredObject<?> resolved = recovered.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
- }
- else
- {
- recordsWithUnresolvedDependencies.add(recovered);
- }
- }
-
- }
-
- Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter =
- recordsWithUnresolvedDependencies.iterator();
-
- while (unresolvedIter.hasNext())
- {
- UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
- Collection<ConfiguredObjectDependency<?>> dependencies =
- new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
-
- for (ConfiguredObjectDependency dependency : dependencies)
- {
- if (dependency instanceof ConfiguredObjectIdDependency)
- {
- UUID id = ((ConfiguredObjectIdDependency) dependency).getId();
- if (resolvedObjects.containsKey(id))
- {
- dependency.resolve(resolvedObjects.get(id));
- }
- }
- else if (dependency instanceof ConfiguredObjectNameDependency)
- {
- ConfiguredObject<?> dependentObject = null;
- for (ConfiguredObject<?> parent : unresolvedObject.getParents())
- {
- dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(),
- ((ConfiguredObjectNameDependency) dependency)
- .getName()
- );
- if (dependentObject != null)
- {
- break;
- }
- }
- if (dependentObject != null)
- {
- dependency.resolve(dependentObject);
- }
- }
- else
- {
- throw new ServerScopedRuntimeException("Unknown dependency type "
- + dependency.getClass()
- .getSimpleName());
- }
- }
- if (unresolvedObject.getUnresolvedDependencies().isEmpty())
- {
- updatesMade = true;
- unresolvedIter.remove();
- ConfiguredObject<?> resolved = unresolvedObject.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
- }
- }
-
- } while (updatesMade && !(recordsWithUnresolvedDependencies.isEmpty()
- && recordsWithUnresolvedParents.isEmpty()));
-
- if (!recordsWithUnresolvedDependencies.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve some objects: "
- + recordsWithUnresolvedDependencies);
- }
- if (!recordsWithUnresolvedParents.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found"
- + recordsWithUnresolvedParents);
- }
- }
- });
- }
-
- @Override
protected boolean setState(final State currentState, final State desiredState)
{
throw new IllegalArgumentException("Cannot change the state of the SystemContext object");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 6ddf5f83dd..d3e80ecee7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -21,11 +21,9 @@
package org.apache.qpid.server.registry;
import org.apache.log4j.Logger;
-
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.configuration.startup.BrokerStoreUpgrader;
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
@@ -35,6 +33,7 @@ import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemContext;
+import org.apache.qpid.server.store.BrokerStoreUpgraderAndRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.util.SystemUtils;
@@ -77,8 +76,8 @@ public class ApplicationRegistry implements IApplicationRegistry
logStartupMessages(startupLogger);
- BrokerStoreUpgrader upgrader = new BrokerStoreUpgrader(_systemContext);
- _broker = upgrader.upgrade(_store);
+ BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(_systemContext);
+ _broker = upgrader.perform(_store);
_broker.setEventLogger(startupLogger);
_broker.open();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
index 1a942a2e4a..7e49e0f6d3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
@@ -18,42 +18,30 @@
* under the License.
*
*/
-package org.apache.qpid.server.configuration.startup;
+package org.apache.qpid.server.store;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
-import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.SystemContext;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
-import org.apache.qpid.server.store.NullUpgrader;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.Action;
-public class BrokerStoreUpgrader
+public class BrokerStoreUpgraderAndRecoverer
{
- private final SystemContext _systemContext;
- private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, UpgraderPhaseFactory>();
-
+ private final SystemContext<?> _systemContext;
+ private final Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>();
// Note: don't use externally defined constants in upgraders in case they change, the values here MUST stay the same
// no matter what changes are made to the code in the future
-
- public BrokerStoreUpgrader(SystemContext systemContext)
+ public BrokerStoreUpgraderAndRecoverer(SystemContext<?> systemContext)
{
_systemContext = systemContext;
@@ -63,199 +51,171 @@ public class BrokerStoreUpgrader
register(new Upgrader_1_3_to_1_4());
}
- private void register(UpgraderPhaseFactory factory)
+ private void register(StoreUpgraderPhase upgrader)
{
- _upgraders.put(factory.getFromVersion(), factory);
+ _upgraders.put(upgrader.getFromVersion(), upgrader);
}
- private final class Upgrader_1_0_to_1_1 extends UpgraderPhaseFactory
+ private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase
{
private Upgrader_1_0_to_1_1()
{
- super("1.0", "1.1");
+ super("modelVersion", "1.0", "1.1");
}
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion())
+ if (record.getType().equals("Broker"))
{
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
- {
- if (record.getType().equals("Broker"))
- {
- record = upgradeRootRecord(record);
- }
- else if (record.getType().equals("VirtualHost") && record.getAttributes().containsKey("storeType"))
- {
- Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
- updatedAttributes.put("type", "STANDARD");
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
- }
-
- getNextUpgrader().configuredObject(record);
- }
+ record = upgradeRootRecord(record);
+ }
+ else if (record.getType().equals("VirtualHost") && record.getAttributes().containsKey("storeType"))
+ {
+ Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
+ updatedAttributes.put("type", "STANDARD");
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ }
+ getNextUpgrader().configuredObject(record);
+ }
- @Override
- public void complete()
- {
- getNextUpgrader().complete();
- }
- };
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
}
+
}
- private static final class Upgrader_1_1_to_1_2 extends UpgraderPhaseFactory
+ private static final class Upgrader_1_1_to_1_2 extends StoreUpgraderPhase
{
private Upgrader_1_1_to_1_2()
{
- super("1.1", "1.2");
+ super("modelVersion", "1.1", "1.2");
}
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion())
+ if (record.getType().equals("Broker"))
{
+ record = upgradeRootRecord(record);
+ }
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
- {
- if (record.getType().equals("Broker"))
- {
- record = upgradeRootRecord(record);
- }
-
- getNextUpgrader().configuredObject(record);
+ getNextUpgrader().configuredObject(record);
- }
+ }
- @Override
- public void complete()
- {
- getNextUpgrader().complete();
- }
- };
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
}
+
}
- private static final class Upgrader_1_2_to_1_3 extends UpgraderPhaseFactory
+ private static final class Upgrader_1_2_to_1_3 extends StoreUpgraderPhase
{
private Upgrader_1_2_to_1_3()
{
- super("1.2", "1.3");
+ super("modelVersion", "1.2", "1.3");
}
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion())
+ if (record.getType().equals("TrustStore") && record.getAttributes().containsKey("type"))
{
+ Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
+ updatedAttributes.put("trustStoreType", updatedAttributes.remove("type"));
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
- {
- if (record.getType().equals("TrustStore") && record.getAttributes().containsKey("type"))
- {
- Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
- updatedAttributes.put("trustStoreType", updatedAttributes.remove("type"));
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
-
- }
- else if (record.getType().equals("KeyStore") && record.getAttributes().containsKey("type"))
- {
- Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
- updatedAttributes.put("keyStoreType", updatedAttributes.remove("type"));
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
+ }
+ else if (record.getType().equals("KeyStore") && record.getAttributes().containsKey("type"))
+ {
+ Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
+ updatedAttributes.put("keyStoreType", updatedAttributes.remove("type"));
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
- }
- else if (record.getType().equals("Broker"))
- {
- record = upgradeRootRecord(record);
- }
+ }
+ else if (record.getType().equals("Broker"))
+ {
+ record = upgradeRootRecord(record);
+ }
- getNextUpgrader().configuredObject(record);
+ getNextUpgrader().configuredObject(record);
- }
+ }
- @Override
- public void complete()
- {
- getNextUpgrader().complete();
- }
- };
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
}
+
}
- private static final class Upgrader_1_3_to_1_4 extends UpgraderPhaseFactory
+ private static final class Upgrader_1_3_to_1_4 extends StoreUpgraderPhase
{
private Upgrader_1_3_to_1_4()
{
- super("1.3", "1.4");
+ super("modelVersion", "1.3", "1.4");
}
+ @SuppressWarnings("serial")
+ private Map<String, VirtualHostEntryUpgrader> _vhostUpgraderMap = new HashMap<String, VirtualHostEntryUpgrader>()
+ {{
+ put("BDB_HA", new BdbHaVirtualHostUpgrader());
+ put("STANDARD", new StandardVirtualHostUpgrader());
+ }};
+
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase(Broker.MODEL_VERSION, getToVersion())
+ if (record.getType().equals("VirtualHost"))
{
-
- @SuppressWarnings("serial")
- private Map<String, VirtualHostEntryUpgrader> _vhostUpgraderMap = new HashMap<String, VirtualHostEntryUpgrader>()
- {{
- put("BDB_HA", new BdbHaVirtualHostUpgrader());
- put("STANDARD", new StandardVirtualHostUpgrader());
- }};
-
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
+ Map<String, Object> attributes = record.getAttributes();
+ if (attributes.containsKey("configPath"))
{
- if (record.getType().equals("VirtualHost"))
- {
- Map<String, Object> attributes = record.getAttributes();
- if (attributes.containsKey("configPath"))
- {
- throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath"));
- }
+ throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath"));
+ }
- String type = (String) attributes.get("type");
- VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type);
- if (vhostUpgrader == null)
- {
- throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type);
- }
- record = vhostUpgrader.upgrade(record);
- getUpdateMap().put(record.getId(), record);
- }
- else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType"))
- {
- Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
- updatedAttributes.put("type", updatedAttributes.remove("pluginType"));
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
+ String type = (String) attributes.get("type");
+ VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type);
+ if (vhostUpgrader == null)
+ {
+ throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type);
+ }
+ record = vhostUpgrader.upgrade(record);
+ getUpdateMap().put(record.getId(), record);
+ }
+ else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType"))
+ {
+ Map<String, Object> updatedAttributes = new HashMap<String, Object>(record.getAttributes());
+ updatedAttributes.put("type", updatedAttributes.remove("pluginType"));
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
- }
- else if (record.getType().equals("Broker"))
- {
- record = upgradeRootRecord(record);
- }
+ }
+ else if (record.getType().equals("Broker"))
+ {
+ record = upgradeRootRecord(record);
+ }
- getNextUpgrader().configuredObject(record);
+ getNextUpgrader().configuredObject(record);
- }
+ }
- @Override
- public void complete()
- {
- getNextUpgrader().complete();
- }
- };
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
}
+
}
private static interface VirtualHostEntryUpgrader
@@ -560,163 +520,52 @@ public class BrokerStoreUpgrader
}
}
- public Broker<?> upgrade(DurableConfigurationStore store)
- {
- final BrokerStoreRecoveryHandler recoveryHandler = new BrokerStoreRecoveryHandler(_systemContext, store, _upgraders);
- store.openConfigurationStore(_systemContext, Collections.<String,Object>emptyMap());
- store.visitConfiguredObjectRecords(recoveryHandler);
-
- return recoveryHandler.getBroker();
- }
-
-
- private static class BrokerStoreRecoveryHandler implements ConfiguredObjectRecordHandler
+ public Broker<?> perform(final DurableConfigurationStore store)
{
- private static Logger LOGGER = Logger.getLogger(BrokerStoreRecoveryHandler.class);
-
- private DurableConfigurationStoreUpgrader _upgrader;
- private DurableConfigurationStore _store;
- private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
- private final SystemContext _systemContext;
- private Map<String, UpgraderPhaseFactory> _upgraders;
-
- private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store, Map<String, UpgraderPhaseFactory> upgraders)
- {
- _systemContext = systemContext;
- _store = store;
- _upgraders = upgraders;
- }
-
+ final String brokerCategory = Broker.class.getSimpleName();
+ final GenericStoreUpgrader upgrader = new GenericStoreUpgrader(brokerCategory, Broker.MODEL_VERSION, store, _upgraders);
+ upgrader.upgrade();
- @Override
- public void begin()
- {
- }
+ new GenericRecoverer(_systemContext, brokerCategory).recover(upgrader.getRecords());
- @Override
- public boolean handle(final ConfiguredObjectRecord object)
+ final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store);
+ applyRecursively(_systemContext.getBroker(), new Action<ConfiguredObject<?>>()
{
- _records.put(object.getId(), object);
- return true;
- }
-
- @Override
- public void end()
- {
- String version = getCurrentVersion();
-
- while(!BrokerModel.MODEL_VERSION.equals(version))
- {
- LOGGER.debug("Adding broker store upgrader from model version: " + version);
- final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version);
- StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance();
- if(_upgrader == null)
- {
- _upgrader = upgrader;
- }
- else
- {
- _upgrader.setNextUpgrader(upgrader);
- }
- version = upgraderPhaseFactory.getToVersion();
- }
-
- if(_upgrader == null)
- {
- _upgrader = new NullUpgrader();
- }
- else
+ @Override
+ public void performAction(final ConfiguredObject<?> object)
{
- _upgrader.setNextUpgrader(new NullUpgrader());
+ object.addChangeListener(configChangeListener);
}
+ });
- for(ConfiguredObjectRecord record : _records.values())
- {
- _upgrader.configuredObject(record);
- }
-
- Map<UUID, ConfiguredObjectRecord> deletedRecords = _upgrader.getDeletedRecords();
- Map<UUID, ConfiguredObjectRecord> updatedRecords = _upgrader.getUpdatedRecords();
-
- LOGGER.debug("Broker store upgrade: " + deletedRecords.size() + " records deleted");
- LOGGER.debug("Broker store upgrade: " + updatedRecords.size() + " records updated");
- LOGGER.debug("Broker store upgrade: " + _records.size() + " total records");
-
- _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()]));
- _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()]));
-
-
-
-
- _records.keySet().removeAll(deletedRecords.keySet());
- _records.putAll(updatedRecords);
-
- _systemContext.resolveObjects(_records.values().toArray(new ConfiguredObjectRecord[_records.size()]));
-
- final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(_store);
- applyRecursively(_systemContext.getBroker(),
- new Action<ConfiguredObject<?>>()
- {
- @Override
- public void performAction(final ConfiguredObject<?> object)
- {
- object.addChangeListener(configChangeListener);
- }
-
-
- });
-
- }
-
- private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action)
- {
- applyRecursively(object, action, new HashSet<ConfiguredObject<?>>());
- }
+ return _systemContext.getBroker();
+ }
- private void applyRecursively(final ConfiguredObject<?> object,
- final Action<ConfiguredObject<?>> action,
- final HashSet<ConfiguredObject<?>> visited)
- {
- if(!visited.contains(object))
- {
- visited.add(object);
- action.performAction(object);
- for(Class<? extends ConfiguredObject> childClass : object.getModel().getChildTypes(object.getCategoryClass()))
- {
- Collection<? extends ConfiguredObject> children = object.getChildren(childClass);
- if(children != null)
- {
- for(ConfiguredObject<?> child : children)
- {
- applyRecursively(child, action, visited);
- }
- }
- }
- }
- }
+ private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action)
+ {
+ applyRecursively(object, action, new HashSet<ConfiguredObject<?>>());
+ }
- private String getCurrentVersion()
+ private void applyRecursively(final ConfiguredObject<?> object,
+ final Action<ConfiguredObject<?>> action,
+ final HashSet<ConfiguredObject<?>> visited)
+ {
+ if(!visited.contains(object))
{
- for(ConfiguredObjectRecord record : _records.values())
+ visited.add(object);
+ action.performAction(object);
+ for(Class<? extends ConfiguredObject> childClass : object.getModel().getChildTypes(object.getCategoryClass()))
{
- if(record.getType().equals("Broker"))
+ Collection<? extends ConfiguredObject> children = object.getChildren(childClass);
+ if(children != null)
{
- String version = (String) record.getAttributes().get(Broker.MODEL_VERSION);
- if(version == null)
+ for(ConfiguredObject<?> child : children)
{
- version = "1.0";
+ applyRecursively(child, action, visited);
}
- return version;
}
}
- return BrokerModel.MODEL_VERSION;
- }
-
- public Broker getBroker()
- {
- return _systemContext.getBroker();
}
}
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
new file mode 100644
index 0000000000..01be6cf556
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericRecoverer.java
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+public class GenericRecoverer
+{
+ private static final Logger LOGGER = Logger.getLogger(GenericRecoverer.class);
+
+ private final ConfiguredObject<?> _parentOfRoot;
+ private final String _rootCategory;
+
+ public GenericRecoverer(ConfiguredObject<?> parentOfRoot, String rootCategory)
+ {
+ _parentOfRoot = parentOfRoot;
+ _rootCategory = rootCategory;
+ }
+
+ public void recover(final List<ConfiguredObjectRecord> records)
+ {
+ _parentOfRoot.getTaskExecutor().run(new TaskExecutor.VoidTask()
+ {
+ @Override
+ public void execute()
+ {
+ performRecover(records);
+ }
+
+ @Override
+ public String toString()
+ {
+ return _rootCategory + " recovery";
+ }
+ });
+
+ }
+
+ private void performRecover(List<ConfiguredObjectRecord> records)
+ {
+ ConfiguredObjectRecord rootRecord = null;
+ for (ConfiguredObjectRecord record : records)
+ {
+ if (_rootCategory.equals(record.getType()))
+ {
+ rootRecord = record;
+ break;
+ }
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Root record " + rootRecord);
+ }
+
+ if (rootRecord != null)
+ {
+
+ if (rootRecord.getParents() == null || rootRecord.getParents().isEmpty())
+ {
+ records = new ArrayList<ConfiguredObjectRecord>(records);
+
+ String parentOfRootCategory = _parentOfRoot.getCategoryClass().getSimpleName();
+ ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parentOfRoot.getId(), parentOfRootCategory, Collections.<String, Object>emptyMap());
+ Map<String, ConfiguredObjectRecord> rootParents = Collections.<String, ConfiguredObjectRecord>singletonMap(parentOfRootCategory, parentRecord);
+ records.remove(rootRecord);
+ records.add(new ConfiguredObjectRecordImpl(rootRecord.getId(), _rootCategory, rootRecord.getAttributes(), rootParents));
+ }
+
+ resolveObjects(_parentOfRoot, records);
+ }
+ }
+
+ private void resolveObjects(ConfiguredObject<?> parentObject, List<ConfiguredObjectRecord> records)
+ {
+ ConfiguredObjectFactory factory = parentObject.getObjectFactory();
+ Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
+ resolvedObjects.put(parentObject.getId(), parentObject);
+
+ Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(records);
+ Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
+ new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
+
+ boolean updatesMade;
+
+ do
+ {
+ updatesMade = false;
+ Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
+ while (iter.hasNext())
+ {
+ ConfiguredObjectRecord record = iter.next();
+ Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
+ boolean foundParents = true;
+ for (ConfiguredObjectRecord parent : record.getParents().values())
+ {
+ if (!resolvedObjects.containsKey(parent.getId()))
+ {
+ foundParents = false;
+ break;
+ }
+ else
+ {
+ parents.add(resolvedObjects.get(parent.getId()));
+ }
+ }
+ if (foundParents)
+ {
+ iter.remove();
+ ConfiguredObject<?>[] parentArray = parents.toArray(new ConfiguredObject<?>[parents.size()]);
+ UnresolvedConfiguredObject<? extends ConfiguredObject> recovered = factory.recover(record, parentArray);
+ Collection<ConfiguredObjectDependency<?>> dependencies = recovered.getUnresolvedDependencies();
+ if (dependencies.isEmpty())
+ {
+ updatesMade = true;
+ ConfiguredObject<?> resolved = recovered.resolve();
+ resolvedObjects.put(resolved.getId(), resolved);
+ }
+ else
+ {
+ recordsWithUnresolvedDependencies.add(recovered);
+ }
+ }
+
+ }
+
+ Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter = recordsWithUnresolvedDependencies.iterator();
+
+ while(unresolvedIter.hasNext())
+ {
+ UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
+ Collection<ConfiguredObjectDependency<?>> dependencies =
+ new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+
+ for(ConfiguredObjectDependency dependency : dependencies)
+ {
+ if(dependency instanceof ConfiguredObjectIdDependency)
+ {
+ UUID id = ((ConfiguredObjectIdDependency)dependency).getId();
+ if(resolvedObjects.containsKey(id))
+ {
+ dependency.resolve(resolvedObjects.get(id));
+ }
+ }
+ else if(dependency instanceof ConfiguredObjectNameDependency)
+ {
+ ConfiguredObject<?> dependentObject = null;
+ for(ConfiguredObject<?> parent : unresolvedObject.getParents())
+ {
+ dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName());
+ if(dependentObject != null)
+ {
+ break;
+ }
+ }
+ if(dependentObject != null)
+ {
+ dependency.resolve(dependentObject);
+ }
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName());
+ }
+ }
+ if(unresolvedObject.getUnresolvedDependencies().isEmpty())
+ {
+ updatesMade = true;
+ unresolvedIter.remove();
+ ConfiguredObject<?> resolved = unresolvedObject.resolve();
+ resolvedObjects.put(resolved.getId(), resolved);
+ }
+ }
+
+ } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty()));
+
+ if(!recordsWithUnresolvedDependencies.isEmpty())
+ {
+ throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies);
+ }
+ if(!recordsWithUnresolvedParents.isEmpty())
+ {
+ throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java
new file mode 100644
index 0000000000..26b60f765f
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/GenericStoreUpgrader.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
+public class GenericStoreUpgrader
+{
+ private static final Logger LOGGER = Logger.getLogger(GenericStoreUpgrader.class);
+
+ private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
+ private final Map<String, StoreUpgraderPhase> _upgraders;
+ private final DurableConfigurationStore _store;
+ private final String _rootCategory;
+ private final String _modelVersionAttributeName;
+
+ public GenericStoreUpgrader(String rootCategory, String rootModelVersionAttributeName, DurableConfigurationStore configurationStore, Map<String, StoreUpgraderPhase> upgraders)
+ {
+ super();
+ _upgraders = upgraders;
+ _store = configurationStore;
+ _rootCategory = rootCategory;
+ _modelVersionAttributeName = rootModelVersionAttributeName;
+ }
+
+
+ public List<ConfiguredObjectRecord> getRecords()
+ {
+ return new ArrayList<ConfiguredObjectRecord>(_records.values());
+ }
+
+ public void upgrade()
+ {
+ ConfiguredObjectRecordHandler handler = new ConfiguredObjectRecordHandler()
+ {
+ @Override
+ public void begin()
+ {
+ }
+
+ @Override
+ public boolean handle(final ConfiguredObjectRecord record)
+ {
+ _records.put(record.getId(), record);
+ return true;
+ }
+
+ @Override
+ public void end()
+ {
+ performUpgrade();
+ }
+ };
+
+ _store.visitConfiguredObjectRecords(handler);
+ }
+
+ private void performUpgrade()
+ {
+ String version = getCurrentVersion();
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info(_rootCategory + " store has model version " + version + ". Number of record(s) " + _records.size());
+ }
+
+ DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version);
+
+ for(ConfiguredObjectRecord record : _records.values())
+ {
+ upgrader.configuredObject(record);
+ }
+
+ upgrader.complete();
+
+ Map<UUID, ConfiguredObjectRecord> deletedRecords = upgrader.getDeletedRecords();
+ Map<UUID, ConfiguredObjectRecord> updatedRecords = upgrader.getUpdatedRecords();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(_rootCategory + " store upgrade is about to complete. " + _records.size() + " total record(s)."
+ + " Records to update " + updatedRecords.size()
+ + " Records to delete " + deletedRecords.size());
+ }
+
+ _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()]));
+ _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()]));
+
+ _records.keySet().removeAll(deletedRecords.keySet());
+ _records.putAll(updatedRecords);
+ }
+
+ private DurableConfigurationStoreUpgrader buildUpgraderChain(String version)
+ {
+ DurableConfigurationStoreUpgrader head = null;
+ while(!BrokerModel.MODEL_VERSION.equals(version))
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Adding " + _rootCategory + " store upgrader from model version: " + version);
+ }
+
+ StoreUpgraderPhase upgrader = _upgraders.get(version);
+ if (upgrader == null)
+ {
+ throw new IllegalConfigurationException("No phase upgrader for version " + version);
+ }
+
+ if(head == null)
+ {
+ head = upgrader;
+ }
+ else
+ {
+ head.setNextUpgrader(upgrader);
+ }
+ version = upgrader.getToVersion();
+ }
+
+ if(head == null)
+ {
+ head = new NullUpgrader();
+ }
+ else
+ {
+ head.setNextUpgrader(new NullUpgrader());
+ }
+
+ return head;
+ }
+
+ private String getCurrentVersion()
+ {
+ for(ConfiguredObjectRecord record : _records.values())
+ {
+ if(_rootCategory.equals(record.getType()))
+ {
+ return (String) record.getAttributes().get(_modelVersionAttributeName);
+ }
+ }
+ return BrokerModel.MODEL_VERSION;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java
index 5762a8f8e6..b40dc73186 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgraderPhase.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreUpgraderPhase.java
@@ -18,23 +18,21 @@
* under the License.
*
*/
-package org.apache.qpid.server.configuration.startup;
+package org.apache.qpid.server.store;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.NonNullUpgrader;
-
public abstract class StoreUpgraderPhase extends NonNullUpgrader
{
+ private final String _fromVersion;
private final String _toVersion;
private final String _versionAttributeName;
- protected StoreUpgraderPhase(String versionAttributeName, String toVersion)
+ public StoreUpgraderPhase(String versionAttributeName, String fromVersion, String toVersion)
{
_toVersion = toVersion;
+ _fromVersion = fromVersion;
_versionAttributeName = versionAttributeName;
}
@@ -46,4 +44,15 @@ public abstract class StoreUpgraderPhase extends NonNullUpgrader
getUpdateMap().put(record.getId(), record);
return record;
}
+
+ public String getFromVersion()
+ {
+ return _fromVersion;
+ }
+
+ public String getToVersion()
+ {
+ return _toVersion;
+ }
+
} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 226371a065..e1cffeb942 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.store;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -31,13 +28,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.startup.StoreUpgraderPhase;
-import org.apache.qpid.server.configuration.startup.UpgraderPhaseFactory;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
@@ -45,14 +37,11 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class VirtualHostStoreUpgraderAndRecoverer
{
- private final ConfiguredObjectFactory _objectFactory;
private final VirtualHostNode<?> _virtualHostNode;
- private Map<String, UpgraderPhaseFactory> _upgraders = new HashMap<String, UpgraderPhaseFactory>();
+ private Map<String, StoreUpgraderPhase> _upgraders = new HashMap<String, StoreUpgraderPhase>();
@SuppressWarnings("serial")
private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
@@ -68,12 +57,11 @@ public class VirtualHostStoreUpgraderAndRecoverer
public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode, ConfiguredObjectFactory objectFactory)
{
_virtualHostNode = virtualHostNode;
- _objectFactory = objectFactory;
- register(new UpgraderFactory_0_0());
- register(new UpgraderFactory_0_1());
- register(new UpgraderFactory_0_2());
- register(new UpgraderFactory_0_3());
- register(new UpgraderFactory_0_4());
+ register(new Upgrader_0_0_to_0_1());
+ register(new Upgrader_0_1_to_0_2());
+ register(new Upgrader_0_2_to_0_3());
+ register(new Upgrader_0_3_to_0_4());
+ register(new Upgrader_0_4_to_0_5());
Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -84,9 +72,9 @@ public class VirtualHostStoreUpgraderAndRecoverer
_defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
}
- private void register(UpgraderPhaseFactory factory)
+ private void register(StoreUpgraderPhase upgrader)
{
- _upgraders.put(factory.getFromVersion(), factory);
+ _upgraders.put(upgrader.getFromVersion(), upgrader);
}
/*
@@ -94,100 +82,91 @@ public class VirtualHostStoreUpgraderAndRecoverer
* such bindings would have been ignored, starting from the point at which the config version changed, these
* arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue.
*/
- private class UpgraderFactory_0_0 extends UpgraderPhaseFactory
+ private class Upgrader_0_0_to_0_1 extends StoreUpgraderPhase
{
private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
- public UpgraderFactory_0_0()
+ public Upgrader_0_0_to_0_1()
{
- super("0.0", "0.1");
+ super("modelVersion", "0.0", "0.1");
}
-
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(final ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
- {
+ _records.put(record.getId(), record);
+ }
- @Override
- public void configuredObject(final ConfiguredObjectRecord record)
- {
- _records.put(record.getId(), record);
- }
+ private void removeSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
- private void removeSelectorArguments(Map<String, Object> binding)
- {
- @SuppressWarnings("unchecked")
- Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+ FilterSupport.removeFilters(arguments);
+ binding.put(Binding.ARGUMENTS, arguments);
+ }
- FilterSupport.removeFilters(arguments);
- binding.put(Binding.ARGUMENTS, arguments);
- }
+ private boolean isTopicExchange(ConfiguredObjectRecord entry)
+ {
+ ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
+ if (exchangeRecord == null)
+ {
+ return false;
+ }
+ UUID exchangeId = exchangeRecord.getId();
- private boolean isTopicExchange(ConfiguredObjectRecord entry)
+ if(_records.containsKey(exchangeId))
+ {
+ return "topic".equals(_records.get(exchangeId)
+ .getAttributes()
+ .get(org.apache.qpid.server.model.Exchange.TYPE));
+ }
+ else
+ {
+ if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
{
- ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
- if (exchangeRecord == null)
- {
- return false;
- }
- UUID exchangeId = exchangeRecord.getId();
+ return true;
+ }
- if(_records.containsKey(exchangeId))
- {
- return "topic".equals(_records.get(exchangeId)
- .getAttributes()
- .get(org.apache.qpid.server.model.Exchange.TYPE));
- }
- else
- {
- if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
- {
- return true;
- }
+ return false;
+ }
- return false;
- }
+ }
- }
+ private boolean hasSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+ return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+ }
- private boolean hasSelectorArguments(Map<String, Object> binding)
+ @Override
+ public void complete()
+ {
+ for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
+ {
+ ConfiguredObjectRecord record = entry.getValue();
+ String type = record.getType();
+ Map<String, Object> attributes = record.getAttributes();
+ UUID id = record.getId();
+ if ("org.apache.qpid.server.model.VirtualHost".equals(type))
{
- @SuppressWarnings("unchecked")
- Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
- return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+ record = upgradeRootRecord(record);
}
-
- @Override
- public void complete()
+ else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
{
- for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
- {
- ConfiguredObjectRecord record = entry.getValue();
- String type = record.getType();
- Map<String, Object> attributes = record.getAttributes();
- UUID id = record.getId();
- if ("org.apache.qpid.server.model.VirtualHost".equals(type))
- {
- record = upgradeRootRecord(record);
- }
- else if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record))
- {
- attributes = new LinkedHashMap<String, Object>(attributes);
- removeSelectorArguments(attributes);
-
- record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
- getUpdateMap().put(id, record);
- entry.setValue(record);
-
- }
- getNextUpgrader().configuredObject(record);
- }
+ attributes = new LinkedHashMap<String, Object>(attributes);
+ removeSelectorArguments(attributes);
+
+ record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents());
+ getUpdateMap().put(id, record);
+ entry.setValue(record);
- getNextUpgrader().complete();
}
- };
+ getNextUpgrader().configuredObject(record);
+ }
+
+ getNextUpgrader().complete();
}
}
@@ -196,76 +175,68 @@ public class VirtualHostStoreUpgraderAndRecoverer
* Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker
* configuration store). Also remove bindings which reference nonexistent queues or exchanges.
*/
- private class UpgraderFactory_0_1 extends UpgraderPhaseFactory
+ private class Upgrader_0_1_to_0_2 extends StoreUpgraderPhase
{
- protected UpgraderFactory_0_1()
+ public Upgrader_0_1_to_0_2()
{
- super("0.1", "0.2");
+ super("modelVersion", "0.1", "0.2");
}
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(final ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
+ String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
+ ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents());
+ getUpdateMap().put(record.getId(), newRecord);
+
+ if ("VirtualHost".equals(type))
{
+ newRecord = upgradeRootRecord(newRecord);
+ }
+ }
- @Override
- public void configuredObject(final ConfiguredObjectRecord record)
+ @Override
+ public void complete()
+ {
+ for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+ {
+ Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
+ final ConfiguredObjectRecord record = entry.getValue();
+ final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
+ final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
+ if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
+ || queueParent == null || unknownQueue(queueParent.getId())))
{
- String type = record.getType().substring(1 + record.getType().lastIndexOf('.'));
- ConfiguredObjectRecord newRecord = new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents());
- getUpdateMap().put(record.getId(), newRecord);
-
- if ("VirtualHost".equals(type))
- {
- newRecord = upgradeRootRecord(newRecord);
- }
+ getDeleteMap().put(entry.getKey(), entry.getValue());
+ iterator.remove();
}
-
- @Override
- public void complete()
+ else
{
- for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
- {
- Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
- final ConfiguredObjectRecord record = entry.getValue();
- final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName());
- final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName());
- if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId())
- || queueParent == null || unknownQueue(queueParent.getId())))
- {
- getDeleteMap().put(entry.getKey(), entry.getValue());
- iterator.remove();
- }
- else
- {
- getNextUpgrader().configuredObject(record);
- }
- }
- getNextUpgrader().complete();
+ getNextUpgrader().configuredObject(record);
}
+ }
+ getNextUpgrader().complete();
+ }
- private boolean unknownExchange(final UUID exchangeId)
- {
- if (_defaultExchangeIds.containsValue(exchangeId))
- {
- return false;
- }
- ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
- return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()));
- }
+ private boolean unknownExchange(final UUID exchangeId)
+ {
+ if (_defaultExchangeIds.containsValue(exchangeId))
+ {
+ return false;
+ }
+ ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
+ return !(localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()));
+ }
- private boolean unknownQueue(final UUID queueId)
- {
- ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
- return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()));
- }
+ private boolean unknownQueue(final UUID queueId)
+ {
+ ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
+ return !(localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()));
+ }
- private boolean isBinding(final String type)
- {
- return Binding.class.getSimpleName().equals(type);
- }
- };
+ private boolean isBinding(final String type)
+ {
+ return Binding.class.getSimpleName().equals(type);
}
}
@@ -274,52 +245,46 @@ public class VirtualHostStoreUpgraderAndRecoverer
* Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
* attributes into the map using the model attribute names rather than the wire attribute names
*/
- private class UpgraderFactory_0_2 extends UpgraderPhaseFactory
+ private class Upgrader_0_2_to_0_3 extends StoreUpgraderPhase
{
- protected UpgraderFactory_0_2()
+ private static final String ARGUMENTS = "arguments";
+
+ public Upgrader_0_2_to_0_3()
{
- super("0.2", "0.3");
+ super("modelVersion", "0.2", "0.3");
}
+ @SuppressWarnings("unchecked")
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
+ if("VirtualHost".equals(record.getType()))
{
- private static final String ARGUMENTS = "arguments";
-
- @SuppressWarnings("unchecked")
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
+ record = upgradeRootRecord(record);
+ }
+ else if("Queue".equals(record.getType()))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
+ if(record.getAttributes().get(ARGUMENTS) instanceof Map)
{
- if("VirtualHost".equals(record.getType()))
- {
- record = upgradeRootRecord(record);
- }
- else if("Queue".equals(record.getType()))
- {
- Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
- if(record.getAttributes().get(ARGUMENTS) instanceof Map)
- {
- newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
- .get(ARGUMENTS)));
- }
- newAttributes.putAll(record.getAttributes());
-
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
- }
-
- getNextUpgrader().configuredObject(record);
+ newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
+ .get(ARGUMENTS)));
}
+ newAttributes.putAll(record.getAttributes());
- @Override
- public void complete()
- {
- getNextUpgrader().complete();
- }
- };
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ }
+
+ getNextUpgrader().configuredObject(record);
}
+
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
+ }
+
}
/*
@@ -327,387 +292,125 @@ public class VirtualHostStoreUpgraderAndRecoverer
* where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
* ensure OWNER is null unless the exclusivity policy is CONTAINER
*/
- private class UpgraderFactory_0_3 extends UpgraderPhaseFactory
+ private class Upgrader_0_3_to_0_4 extends StoreUpgraderPhase
{
- protected UpgraderFactory_0_3()
+ private static final String EXCLUSIVE = "exclusive";
+
+ public Upgrader_0_3_to_0_4()
{
- super("0.3", "0.4");
+ super("modelVersion", "0.3", "0.4");
}
+
@Override
- public StoreUpgraderPhase newInstance()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- return new StoreUpgraderPhase("modelVersion", getToVersion())
+ if("VirtualHost".equals(record.getType()))
{
- private static final String EXCLUSIVE = "exclusive";
-
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
+ record = upgradeRootRecord(record);
+ }
+ else if(Queue.class.getSimpleName().equals(record.getType()))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
+ if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
{
- if("VirtualHost".equals(record.getType()))
- {
- record = upgradeRootRecord(record);
- }
- else if(Queue.class.getSimpleName().equals(record.getType()))
+ boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
+ newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
+ if(!isExclusive && record.getAttributes().containsKey("owner"))
{
- Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(record.getAttributes());
- if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean)
- {
- boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE);
- newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
- if(!isExclusive && record.getAttributes().containsKey("owner"))
- {
- newAttributes.remove("owner");
- }
- }
- else
- {
- newAttributes.remove("owner");
- }
- if(!record.getAttributes().containsKey("durable"))
- {
- newAttributes.put("durable","true");
- }
-
- record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
- getUpdateMap().put(record.getId(), record);
+ newAttributes.remove("owner");
}
-
- getNextUpgrader().configuredObject(record);
}
-
- @Override
- public void complete()
+ else
{
- getNextUpgrader().complete();
+ newAttributes.remove("owner");
}
- };
- }
- }
-
- private class UpgraderFactory_0_4 extends UpgraderPhaseFactory
- {
- protected UpgraderFactory_0_4()
- {
- super("0.4", "2.0");
- }
-
- @Override
- public StoreUpgraderPhase newInstance()
- {
- return new StoreUpgraderPhase("modelVersion", getToVersion())
- {
- private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
- private static final String EXCHANGE_NAME = "name";
- private static final String EXCHANGE_TYPE = "type";
- private static final String EXCHANGE_DURABLE = "durable";
- private ConfiguredObjectRecord _virtualHostRecord;
-
- @Override
- public void configuredObject(ConfiguredObjectRecord record)
+ if(!record.getAttributes().containsKey("durable"))
{
- if("VirtualHost".equals(record.getType()))
- {
- record = upgradeRootRecord(record);
- Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
- virtualHostAttributes.put("name", _virtualHostNode.getName());
- virtualHostAttributes.put("modelVersion", getToVersion());
- record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap());
- _virtualHostRecord = record;
- }
- else if("Exchange".equals(record.getType()))
- {
- Map<String, Object> attributes = record.getAttributes();
- String name = (String)attributes.get(EXCHANGE_NAME);
- _missingAmqpExchanges.remove(name);
- }
- getNextUpgrader().configuredObject(record);
+ newAttributes.put("durable","true");
}
- @Override
- public void complete()
- {
- for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
- {
- String name = entry.getKey();
- String type = entry.getValue();
- UUID id = _defaultExchangeIds.get(name);
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(EXCHANGE_NAME, name);
- attributes.put(EXCHANGE_TYPE, type);
- attributes.put(EXCHANGE_DURABLE, true);
-
- ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
- getUpdateMap().put(id, record);
-
- getNextUpgrader().configuredObject(record);
-
- }
+ record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ }
- getNextUpgrader().complete();
- }
- };
+ getNextUpgrader().configuredObject(record);
}
- }
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
+ }
- public void perform(DurableConfigurationStore durableConfigurationStore)
- {
- UpgradeAndRecoveryHandler vhrh = new UpgradeAndRecoveryHandler(_virtualHostNode, _objectFactory, durableConfigurationStore, _upgraders);
- durableConfigurationStore.visitConfiguredObjectRecords(vhrh);
}
- //TODO: generalize this class
- private static class UpgradeAndRecoveryHandler implements ConfiguredObjectRecordHandler
+ private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase
{
- private static Logger LOGGER = Logger.getLogger(UpgradeAndRecoveryHandler.class);
+ private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
+ private static final String EXCHANGE_NAME = "name";
+ private static final String EXCHANGE_TYPE = "type";
+ private static final String EXCHANGE_DURABLE = "durable";
+ private ConfiguredObjectRecord _virtualHostRecord;
- private final Map<UUID, ConfiguredObjectRecord> _records = new LinkedHashMap<UUID, ConfiguredObjectRecord>();
- private Map<String, UpgraderPhaseFactory> _upgraders;
-
- private final VirtualHostNode<?> _parent;
- private final ConfiguredObjectFactory _configuredObjectFactory;
- private final DurableConfigurationStore _store;
-
- public UpgradeAndRecoveryHandler(VirtualHostNode<?> parent, ConfiguredObjectFactory configuredObjectFactory, DurableConfigurationStore durableConfigurationStore, Map<String, UpgraderPhaseFactory> upgraders)
+ public Upgrader_0_4_to_0_5()
{
- super();
- _parent = parent;
- _configuredObjectFactory = configuredObjectFactory;
- _upgraders = upgraders;
- _store = durableConfigurationStore;
+ super("modelVersion", "0.4", "2.0");
}
@Override
- public void begin()
+ public void configuredObject(ConfiguredObjectRecord record)
{
- }
-
- @Override
- public boolean handle(final ConfiguredObjectRecord record)
- {
- _records.put(record.getId(), record);
- return true;
- }
-
- @Override
- public void end()
- {
- String version = getCurrentVersion();
-
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.info("Store has model version " + version + ". Number of record(s) " + _records.size());
- }
-
- DurableConfigurationStoreUpgrader upgrader = buildUpgraderChain(version);
-
- for(ConfiguredObjectRecord record : _records.values())
+ if("VirtualHost".equals(record.getType()))
{
- upgrader.configuredObject(record);
+ record = upgradeRootRecord(record);
+ Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(record.getAttributes());
+ virtualHostAttributes.put("name", _virtualHostNode.getName());
+ virtualHostAttributes.put("modelVersion", getToVersion());
+ record = new ConfiguredObjectRecordImpl(record.getId(), "VirtualHost", virtualHostAttributes, Collections.<String, ConfiguredObjectRecord>emptyMap());
+ _virtualHostRecord = record;
}
-
- upgrader.complete();
-
- Map<UUID, ConfiguredObjectRecord> deletedRecords = upgrader.getDeletedRecords();
- Map<UUID, ConfiguredObjectRecord> updatedRecords = upgrader.getUpdatedRecords();
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("VirtualHost store upgrade: " + deletedRecords.size() + " record(s) deleted");
- LOGGER.debug("VirtualHost store upgrade: " + updatedRecords.size() + " record(s) updated");
- LOGGER.debug("VirtualHost store upgrade: " + _records.size() + " total record(s)");
- }
-
- _store.update(true, updatedRecords.values().toArray(new ConfiguredObjectRecord[updatedRecords.size()]));
- _store.remove(deletedRecords.values().toArray(new ConfiguredObjectRecord[deletedRecords.size()]));
-
- _records.keySet().removeAll(deletedRecords.keySet());
- _records.putAll(updatedRecords);
-
- ConfiguredObjectRecord virtualHostRecord = null;
- for (ConfiguredObjectRecord record : _records.values())
- {
- LOGGER.debug("Found type " + record.getType());
- if ("VirtualHost".equals(record.getType()))
- {
- virtualHostRecord = record;
- break;
- }
- }
-
- if (virtualHostRecord != null)
+ else if("Exchange".equals(record.getType()))
{
- String parentCategory = _parent.getCategoryClass().getSimpleName();
- ConfiguredObjectRecord parentRecord = new ConfiguredObjectRecordImpl(_parent.getId(), parentCategory, Collections.<String, Object>emptyMap());
- Map<String, ConfiguredObjectRecord> rootParents = Collections.<String, ConfiguredObjectRecord>singletonMap(parentCategory, parentRecord);
- _records.put(virtualHostRecord.getId(), new ConfiguredObjectRecordImpl(virtualHostRecord.getId(), VirtualHost.class.getSimpleName(), virtualHostRecord.getAttributes(), rootParents));
- Collection<ConfiguredObjectRecord> records = _records.values();
- resolveObjects(_configuredObjectFactory, _parent, records.toArray(new ConfiguredObjectRecord[records.size()]));
+ Map<String, Object> attributes = record.getAttributes();
+ String name = (String)attributes.get(EXCHANGE_NAME);
+ _missingAmqpExchanges.remove(name);
}
+ getNextUpgrader().configuredObject(record);
}
- private DurableConfigurationStoreUpgrader buildUpgraderChain(String version)
- {
- DurableConfigurationStoreUpgrader head = null;
- while(!BrokerModel.MODEL_VERSION.equals(version))
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Adding virtual host store upgrader from model version: " + version);
- }
- final UpgraderPhaseFactory upgraderPhaseFactory = _upgraders.get(version);
- StoreUpgraderPhase upgrader = upgraderPhaseFactory.newInstance();
- if(head == null)
- {
- head = upgrader;
- }
- else
- {
- head.setNextUpgrader(upgrader);
- }
- version = upgraderPhaseFactory.getToVersion();
- }
-
- if(head == null)
- {
- head = new NullUpgrader();
- }
- else
- {
- head.setNextUpgrader(new NullUpgrader());
- }
-
- return head;
- }
-
- private String getCurrentVersion()
+ @Override
+ public void complete()
{
- for(ConfiguredObjectRecord record : _records.values())
+ for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
{
- if(record.getType().equals("VirtualHost"))
- {
- return (String) record.getAttributes().get(VirtualHost.MODEL_VERSION);
- }
- }
- return BrokerModel.MODEL_VERSION;
- }
-
-
- public void resolveObjects(ConfiguredObjectFactory factory, ConfiguredObject<?> root, ConfiguredObjectRecord... records)
- {
- Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
- resolvedObjects.put(root.getId(), root);
+ String name = entry.getKey();
+ String type = entry.getValue();
+ UUID id = _defaultExchangeIds.get(name);
- Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
- Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
- new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(EXCHANGE_NAME, name);
+ attributes.put(EXCHANGE_TYPE, type);
+ attributes.put(EXCHANGE_DURABLE, true);
- boolean updatesMade;
+ ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
+ getUpdateMap().put(id, record);
- do
- {
- updatesMade = false;
- Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
- while (iter.hasNext())
- {
- ConfiguredObjectRecord record = iter.next();
- Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
- boolean foundParents = true;
- for (ConfiguredObjectRecord parent : record.getParents().values())
- {
- if (!resolvedObjects.containsKey(parent.getId()))
- {
- foundParents = false;
- break;
- }
- else
- {
- parents.add(resolvedObjects.get(parent.getId()));
- }
- }
- if (foundParents)
- {
- iter.remove();
- UnresolvedConfiguredObject<? extends ConfiguredObject> recovered =
- factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()]));
- Collection<ConfiguredObjectDependency<?>> dependencies =
- recovered.getUnresolvedDependencies();
- if (dependencies.isEmpty())
- {
- updatesMade = true;
- ConfiguredObject<?> resolved = recovered.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
- }
- else
- {
- recordsWithUnresolvedDependencies.add(recovered);
- }
- }
+ getNextUpgrader().configuredObject(record);
- }
-
- Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter =
- recordsWithUnresolvedDependencies.iterator();
-
- while(unresolvedIter.hasNext())
- {
- UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
- Collection<ConfiguredObjectDependency<?>> dependencies =
- new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+ }
- for(ConfiguredObjectDependency dependency : dependencies)
- {
- if(dependency instanceof ConfiguredObjectIdDependency)
- {
- UUID id = ((ConfiguredObjectIdDependency)dependency).getId();
- if(resolvedObjects.containsKey(id))
- {
- dependency.resolve(resolvedObjects.get(id));
- }
- }
- else if(dependency instanceof ConfiguredObjectNameDependency)
- {
- ConfiguredObject<?> dependentObject = null;
- for(ConfiguredObject<?> parent : unresolvedObject.getParents())
- {
- dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName());
- if(dependentObject != null)
- {
- break;
- }
- }
- if(dependentObject != null)
- {
- dependency.resolve(dependentObject);
- }
- }
- else
- {
- throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName());
- }
- }
- if(unresolvedObject.getUnresolvedDependencies().isEmpty())
- {
- updatesMade = true;
- unresolvedIter.remove();
- ConfiguredObject<?> resolved = unresolvedObject.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
- }
- }
+ getNextUpgrader().complete();
+ }
- } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty()));
+ }
- if(!recordsWithUnresolvedDependencies.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies);
- }
- if(!recordsWithUnresolvedParents.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents);
- }
- }
+ public void perform(DurableConfigurationStore durableConfigurationStore)
+ {
+ String virtualHostCategory = VirtualHost.class.getSimpleName();
+ GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
+ upgraderHandler.upgrade();
+ new GenericRecoverer(_virtualHostNode, virtualHostCategory).recover(upgraderHandler.getRecords());
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
index 28d7830290..26aa99a481 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerRecovererTest.java
@@ -18,11 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.server.configuration.startup;
+package org.apache.qpid.server.store;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -31,9 +32,7 @@ import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.server.BrokerOptions;
-import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
@@ -49,6 +48,7 @@ import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.SystemContextImpl;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
+import org.apache.qpid.server.store.GenericRecoverer;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
public class BrokerRecovererTest extends TestCase
@@ -56,9 +56,9 @@ public class BrokerRecovererTest extends TestCase
private ConfiguredObjectRecord _brokerEntry = mock(ConfiguredObjectRecord.class);
private UUID _brokerId = UUID.randomUUID();
- private AuthenticationProvider _authenticationProvider1;
+ private AuthenticationProvider<?> _authenticationProvider1;
private UUID _authenticationProvider1Id = UUID.randomUUID();
- private SystemContext _systemContext;
+ private SystemContext<?> _systemContext;
private ConfiguredObjectFactory _configuredObjectFactory;
private TaskExecutor _taskExecutor;
@@ -91,8 +91,14 @@ public class BrokerRecovererTest extends TestCase
@Override
protected void tearDown() throws Exception
{
- super.tearDown();
- _taskExecutor.stop();
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ _taskExecutor.stop();
+ }
}
public void testCreateBrokerAttributes()
@@ -115,8 +121,8 @@ public class BrokerRecovererTest extends TestCase
when(_brokerEntry.getAttributes()).thenReturn(entryAttributes);
- _systemContext.resolveObjects(_brokerEntry);
- Broker broker = _systemContext.getBroker();
+ resolveObjects(_brokerEntry);
+ Broker<?> broker = _systemContext.getBroker();
assertNotNull(broker);
@@ -171,7 +177,7 @@ public class BrokerRecovererTest extends TestCase
UUID authProviderId = UUID.randomUUID();
UUID portId = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(
+ resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"), createPortRecord(
portId,
5672,
"authProvider"));
@@ -188,7 +194,7 @@ public class BrokerRecovererTest extends TestCase
{
UUID authProviderId = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"));
+ resolveObjects(_brokerEntry, createAuthProviderRecord(authProviderId, "authProvider"));
Broker<?> broker = _systemContext.getBroker();
@@ -206,7 +212,7 @@ public class BrokerRecovererTest extends TestCase
UUID authProvider2Id = UUID.randomUUID();
UUID port2Id = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry,
+ resolveObjects(_brokerEntry,
createAuthProviderRecord(authProviderId, "authProvider"),
createPortRecord(portId, 5672, "authProvider"),
createAuthProviderRecord(authProvider2Id, "authProvider2"),
@@ -228,7 +234,7 @@ public class BrokerRecovererTest extends TestCase
UUID authProviderId = UUID.randomUUID();
- _systemContext.resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider"));
+ resolveObjects(_brokerEntry, createGroupProviderRecord(authProviderId, "groupProvider"));
Broker<?> broker = _systemContext.getBroker();
@@ -253,7 +259,7 @@ public class BrokerRecovererTest extends TestCase
try
{
- _systemContext.resolveObjects(_brokerEntry);
+ resolveObjects(_brokerEntry);
Broker<?> broker = _systemContext.getBroker();
broker.open();
fail("The broker creation should fail due to unsupported model version");
@@ -323,33 +329,10 @@ public class BrokerRecovererTest extends TestCase
return String.valueOf(attributeValue);
}
- private RecovererProvider createRecoveryProvider(final ConfiguredObjectRecord[] entries, final ConfiguredObject[] objectsToRecoverer)
+ private void resolveObjects(ConfiguredObjectRecord... records)
{
- RecovererProvider recovererProvider = new RecovererProvider()
- {
- @Override
- public ConfiguredObjectRecoverer<? extends ConfiguredObject> getRecoverer(String type)
- {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final ConfiguredObjectRecoverer<? extends ConfiguredObject> recoverer = new ConfiguredObjectRecoverer()
- {
- public ConfiguredObject create(RecovererProvider recovererProvider, ConfiguredObjectRecord entry, ConfiguredObject... parents)
- {
- for (int i = 0; i < entries.length; i++)
- {
- ConfiguredObjectRecord e = entries[i];
- if (entry == e)
- {
- return objectsToRecoverer[i];
- }
- }
- return null;
- }
- };
-
- return recoverer;
- }
- };
- return recovererProvider;
+ GenericRecoverer recoverer = new GenericRecoverer(_systemContext, Broker.class.getSimpleName());
+ recoverer.recover(Arrays.asList(records));
}
+
}