diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-15 16:42:39 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-15 16:42:39 +0000 |
| commit | 7057688d9214cffd217781db3c51abef5e227c93 (patch) | |
| tree | af52519ecd8844b7061ae442c84dec1f83bd45ae /qpid/java | |
| parent | f203ee690d73b8f6ff19ba8b4f3f39808a1eddde (diff) | |
| download | qpid-python-7057688d9214cffd217781db3c51abef5e227c93.tar.gz | |
QPID-5073 : [Java Broker] Refactor DurableConfigurationStore recovery to allow for additional configured object children other than just Exchange/Binding/Queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1514360 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
34 files changed, 1842 insertions, 650 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index c2a530499a..2350e28ee2 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -29,6 +29,7 @@ import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -809,31 +810,52 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); } - OperationStatus status = removeConfiguredObject(id); + OperationStatus status = removeConfiguredObject(null, id); if (status == OperationStatus.NOTFOUND) { throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found"); } } + @Override + public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException + { + com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); + Collection<UUID> removed = new ArrayList<UUID>(objects.length); + for(UUID id : objects) + { + if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS) + { + removed.add(id); + } + } + + txn.commit(); + return removed.toArray(new UUID[removed.size()]); + } @Override public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { - update(id, type, attributes, null); + update(false, id, type, attributes, null); } public void update(ConfiguredObjectRecord... records) throws AMQStoreException { + update(false, records); + } + + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException + { com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); for(ConfiguredObjectRecord record : records) { - update(record.getId(), record.getType(), record.getAttributes(), txn); + update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn); } txn.commit(); } - private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException + private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException { if (LOGGER.isDebugEnabled()) { @@ -851,7 +873,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS) + if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND)) { ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); @@ -1406,14 +1428,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } - private OperationStatus removeConfiguredObject(UUID id) throws AMQStoreException + private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws AMQStoreException { DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); uuidBinding.objectToEntry(id, key); try { - return _configuredObjectsDb.delete(null, key); + return _configuredObjectsDb.delete(tx, key); } catch (DatabaseException e) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index be91e4a484..b92a97c8cb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -25,12 +25,14 @@ import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; +import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider; import org.apache.qpid.server.virtualhost.State; import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -58,7 +60,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost _messageStore = new BDBHAMessageStore(); final MessageStoreLogSubject storeLogSubject = - new MessageStoreLogSubject(this, _messageStore.getClass().getSimpleName()); + new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); OperationalLoggingListener.listen(_messageStore, storeLogSubject); _messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); @@ -71,9 +73,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); + DurableConfigurationRecoverer configRecoverer = + new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + new DefaultUpgraderProvider(this, getExchangeRegistry())); _messageStore.configureConfigStore(getName(), - recoveryHandler, + configRecoverer, virtualHost); _messageStore.configureMessageStore(getName(), diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index 647e19d659..ece4e2275e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -180,7 +180,12 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore return _entries.get(id); } - @Override + /** + * Copies the store into the given location + * + * @param target location to copy store into + * @throws IllegalConfigurationException if store cannot be copied into given location + */ public void copyTo(String copyLocation) { File file = new File(copyLocation); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java index 08963bd8f1..ed989d764f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java @@ -28,8 +28,10 @@ public class MessageStoreLogSubject extends AbstractLogSubject { /** Create an MessageStoreLogSubject that Logs in the following format. */ - public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName) + public MessageStoreLogSubject(String vhostName, String messageStoreName) { - setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName); + setLogStringWithFormat(STORE_FORMAT, vhostName, messageStoreName); } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 9e32d303fb..a84a041b72 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -119,7 +119,8 @@ public interface VirtualHost extends ConfiguredObject QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, CONFIG_PATH)); - int CURRENT_CONFIG_VERSION = 1; + + int CURRENT_CONFIG_VERSION = 2; //children Collection<VirtualHostAlias> getAliases(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java index 7ff8c88331..9ac2cb00a3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java @@ -39,7 +39,7 @@ import org.apache.qpid.server.configuration.updater.CreateChildTask; import org.apache.qpid.server.configuration.updater.SetAttributeTask; import org.apache.qpid.server.configuration.updater.TaskExecutor; -abstract class AbstractAdapter implements ConfiguredObject +public abstract class AbstractAdapter implements ConfiguredObject { private static final Object ID = "id"; private final Map<String,Object> _attributes = new HashMap<String, Object>(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index aaa6f460d2..e8bacb2712 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -190,7 +190,7 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection } else if(name.equals(INCOMING)) { - + return true; } else if(name.equals(LOCAL_ADDRESS)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java new file mode 100644 index 0000000000..dbe8bf22a0 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Map; +import java.util.UUID; + +public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements DurableConfiguredObjectRecoverer +{ + @Override + public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer, + final UUID id, + final Map<String, Object> attributes) + { + final UnresolvedObject obj = createUnresolvedObject(id, getType(), attributes); + UnresolvedDependency[] dependencies = obj.getUnresolvedDependencies(); + for(final UnresolvedDependency dependency : dependencies) + { + Object dep; + if((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getId())) != null) + { + dependency.resolve(dep); + } + else + { + durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(), + new DependencyListener() + { + + @Override + public void dependencyResolved(final String depType, + final UUID depId, + final Object o) + { + dependency.resolve(o); + if(obj.getUnresolvedDependencies().length == 0) + { + durableConfigurationRecoverer.resolve(getType(), id, obj.resolve()); + } + } + }); + } + } + if(obj.getUnresolvedDependencies().length == 0) + { + durableConfigurationRecoverer.resolve(getType(), id, obj.resolve()); + } + else + { + durableConfigurationRecoverer.addUnresolvedObject(getType(), id, obj); + } + + } + + public abstract UnresolvedObject<T> createUnresolvedObject(final UUID id, + final String type, + final Map<String, Object> attributes); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index ae6b4b0154..e2fea8f50b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -35,6 +35,8 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; @@ -1978,15 +1980,35 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Connection conn = newAutoCommitConnection(); try { - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); - try - { - stmt.setString(1, id.toString()); - results = stmt.executeUpdate(); - } - finally + results = removeConfiguredObject(id, conn); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e); + } + return results; + } + + public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException + { + Collection<UUID> removed = new ArrayList<UUID>(objects.length); + try + { + + Connection conn = newAutoCommitConnection(); + try + { + for(UUID id : objects) { - stmt.close(); + if(removeConfiguredObject(id, conn) != 0) + { + removed.add(id); + } } } finally @@ -1996,7 +2018,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } catch (SQLException e) { - throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e); + } + return removed.toArray(new UUID[removed.size()]); + } + + private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException + { + final int results;PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); + try + { + stmt.setString(1, id.toString()); + results = stmt.executeUpdate(); + } + finally + { + stmt.close(); } return results; } @@ -2010,7 +2047,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Connection conn = newAutoCommitConnection(); try { - updateConfiguredObject(configuredObject, conn); + updateConfiguredObject(configuredObject, false, conn); } finally { @@ -2027,6 +2064,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC @Override public void update(ConfiguredObjectRecord... records) throws AMQStoreException { + update(false, records); + } + + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException + { if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING)) { try @@ -2036,7 +2078,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { for(ConfiguredObjectRecord record : records) { - updateConfiguredObject(record, conn); + updateConfiguredObject(record, createIfNecessary, conn); } conn.commit(); } @@ -2054,7 +2096,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, Connection conn) + private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, + boolean createIfNecessary, + Connection conn) throws SQLException, AMQStoreException { PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); @@ -2089,6 +2133,31 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt2.close(); } } + else if(createIfNecessary) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); + try + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) + { + insertStmt.setNull(3, Types.BLOB); + } + else + { + final Map<String, Object> attributes = configuredObject.getAttributes(); + byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); + } + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + } } finally { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DependencyListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DependencyListener.java new file mode 100644 index 0000000000..120c904cf7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DependencyListener.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.UUID; + +interface DependencyListener +{ + void dependencyResolved(String type, UUID id, Object o); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java new file mode 100644 index 0000000000..7fb80bde96 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java @@ -0,0 +1,242 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; + +import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; + +public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandler +{ + private static final Logger _logger = Logger.getLogger(DurableConfigurationRecoverer.class); + + private final Map<String, Map<UUID, Object>> _resolvedObjects = new HashMap<String, Map<UUID, Object>>(); + + private final Map<String, Map<UUID, UnresolvedObject>> _unresolvedObjects = + new HashMap<String, Map<UUID, UnresolvedObject>>(); + + private final Map<String, Map<UUID, List<DependencyListener>>> _dependencyListeners = + new HashMap<String, Map<UUID, List<DependencyListener>>>(); + private final Map<String, DurableConfiguredObjectRecoverer> _recoverers; + private final UpgraderProvider _upgraderProvider; + + private DurableConfigurationStoreUpgrader _upgrader; + + private DurableConfigurationStore _store; + private final String _name; + + private MessageStoreLogSubject _logSubject; + + public DurableConfigurationRecoverer(final String name, + Map<String, DurableConfiguredObjectRecoverer> recoverers, + UpgraderProvider upgraderProvider) + { + _recoverers = recoverers; + _name = name; + _upgraderProvider = upgraderProvider; + } + + @Override + public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + { + _logSubject = new MessageStoreLogSubject(_name, store.getClass().getSimpleName()); + + _store = store; + _upgrader = _upgraderProvider.getUpgrader(configVersion, this); + } + + @Override + public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes) + { + _upgrader.configuredObject(id, type, attributes); + } + + void onConfiguredObject(final UUID id, final String type, final Map<String, Object> attributes) + { + DurableConfiguredObjectRecoverer recoverer = getRecoverer(type); + if(recoverer == null) + { + throw new IllegalConfigurationException("Unkown type for configured object: " + type); + } + recoverer.load(this, id, attributes); + } + + private DurableConfiguredObjectRecoverer getRecoverer(final String type) + { + DurableConfiguredObjectRecoverer recoverer = _recoverers.get(type); + return recoverer; + } + + @Override + public int completeConfigurationRecovery() + { + _upgrader.complete(); + checkUnresolvedDependencies(); + applyUpgrade(); + + CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); + return CURRENT_CONFIG_VERSION; + } + + private void applyUpgrade() + { + + final Collection<ConfiguredObjectRecord> updates = new ArrayList<ConfiguredObjectRecord>(); + final Collection<UUID> deletes = new ArrayList<UUID>(); + for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _upgrader.getUpdatedRecords().entrySet()) + { + if(entry.getValue() != null) + { + updates.add(entry.getValue()); + } + else + { + deletes.add(entry.getKey()); + } + } + + try + { + if(!updates.isEmpty()) + { + _store.update(true,updates.toArray(new ConfiguredObjectRecord[updates.size()])); + } + if(!deletes.isEmpty()) + { + _store.removeConfiguredObjects(deletes.toArray(new UUID[deletes.size()])); + } + } + catch (AMQStoreException e) + { + // TODO better exception + throw new RuntimeException("Unable to update config store when upgrading"); + } + + } + + private void checkUnresolvedDependencies() + { + if(_unresolvedObjects != null && !_unresolvedObjects.isEmpty()) + { + boolean unresolvedObjectsExist = false; + for(Map.Entry<String, Map<UUID, UnresolvedObject>>entry : _unresolvedObjects.entrySet()) + { + for(Map.Entry<UUID,UnresolvedObject> obj : entry.getValue().entrySet()) + { + unresolvedObjectsExist = true; + StringBuilder errorMessage = new StringBuilder("Durable configured object of type "); + errorMessage.append(entry.getKey()).append(" with id ").append(obj.getKey()) + .append(" has unresolved dependencies: "); + for(UnresolvedDependency dep : obj.getValue().getUnresolvedDependencies()) + { + errorMessage.append(dep.getType()).append(" with id ").append(dep.getId()).append("; "); + } + _logger.error(errorMessage); + } + } + if(unresolvedObjectsExist) + { + throw new IllegalConfigurationException("Durable configuration has unresolved dependencies"); + } + } + } + + void addResolutionListener(final String type, + final UUID id, + final DependencyListener dependencyListener) + { + Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type); + if(typeListeners == null) + { + typeListeners = new HashMap<UUID, List<DependencyListener>>(); + _dependencyListeners.put(type, typeListeners); + } + List<DependencyListener> objectListeners = typeListeners.get(id); + if(objectListeners == null) + { + objectListeners = new ArrayList<DependencyListener>(); + typeListeners.put(id, objectListeners); + } + objectListeners.add(dependencyListener); + + } + + Object getResolvedObject(final String type, final UUID id) + { + Map<UUID, Object> objects = _resolvedObjects.get(type); + return objects == null ? null : objects.get(id); + } + + void resolve(final String type, final UUID id, final Object object) + { + Map<UUID, Object> typeObjects = _resolvedObjects.get(type); + if(typeObjects == null) + { + typeObjects = new HashMap<UUID, Object>(); + _resolvedObjects.put(type, typeObjects); + } + typeObjects.put(id, object); + Map<UUID, UnresolvedObject> unresolved = _unresolvedObjects.get(type); + if(unresolved != null) + { + unresolved.remove(id); + } + + Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type); + if(typeListeners != null) + { + List<DependencyListener> listeners = typeListeners.remove(id); + if(listeners != null) + { + for(DependencyListener listener : listeners) + { + listener.dependencyResolved(type, id, object); + } + } + } + } + + void addUnresolvedObject(final String type, + final UUID id, + final UnresolvedObject obj) + { + Map<UUID, UnresolvedObject> typeObjects = _unresolvedObjects.get(type); + if(typeObjects == null) + { + typeObjects = new HashMap<UUID, UnresolvedObject>(); + _unresolvedObjects.put(type, typeObjects); + } + typeObjects.put(id, obj); + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 7ce761af18..08f3d83c4e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -55,7 +55,6 @@ public interface DurableConfigurationStore VirtualHost virtualHost) throws Exception; - /** * Makes the specified object persistent. * @@ -77,6 +76,8 @@ public interface DurableConfigurationStore */ void remove(UUID id, String type) throws AMQStoreException; + public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException; + /** * Updates the specified object in the persistent store, IF it is already present. If the object @@ -92,6 +93,7 @@ public interface DurableConfigurationStore public void update(ConfiguredObjectRecord... records) throws AMQStoreException; + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index 9fab29fea6..efb1e95e99 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -36,6 +36,10 @@ import org.apache.qpid.server.queue.AMQQueue; public class DurableConfigurationStoreHelper { + private static final String BINDING = Binding.class.getSimpleName(); + private static final String EXCHANGE = Exchange.class.getSimpleName(); + private static final String QUEUE = Queue.class.getSimpleName(); + public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException { Map<String, Object> attributesMap = new LinkedHashMap<String, Object>(); @@ -60,7 +64,7 @@ public class DurableConfigurationStoreHelper { attributesMap.put(Queue.ARGUMENTS, queue.getArguments()); } - store.update(queue.getId(), Queue.class.getName(), attributesMap); + store.update(queue.getId(), QUEUE, attributesMap); } public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments) @@ -80,12 +84,12 @@ public class DurableConfigurationStoreHelper { attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments)); } - store.create(queue.getId(),Queue.class.getName(),attributesMap); + store.create(queue.getId(), QUEUE,attributesMap); } public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException { - store.remove(queue.getId(), Queue.class.getName()); + store.remove(queue.getId(), QUEUE); } public static void createExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange) @@ -96,7 +100,7 @@ public class DurableConfigurationStoreHelper attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString())); attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name() : LifetimePolicy.PERMANENT.name()); - store.create(exchange.getId(), Exchange.class.getName(), attributesMap); + store.create(exchange.getId(), EXCHANGE, attributesMap); } @@ -104,7 +108,7 @@ public class DurableConfigurationStoreHelper public static void removeExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange) throws AMQStoreException { - store.remove(exchange.getId(),Exchange.class.getName()); + store.remove(exchange.getId(), EXCHANGE); } public static void createBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding) @@ -119,14 +123,14 @@ public class DurableConfigurationStoreHelper { attributesMap.put(Binding.ARGUMENTS, arguments); } - store.create(binding.getId(), Binding.class.getName(), attributesMap); + store.create(binding.getId(), BINDING, attributesMap); } public static void removeBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding) throws AMQStoreException { - store.remove(binding.getId(), Binding.class.getName()); + store.remove(binding.getId(), BINDING); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java new file mode 100644 index 0000000000..1d3e4cc672 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Map; +import java.util.UUID; + +public interface DurableConfigurationStoreUpgrader +{ + void configuredObject(UUID id, String type, Map<String, Object> attributes); + + void complete(); + + void setNextUpgrader(DurableConfigurationStoreUpgrader upgrader); + + Map<UUID, ConfiguredObjectRecord> getUpdatedRecords(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java new file mode 100644 index 0000000000..e065728bd3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Map; +import java.util.UUID; + +public interface DurableConfiguredObjectRecoverer +{ + public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer, + final UUID id, + final Map<String, Object> attributes); + + public String getType(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java new file mode 100644 index 0000000000..a671e93b26 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrader +{ + private DurableConfigurationStoreUpgrader _nextUpgrader; + private final Map<UUID, ConfiguredObjectRecord> _updates = new HashMap<UUID, ConfiguredObjectRecord>(); + + public final void setNextUpgrader(final DurableConfigurationStoreUpgrader upgrader) + { + if(_nextUpgrader == null) + { + _nextUpgrader = upgrader; + } + else + { + _nextUpgrader.setNextUpgrader(upgrader); + } + } + + protected DurableConfigurationStoreUpgrader getNextUpgrader() + { + return _nextUpgrader; + } + + protected Map<UUID, ConfiguredObjectRecord> getUpdateMap() + { + return _updates; + } + + @Override + public final Map<UUID, ConfiguredObjectRecord> getUpdatedRecords() + { + final Map<UUID, ConfiguredObjectRecord> updates = new HashMap<UUID, ConfiguredObjectRecord>(_updates); + updates.putAll(_nextUpgrader.getUpdatedRecords()); + return updates; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 078a0d3752..9eb0d85914 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -44,11 +44,23 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException + { + } + + + @Override public void remove(UUID id, String type) { } @Override + public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException + { + return objects; + } + + @Override public void create(UUID id, String type, Map<String, Object> attributes) { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullUpgrader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullUpgrader.java new file mode 100644 index 0000000000..c8a812fa89 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullUpgrader.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +public final class NullUpgrader implements DurableConfigurationStoreUpgrader +{ + private DurableConfigurationRecoverer _durableConfigurationRecoverer; + + public NullUpgrader(final DurableConfigurationRecoverer durableConfigurationRecoverer) + { + _durableConfigurationRecoverer = durableConfigurationRecoverer; + } + + @Override + public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes) + { + _durableConfigurationRecoverer.onConfiguredObject(id, type, attributes); + } + + @Override + public void complete() + { + } + + @Override + public void setNextUpgrader(final DurableConfigurationStoreUpgrader upgrader) + { + throw new UnsupportedOperationException("NullUpgrader must always be the last upgrader"); + } + + @Override + public Map<UUID, ConfiguredObjectRecord> getUpdatedRecords() + { + return Collections.emptyMap(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java new file mode 100644 index 0000000000..98348efbd2 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.UUID; + +public interface UnresolvedDependency<T> +{ + public UUID getId(); + public String getType(); + + public void resolve(final T dependency); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java new file mode 100644 index 0000000000..7ebebadae7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public interface UnresolvedObject<T> +{ + public UnresolvedDependency[] getUnresolvedDependencies(); + + T resolve(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java new file mode 100644 index 0000000000..c2ea0745ff --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public interface UpgraderProvider +{ + DurableConfigurationStoreUpgrader getUpgrader(int configVersion, DurableConfigurationRecoverer recoverer); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 58f4ed48ca..4e27a008dd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.txn.DtxRegistry; @@ -743,6 +744,22 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } } + protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers() + { + DurableConfiguredObjectRecoverer[] recoverers = { + new QueueRecoverer(this, getExchangeRegistry()), + new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()), + new BindingRecoverer(this, getExchangeRegistry()) + }; + + final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>(); + for(DurableConfiguredObjectRecoverer recoverer : recoverers) + { + recovererMap.put(recoverer.getType(), recoverer); + } + return recovererMap; + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java new file mode 100644 index 0000000000..7cfadbcadf --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -0,0 +1,177 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.UnresolvedDependency; +import org.apache.qpid.server.store.UnresolvedObject; + +public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<Binding> +{ + private static final Logger _logger = Logger.getLogger(BindingRecoverer.class); + + private final ExchangeRegistry _exchangeRegistry; + private final VirtualHost _virtualHost; + + public BindingRecoverer(final VirtualHost virtualHost, + final ExchangeRegistry exchangeRegistry) + { + _exchangeRegistry = exchangeRegistry; + _virtualHost = virtualHost; + } + + @Override + public UnresolvedObject<Binding> createUnresolvedObject(final UUID id, + final String type, + final Map<String, Object> attributes) + { + return new UnresolvedBinding(id, attributes); + } + + @Override + public String getType() + { + return org.apache.qpid.server.model.Binding.class.getSimpleName(); + } + + private class UnresolvedBinding implements UnresolvedObject<Binding> + { + private final Map<String, Object> _bindingArgumentsMap; + private final String _bindingName; + private final UUID _queueId; + private final UUID _exchangeId; + private final UUID _bindingId; + + private List<UnresolvedDependency> _unresolvedDependencies = + new ArrayList<UnresolvedDependency>(); + + private Exchange _exchange; + private AMQQueue _queue; + + public UnresolvedBinding(final UUID id, + final Map<String, Object> attributeMap) + { + _bindingId = id; + _exchangeId = UUID.fromString((String)attributeMap.get(org.apache.qpid.server.model.Binding.EXCHANGE)); + _queueId = UUID.fromString((String) attributeMap.get(org.apache.qpid.server.model.Binding.QUEUE)); + _exchange = _exchangeRegistry.getExchange(_exchangeId); + if(_exchange == null) + { + _unresolvedDependencies.add(new ExchangeDependency()); + } + _queue = _virtualHost.getQueueRegistry().getQueue(_queueId); + if(_queue == null) + { + _unresolvedDependencies.add(new QueueDependency()); + } + + _bindingName = (String) attributeMap.get(org.apache.qpid.server.model.Binding.NAME); + _bindingArgumentsMap = (Map<String, Object>) attributeMap.get(org.apache.qpid.server.model.Binding.ARGUMENTS); + } + + @Override + public UnresolvedDependency[] getUnresolvedDependencies() + { + return _unresolvedDependencies.toArray(new UnresolvedDependency[_unresolvedDependencies.size()]); + } + + @Override + public Binding resolve() + { + try + { + if(_exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap) == null) + { + _logger.info("Restoring binding: (Exchange: " + _exchange.getNameShortString() + ", Queue: " + _queue.getName() + + ", Routing Key: " + _bindingName + ", Arguments: " + _bindingArgumentsMap + ")"); + + _exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap); + } + return _exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + private class QueueDependency implements UnresolvedDependency<AMQQueue> + { + + @Override + public UUID getId() + { + return _queueId; + } + + @Override + public String getType() + { + return Queue.class.getSimpleName(); + } + + @Override + public void resolve(final AMQQueue dependency) + { + _queue = dependency; + _unresolvedDependencies.remove(this); + } + + } + + private class ExchangeDependency implements UnresolvedDependency<Exchange> + { + + @Override + public UUID getId() + { + return _exchangeId; + } + + @Override + public String getType() + { + return org.apache.qpid.server.model.Exchange.class.getSimpleName(); + } + + @Override + public void resolve(final Exchange dependency) + { + _exchange = dependency; + _unresolvedDependencies.remove(this); + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java new file mode 100644 index 0000000000..3526551073 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -0,0 +1,227 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.FilterSupport; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationRecoverer; +import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; +import org.apache.qpid.server.store.NonNullUpgrader; +import org.apache.qpid.server.store.NullUpgrader; +import org.apache.qpid.server.store.UpgraderProvider; + +import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; + +public class DefaultUpgraderProvider implements UpgraderProvider +{ + private final ExchangeRegistry _exchangeRegistry; + private final VirtualHost _virtualHost; + + public DefaultUpgraderProvider(final VirtualHost virtualHost, + final ExchangeRegistry exchangeRegistry) + { + _virtualHost = virtualHost; + _exchangeRegistry = exchangeRegistry; + } + + public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer) + { + DurableConfigurationStoreUpgrader currentUpgrader = null; + switch(configVersion) + { + case 0: + currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader()); + case 1: + currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); + case CURRENT_CONFIG_VERSION: + currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); + break; + + default: + throw new IllegalStateException("Unknown configuration model version: " + configVersion + + ". Attempting to run an older instance against an upgraded configuration?"); + } + return currentUpgrader; + } + + private DurableConfigurationStoreUpgrader addUpgrader(DurableConfigurationStoreUpgrader currentUpgrader, + final DurableConfigurationStoreUpgrader nextUpgrader) + { + if(currentUpgrader == null) + { + currentUpgrader = nextUpgrader; + } + else + { + currentUpgrader.setNextUpgrader(nextUpgrader); + } + return currentUpgrader; + } + + /* + * Removes filters from queue bindings to exchanges other than topic exchanges. In older versions of the broker + * such bindings would have been ignored, starting from the point at which the config version changed, these + * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue. + */ + private class Version0Upgrader extends NonNullUpgrader + { + private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>(); + + public Version0Upgrader() + { + } + + @Override + public void configuredObject(final UUID id, final String type, Map<String, Object> attributes) + { + _records.put(id, new ConfiguredObjectRecord(id, type, attributes)); + } + + private void removeSelectorArguments(Map<String, Object> binding) + { + @SuppressWarnings("unchecked") + Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS)); + + FilterSupport.removeFilters(arguments); + binding.put(Binding.ARGUMENTS, arguments); + } + + private boolean isTopicExchange(Map<String, Object> binding) + { + UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE)); + + if(_records.containsKey(exchangeId)) + { + return "topic".equals(_records.get(exchangeId) + .getAttributes() + .get(org.apache.qpid.server.model.Exchange.TYPE)); + } + else + { + return _exchangeRegistry.getExchange(exchangeId) != null + && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE; + } + + } + + private boolean hasSelectorArguments(Map<String, Object> binding) + { + @SuppressWarnings("unchecked") + Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS); + return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); + } + + + + @Override + public void complete() + { + for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet()) + { + ConfiguredObjectRecord record = entry.getValue(); + String type = record.getType(); + Map<String, Object> attributes = record.getAttributes(); + UUID id = record.getId(); + if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(attributes)) + { + attributes = new LinkedHashMap<String, Object>(attributes); + removeSelectorArguments(attributes); + + record = new ConfiguredObjectRecord(id, type, attributes); + getUpdateMap().put(id, record); + entry.setValue(record); + + } + getNextUpgrader().configuredObject(id, type, attributes); + } + + getNextUpgrader().complete(); + } + + } + + /* + * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker + * configuration store). Also remove bindings which reference non-existant queues or exchanges. + */ + private class Version1Upgrader extends NonNullUpgrader + { + @Override + public void configuredObject(final UUID id, String type, final Map<String, Object> attributes) + { + type = type.substring(1+type.lastIndexOf('.')); + getUpdateMap().put(id, new ConfiguredObjectRecord(id, type, attributes)); + + } + + @Override + public void complete() + { + for(Map.Entry<UUID, ConfiguredObjectRecord> entry : getUpdateMap().entrySet()) + { + final ConfiguredObjectRecord record = entry.getValue(); + if(isBinding(record.getType()) && (unknownExchange((String) record.getAttributes().get(Binding.EXCHANGE)) + || unknownQueue((String) record.getAttributes().get(Binding.QUEUE)))) + { + entry.setValue(null); + } + else + { + getNextUpgrader().configuredObject(record.getId(), record.getType(), record.getAttributes()); + } + } + getNextUpgrader().complete(); + } + + private boolean unknownExchange(final String exchangeIdString) + { + UUID exchangeId = UUID.fromString(exchangeIdString); + ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); + return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())) + || _exchangeRegistry.getExchange(exchangeId) != null); + } + + private boolean unknownQueue(final String queueIdString) + { + UUID queueId = UUID.fromString(queueIdString); + ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); + return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())) + || _virtualHost.getQueueRegistry().getQueue(queueId) != null); + } + + private boolean isBinding(final String type) + { + return Binding.class.getSimpleName().equals(type); + } + + + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java new file mode 100644 index 0000000000..702f6e1bdf --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.UnresolvedDependency; +import org.apache.qpid.server.store.UnresolvedObject; + +public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<Exchange> +{ + private final ExchangeRegistry _exchangeRegistry; + private final ExchangeFactory _exchangeFactory; + + public ExchangeRecoverer(final ExchangeRegistry exchangeRegistry, final ExchangeFactory exchangeFactory) + { + _exchangeRegistry = exchangeRegistry; + _exchangeFactory = exchangeFactory; + } + + @Override + public String getType() + { + return org.apache.qpid.server.model.Exchange.class.getSimpleName(); + } + + @Override + public UnresolvedObject<Exchange> createUnresolvedObject(final UUID id, + final String type, + final Map<String, Object> attributes) + { + return new UnresolvedExchange(id, attributes); + } + + private class UnresolvedExchange implements UnresolvedObject<Exchange> + { + private Exchange _exchange; + + public UnresolvedExchange(final UUID id, + final Map<String, Object> attributeMap) + { + String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME); + String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); + String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); + boolean autoDelete = lifeTimePolicy == null + || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; + try + { + _exchange = _exchangeRegistry.getExchange(id); + if(_exchange == null) + { + _exchange = _exchangeRegistry.getExchange(exchangeName); + } + if (_exchange == null) + { + _exchange = _exchangeFactory.createExchange(id, exchangeName, exchangeType, true, autoDelete); + _exchangeRegistry.registerExchange(_exchange); + } + } + catch (AMQException e) + { + throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e); + } + } + + @Override + public UnresolvedDependency[] getUnresolvedDependencies() + { + return new UnresolvedDependency[0]; + } + + @Override + public Exchange resolve() + { + return _exchange; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java new file mode 100644 index 0000000000..4e06cf3202 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Map; +import java.util.UUID; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.UnresolvedDependency; +import org.apache.qpid.server.store.UnresolvedObject; + +public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQQueue> +{ + private static final Logger _logger = Logger.getLogger(QueueRecoverer.class); + private final VirtualHost _virtualHost; + private final ExchangeRegistry _exchangeRegistry; + + public QueueRecoverer(final VirtualHost virtualHost, final ExchangeRegistry exchangeRegistry) + { + _virtualHost = virtualHost; + _exchangeRegistry = exchangeRegistry; + } + + @Override + public String getType() + { + return Queue.class.getSimpleName(); + } + + @Override + public UnresolvedObject<AMQQueue> createUnresolvedObject(final UUID id, + final String type, + final Map<String, Object> attributes) + { + return new UnresolvedQueue(id, type, attributes); + } + + private class UnresolvedQueue implements UnresolvedObject<AMQQueue> + { + private AMQQueue _queue; + + public UnresolvedQueue(final UUID id, + final String type, + final Map<String, Object> attributeMap) + { + String queueName = (String) attributeMap.get(Queue.NAME); + String owner = (String) attributeMap.get(Queue.OWNER); + boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); + UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE)); + @SuppressWarnings("unchecked") + Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS); + try + { + _queue = _virtualHost.getQueueRegistry().getQueue(id); + if(_queue == null) + { + _queue = _virtualHost.getQueueRegistry().getQueue(queueName); + } + + if (_queue == null) + { + _queue = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, + queueArgumentsMap); + _virtualHost.getQueueRegistry().registerQueue(_queue); + + if (alternateExchangeId != null) + { + Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId); + if (altExchange == null) + { + _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id); + return; + } + _queue.setAlternateExchange(altExchange); + } + } + } + catch (AMQException e) + { + throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e); + } + } + + @Override + public UnresolvedDependency[] getUnresolvedDependencies() + { + return new UnresolvedDependency[0]; + } + + @Override + public AMQQueue resolve() + { + return _queue; + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index b34444cb4c..143bdce85b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -23,6 +23,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; @@ -68,7 +69,7 @@ public class StandardVirtualHost extends AbstractVirtualHost final MessageStoreLogSubject - storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); + storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName()); OperationalLoggingListener.listen(messageStore, storeLogSubject); return messageStore; @@ -96,10 +97,12 @@ public class StandardVirtualHost extends AbstractVirtualHost _durableConfigurationStore = initialiseConfigurationStore(virtualHost); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); - - _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost); + DurableConfigurationRecoverer configRecoverer = + new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + new DefaultUpgraderProvider(this, getExchangeRegistry())); + _durableConfigurationStore.configureConfigStore(getName(), configRecoverer, virtualHost); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory()); _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler); initialiseModel(hostConfig); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 605bbe5f45..3738306f6a 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -20,11 +20,7 @@ */ package org.apache.qpid.server.virtualhost; -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; import java.util.UUID; @@ -32,28 +28,16 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.FilterSupport; -import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; @@ -65,11 +49,8 @@ import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.transport.Xid; import org.apache.qpid.transport.util.Functions; -import org.apache.qpid.util.ByteBufferInputStream; -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; - -public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, +public class VirtualHostConfigRecoveryHandler implements MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, @@ -84,15 +65,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); - private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>(); - private final ExchangeRegistry _exchangeRegistry; private final ExchangeFactory _exchangeFactory; private MessageStoreLogSubject _logSubject; private MessageStore _store; - private int _currentConfigVersion; - private DurableConfigurationStore _configStore; public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, ExchangeRegistry exchangeRegistry, @@ -103,76 +80,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa _exchangeFactory = exchangeFactory; } - @Override - public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion) - { - _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); - _configStore = store; - _currentConfigVersion = configVersion; - CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START()); - } - public VirtualHostConfigRecoveryHandler begin(MessageStore store) { - _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName()); + _logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName()); _store = store; CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); return this; } - public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId) - { - try - { - AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName); - - if (q == null) - { - q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, - FieldTable.convertToMap(arguments)); - _virtualHost.getQueueRegistry().registerQueue(q); - - if (alternateExchangeId != null) - { - Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId); - if (altExchange == null) - { - _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id); - return; - } - q.setAlternateExchange(altExchange); - } - } - - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true)); - - //Record that we have a queue for recovery - _queueRecoveries.put(queueName, 0); - } - catch (AMQException e) - { - throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e); - } - } - - public void exchange(UUID id, String exchangeName, String type, boolean autoDelete) - { - try - { - Exchange exchange; - exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) - { - exchange = _exchangeFactory.createExchange(id, exchangeName, type, true, autoDelete); - _exchangeRegistry.registerExchange(exchange); - } - } - catch (AMQException e) - { - throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e); - } - } - public StoredMessageRecoveryHandler begin() { return this; @@ -347,56 +262,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } - private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf) - { - try - { - Exchange exchange = _exchangeRegistry.getExchange(exchangeId); - if (exchange == null) - { - _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId); - return; - } - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); - if (queue == null) - { - _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName()); - } - else - { - FieldTable argumentsFT = null; - if(buf != null) - { - try - { - argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit()); - } - catch (IOException e) - { - throw new RuntimeException("IOException should not be thrown here", e); - } - } - - Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT); - - if(exchange.getBinding(bindingKey, queue, argumentMap) == null) - { - - _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName() - + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); - - exchange.restoreBinding(bindingId, bindingKey, queue, argumentMap); - } - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - public void complete() { } @@ -478,201 +343,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return this; } - @Override - public void configuredObject(UUID id, String type, Map<String, Object> attributes) - { - Map<UUID, Map<String, Object>> typeMap = _configuredObjects.get(type); - if(typeMap == null) - { - typeMap = new HashMap<UUID, Map<String, Object>>(); - _configuredObjects.put(type,typeMap); - } - typeMap.put(id, attributes); - } - - @Override - public int completeConfigurationRecovery() - { - if(CURRENT_CONFIG_VERSION !=_currentConfigVersion) - { - try - { - upgrade(); - } - catch (AMQStoreException e) - { - throw new IllegalArgumentException("Unable to upgrade configuration from version " + _currentConfigVersion + " to version " + CURRENT_CONFIG_VERSION); - } - } - - Map<UUID, Map<String, Object>> exchangeObjects = - _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName()); - - if(exchangeObjects != null) - { - recoverExchanges(exchangeObjects); - } - - Map<UUID, Map<String, Object>> queueObjects = - _configuredObjects.remove(org.apache.qpid.server.model.Queue.class.getName()); - - if(queueObjects != null) - { - recoverQueues(queueObjects); - } - - - Map<UUID, Map<String, Object>> bindingObjects = - _configuredObjects.remove(Binding.class.getName()); - - if(bindingObjects != null) - { - recoverBindings(bindingObjects); - } - - - CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); - - return CURRENT_CONFIG_VERSION; - } - - private void upgrade() throws AMQStoreException - { - - Map<UUID, String> updates = new HashMap<UUID, String>(); - - final String bindingType = Binding.class.getName(); - - switch(_currentConfigVersion) - { - case 0: - Map<UUID, Map<String, Object>> bindingObjects = - _configuredObjects.get(bindingType); - if(bindingObjects != null) - { - for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingObjects.entrySet()) - { - Map<String, Object> binding = bindingEntry.getValue(); - - if(hasSelectorArguments(binding) && !isTopicExchange(binding)) - { - binding = new LinkedHashMap<String, Object>(binding); - removeSelectorArguments(binding); - bindingEntry.setValue(binding); - - updates.put(bindingEntry.getKey(), bindingType); - } - } - } - case CURRENT_CONFIG_VERSION: - if(!updates.isEmpty()) - { - ConfiguredObjectRecord[] updateRecords = new ConfiguredObjectRecord[updates.size()]; - int i = 0; - for(Map.Entry<UUID, String> update : updates.entrySet()) - { - updateRecords[i++] = new ConfiguredObjectRecord(update.getKey(), update.getValue(), _configuredObjects.get(update.getValue()).get(update.getKey())); - } - _configStore.update(updateRecords); - } - break; - default: - throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?"); - } - } - - private void removeSelectorArguments(Map<String, Object> binding) - { - @SuppressWarnings("unchecked") - Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS)); - - FilterSupport.removeFilters(arguments); - binding.put(Binding.ARGUMENTS, arguments); - } - - private boolean isTopicExchange(Map<String, Object> binding) - { - UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE)); - final - Map<UUID, Map<String, Object>> exchanges = - _configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName()); - - if(exchanges != null && exchanges.containsKey(exchangeId)) - { - return "topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE)); - } - else - { - return _exchangeRegistry.getExchange(exchangeId) != null - && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE; - } - - } - - private boolean hasSelectorArguments(Map<String, Object> binding) - { - @SuppressWarnings("unchecked") - Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS); - return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); - } - - private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects) - { - for(Map.Entry<UUID, Map<String,Object>> entry : exchangeObjects.entrySet()) - { - Map<String,Object> attributeMap = entry.getValue(); - String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME); - String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE); - String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY); - boolean autoDelete = lifeTimePolicy == null - || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; - exchange(entry.getKey(), exchangeName, exchangeType, autoDelete); - } - } - - private void recoverQueues(Map<UUID, Map<String, Object>> queueObjects) - { - for(Map.Entry<UUID, Map<String,Object>> entry : queueObjects.entrySet()) - { - Map<String,Object> attributeMap = entry.getValue(); - - String queueName = (String) attributeMap.get(Queue.NAME); - String owner = (String) attributeMap.get(Queue.OWNER); - boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); - UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE)); - @SuppressWarnings("unchecked") - Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS); - FieldTable arguments = null; - if (queueArgumentsMap != null) - { - arguments = FieldTable.convertToFieldTable(queueArgumentsMap); - } - queue(entry.getKey(), queueName, owner, exclusive, arguments, alternateExchangeId); - } - } - - private void recoverBindings(Map<UUID, Map<String, Object>> bindingObjects) - { - for(Map.Entry<UUID, Map<String,Object>> entry : bindingObjects.entrySet()) - { - Map<String,Object> attributeMap = entry.getValue(); - UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE)); - UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE)); - String bindingName = (String) attributeMap.get(Binding.NAME); - - @SuppressWarnings("unchecked") - Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS); - FieldTable arguments = null; - if (bindingArgumentsMap != null) - { - arguments = FieldTable.convertToFieldTable(bindingArgumentsMap); - } - ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes())); - - binding(entry.getKey(), exchangeId, queueId, bindingName, argumentsBB); - } - } - private static class DummyMessage implements EnqueableMessage { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java index 65fd249d03..3d43ef0f44 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java @@ -37,7 +37,8 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject _testVhost = BrokerTestHelper.createVirtualHost("test"); - _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName()); + _subject = new MessageStoreLogSubject(_testVhost.getName(), + _testVhost.getMessageStore().getClass().getSimpleName()); } @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index ab1a0f7d0c..67cf0780da 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -56,6 +56,11 @@ import org.apache.qpid.util.FileUtils; public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase { private static final String EXCHANGE_NAME = "exchangeName"; + + private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName(); + private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName(); + private static final String QUEUE = Queue.class.getSimpleName(); + private String _storePath; private String _storeName; private MessageStore _messageStore; @@ -134,7 +139,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()), + verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE), eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); @@ -186,7 +191,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs)); - verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()), + verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING), eq(map)); } @@ -201,7 +206,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest reopenStore(); verify(_recoveryHandler, never()).configuredObject(any(UUID.class), - eq(org.apache.qpid.server.model.Binding.class.getName()), + eq(BINDING), anyMap()); } @@ -215,7 +220,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -238,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); queueAttributes.put(Queue.ARGUMENTS, attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -256,7 +261,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } private Exchange createTestAlternateExchange() @@ -292,7 +297,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); queueAttributes.put(Queue.ARGUMENTS, attributes); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -323,7 +328,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.ARGUMENTS, attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } public void testRemoveQueue() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java new file mode 100644 index 0000000000..ccc7f6a697 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -0,0 +1,376 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationRecoverer; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; +import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; + +public class DurableConfigurationRecovererTest extends QpidTestCase +{ + private static final UUID QUEUE_ID = new UUID(0,0); + private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); + private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); + private static final String CUSTOM_EXCHANGE_NAME = "customExchange"; + + private DurableConfigurationRecoverer _durableConfigurationRecoverer; + private Exchange _directExchange; + private Exchange _topicExchange; + private VirtualHost _vhost; + private DurableConfigurationStore _store; + private ExchangeFactory _exchangeFactory; + private ExchangeRegistry _exchangeRegistry; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + + _directExchange = mock(Exchange.class); + when(_directExchange.getType()).thenReturn(DirectExchange.TYPE); + + + _topicExchange = mock(Exchange.class); + when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE); + + AMQQueue queue = mock(AMQQueue.class); + + _vhost = mock(VirtualHost.class); + + _exchangeRegistry = mock(ExchangeRegistry.class); + when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); + when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); + + QueueRegistry queueRegistry = mock(QueueRegistry.class); + when(_vhost.getQueueRegistry()).thenReturn(queueRegistry); + + when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + + _exchangeFactory = mock(ExchangeFactory.class); + + DurableConfiguredObjectRecoverer[] recoverers = { + new QueueRecoverer(_vhost, _exchangeRegistry), + new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory), + new BindingRecoverer(_vhost, _exchangeRegistry) + }; + + final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>(); + for(DurableConfiguredObjectRecoverer recoverer : recoverers) + { + recovererMap.put(recoverer.getType(), recoverer); + } + _durableConfigurationRecoverer = + new DurableConfigurationRecoverer(_vhost.getName(), recovererMap, + new DefaultUpgraderProvider(_vhost, _exchangeRegistry)); + + _store = mock(DurableConfigurationStore.class); + + CurrentActor.set(mock(LogActor.class)); + } + + public void testUpgradeEmptyStore() throws Exception + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + assertEquals("Did not upgrade to the expected version", + CURRENT_CONFIG_VERSION, + _durableConfigurationRecoverer.completeConfigurationRecovery()); + } + + public void testUpgradeNewerStoreFails() throws Exception + { + try + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION + 1); + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Should not be able to start when config model is newer than current"); + } + catch (IllegalStateException e) + { + // pass + } + } + + public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID)) + }; + + verifyCorrectUpdates(expected); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + + + public void testUpgradeOnlyRemovesSelectorBindings() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble", + "not-a-selector", + "moo")); + + + final UUID customExchangeId = new UUID(3,0); + + _durableConfigurationRecoverer.configuredObject(new UUID(2, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + customExchangeId, + QUEUE_ID, + "x-filter-jms-selector", + "wibble", + "not-a-selector", + "moo")); + + _durableConfigurationRecoverer.configuredObject(customExchangeId, + "org.apache.qpid.server.model.Exchange", + createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); + + final Exchange customExchange = mock(Exchange.class); + + when(_exchangeFactory.createExchange(eq(customExchangeId), + eq(CUSTOM_EXCHANGE_NAME), + eq(HeadersExchange.TYPE.getType()), + anyBoolean(), + anyBoolean())).thenReturn(customExchange); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange); + return null; + } + }).when(_exchangeRegistry).registerExchange(customExchange); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")), + new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding", + createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo")) + }; + + verifyCorrectUpdates(expected); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + + public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "org.apache.qpid.server.model.Binding", + createBinding("key", + TOPIC_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + final ConfiguredObjectRecord[] expected = { + new ConfiguredObjectRecord(new UUID(1, 0), "Binding", + createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")) + }; + + verifyCorrectUpdates(expected); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + public void testUpgradeDoesNotRecur() throws Exception + { + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Binding", + createBinding("key", + DIRECT_EXCHANGE_ID, + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class)); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + } + + public void testFailsWithUnresolvedObjects() + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Binding", + createBinding("key", + new UUID(3,0), + QUEUE_ID, + "x-filter-jms-selector", + "wibble")); + + try + { + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Expected resolution to fail due to unknown object"); + } + catch(IllegalConfigurationException e) + { + assertEquals("Durable configuration has unresolved dependencies", e.getMessage()); + } + + } + + public void testFailsWithUnknownObjectType() + { + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + + try + { + final Map<String, Object> emptyArguments = Collections.emptyMap(); + _durableConfigurationRecoverer.configuredObject(new UUID(1, 0), + "Wibble", emptyArguments); + _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Expected resolution to fail due to unknown object type"); + } + catch(IllegalConfigurationException e) + { + assertEquals("Unkown type for configured object: Wibble", e.getMessage()); + } + + + } + + private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException + { + doAnswer(new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + Object[] args = invocation.getArguments(); + assertEquals("Updated records are not as expected", new HashSet(Arrays.asList( + expected)), new HashSet(Arrays.asList(args))); + + return null; + } + }).when(_store).update(any(ConfiguredObjectRecord[].class)); + } + + private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args) + { + Map<String, Object> binding = new LinkedHashMap<String, Object>(); + + binding.put("name", bindingKey); + binding.put(Binding.EXCHANGE, exchangeId.toString()); + binding.put(Binding.QUEUE, queueId.toString()); + Map<String,String> argumentMap = new LinkedHashMap<String, String>(); + if(args != null && args.length != 0) + { + String key = null; + for(String arg : args) + { + if(key == null) + { + key = arg; + } + else + { + argumentMap.put(key, arg); + key = null; + } + } + } + binding.put(Binding.ARGUMENTS, argumentMap); + return binding; + } + + + private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type) + { + Map<String, Object> exchange = new LinkedHashMap<String, Object>(); + + exchange.put(org.apache.qpid.server.model.Exchange.NAME, name); + exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); + + return exchange; + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java deleted file mode 100644 index ac81f5d625..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.HeadersExchange; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.test.utils.QpidTestCase; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; - -public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase -{ - private Exchange _directExchange; - private Exchange _topicExchange; - private VirtualHost _vhost; - private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler; - private DurableConfigurationStore _store; - - private static final UUID QUEUE_ID = new UUID(0,0); - private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); - private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); - - @Override - public void setUp() throws Exception - { - super.setUp(); - - - _directExchange = mock(Exchange.class); - when(_directExchange.getType()).thenReturn(DirectExchange.TYPE); - - - _topicExchange = mock(Exchange.class); - when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE); - - AMQQueue queue = mock(AMQQueue.class); - - _vhost = mock(VirtualHost.class); - - ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class); - when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); - when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); - - QueueRegistry queueRegistry = mock(QueueRegistry.class); - when(_vhost.getQueueRegistry()).thenReturn(queueRegistry); - - when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); - - ExchangeFactory exchangeFactory = mock(ExchangeFactory.class); - _virtualHostConfigRecoveryHandler = new VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory); - - _store = mock(DurableConfigurationStore.class); - - CurrentActor.set(mock(LogActor.class)); - } - - public void testUpgradeEmptyStore() throws Exception - { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery()); - } - - public void testUpgradeNewerStoreFails() throws Exception - { - try - { - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1); - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - fail("Should not be able to start when config model is newer than current"); - } - catch (IllegalStateException e) - { - // pass - } - } - - public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); - - final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID)) - }; - - verifyCorrectUpdates(expected); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - - - public void testUpgradeOnlyRemovesSelectorBindings() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); - - - UUID customExchangeId = new UUID(3,0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo")); - - _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId, - "org.apache.qpid.server.model.Exchange", - createExchange("customExchange", HeadersExchange.TYPE)); - - - - final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")), - new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo")) - }; - - verifyCorrectUpdates(expected); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - - public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); - - final ConfiguredObjectRecord[] expected = { - new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", - createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")) - }; - - verifyCorrectUpdates(expected); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - public void testUpgradeDoesNotRecur() throws Exception - { - - _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1); - - _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0), - "org.apache.qpid.server.model.Binding", - createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble")); - - doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class)); - - _virtualHostConfigRecoveryHandler.completeConfigurationRecovery(); - } - - private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException - { - doAnswer(new Answer() - { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable - { - Object[] args = invocation.getArguments(); - assertEquals("Updated records are not as expected", new HashSet(Arrays.asList( - expected)), new HashSet(Arrays.asList(args))); - - return null; - } - }).when(_store).update(any(ConfiguredObjectRecord[].class)); - } - - private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args) - { - Map<String, Object> binding = new LinkedHashMap<String, Object>(); - - binding.put("name", bindingKey); - binding.put(Binding.EXCHANGE, exchangeId.toString()); - binding.put(Binding.QUEUE, queueId.toString()); - Map<String,String> argumentMap = new LinkedHashMap<String, String>(); - if(args != null && args.length != 0) - { - String key = null; - for(String arg : args) - { - if(key == null) - { - key = arg; - } - else - { - argumentMap.put(key, arg); - key = null; - } - } - } - binding.put(Binding.ARGUMENTS, argumentMap); - return binding; - } - - - private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type) - { - Map<String, Object> exchange = new LinkedHashMap<String, Object>(); - - exchange.put(org.apache.qpid.server.model.Exchange.NAME, name); - exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); - - return exchange; - - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 4f8a6ee54a..8822fc5373 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -466,7 +466,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio try { - // an implicit recover performed when acknowledge throws an exception due to failover + // an implicit recover performed when acknowledge throws an exception due to failover lastMessage.acknowledge(); fail("JMSException should be thrown"); } @@ -529,7 +529,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio Message lastMessage = consumeMessages(); try { - // an implicit recover performed when acknowledge throws an exception due to failover + // an implicit recover performed when acknowledge throws an exception due to failover lastMessage.acknowledge(); fail("JMSException should be thrown"); } @@ -923,9 +923,9 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio final Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity", capacity); arguments.put("x-qpid-flow-resume-capacity", resumeCapacity); - ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments); + ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true - + "'&autodelete='" + true + "'"); + + "'&autodelete='" + false + "'"); ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue); return queue; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index b541fcc9c6..cec982c2c5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -190,6 +190,15 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override + public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException + { + doPreDelay("remove"); + UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects); + doPostDelay("remove"); + return removed; + } + + @Override public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { doPreDelay("update"); @@ -205,6 +214,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("update"); } + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException + { + doPreDelay("update"); + _durableConfigurationStore.update(createIfNecessary, records); + doPostDelay("update"); + } + public Transaction newTransaction() { doPreDelay("beginTran"); |
