summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-15 16:42:39 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-15 16:42:39 +0000
commit7057688d9214cffd217781db3c51abef5e227c93 (patch)
treeaf52519ecd8844b7061ae442c84dec1f83bd45ae /qpid/java
parentf203ee690d73b8f6ff19ba8b4f3f39808a1eddde (diff)
downloadqpid-python-7057688d9214cffd217781db3c51abef5e227c93.tar.gz
QPID-5073 : [Java Broker] Refactor DurableConfigurationStore recovery to allow for additional configured object children other than just Exchange/Binding/Queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1514360 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java36
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java93
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DependencyListener.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java242
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java33
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullUpgrader.java58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java177
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java227
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java102
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java121
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java11
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java334
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java376
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java266
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java17
34 files changed, 1842 insertions, 650 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index c2a530499a..2350e28ee2 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -809,31 +810,52 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
}
- OperationStatus status = removeConfiguredObject(id);
+ OperationStatus status = removeConfiguredObject(null, id);
if (status == OperationStatus.NOTFOUND)
{
throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found");
}
}
+ @Override
+ public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException
+ {
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ for(UUID id : objects)
+ {
+ if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS)
+ {
+ removed.add(id);
+ }
+ }
+
+ txn.commit();
+ return removed.toArray(new UUID[removed.size()]);
+ }
@Override
public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
- update(id, type, attributes, null);
+ update(false, id, type, attributes, null);
}
public void update(ConfiguredObjectRecord... records) throws AMQStoreException
{
+ update(false, records);
+ }
+
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+ {
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
for(ConfiguredObjectRecord record : records)
{
- update(record.getId(), record.getType(), record.getAttributes(), txn);
+ update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn);
}
txn.commit();
}
- private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException
+ private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException
{
if (LOGGER.isDebugEnabled())
{
@@ -851,7 +873,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
+ if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND))
{
ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
@@ -1406,14 +1428,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
- private OperationStatus removeConfiguredObject(UUID id) throws AMQStoreException
+ private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance();
uuidBinding.objectToEntry(id, key);
try
{
- return _configuredObjectsDb.delete(null, key);
+ return _configuredObjectsDb.delete(tx, key);
}
catch (DatabaseException e)
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index be91e4a484..b92a97c8cb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -25,12 +25,14 @@ import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -58,7 +60,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
_messageStore = new BDBHAMessageStore();
final MessageStoreLogSubject storeLogSubject =
- new MessageStoreLogSubject(this, _messageStore.getClass().getSimpleName());
+ new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
OperationalLoggingListener.listen(_messageStore, storeLogSubject);
_messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
@@ -71,9 +73,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
_messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
+ DurableConfigurationRecoverer configRecoverer =
+ new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ new DefaultUpgraderProvider(this, getExchangeRegistry()));
_messageStore.configureConfigStore(getName(),
- recoveryHandler,
+ configRecoverer,
virtualHost);
_messageStore.configureMessageStore(getName(),
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index 647e19d659..ece4e2275e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -180,7 +180,12 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
return _entries.get(id);
}
- @Override
+ /**
+ * Copies the store into the given location
+ *
+ * @param target location to copy store into
+ * @throws IllegalConfigurationException if store cannot be copied into given location
+ */
public void copyTo(String copyLocation)
{
File file = new File(copyLocation);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
index 08963bd8f1..ed989d764f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
@@ -28,8 +28,10 @@ public class MessageStoreLogSubject extends AbstractLogSubject
{
/** Create an MessageStoreLogSubject that Logs in the following format. */
- public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName)
+ public MessageStoreLogSubject(String vhostName, String messageStoreName)
{
- setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName);
+ setLogStringWithFormat(STORE_FORMAT, vhostName, messageStoreName);
}
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 9e32d303fb..a84a041b72 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -119,7 +119,8 @@ public interface VirtualHost extends ConfiguredObject
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
CONFIG_PATH));
- int CURRENT_CONFIG_VERSION = 1;
+
+ int CURRENT_CONFIG_VERSION = 2;
//children
Collection<VirtualHostAlias> getAliases();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
index 7ff8c88331..9ac2cb00a3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
@@ -39,7 +39,7 @@ import org.apache.qpid.server.configuration.updater.CreateChildTask;
import org.apache.qpid.server.configuration.updater.SetAttributeTask;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-abstract class AbstractAdapter implements ConfiguredObject
+public abstract class AbstractAdapter implements ConfiguredObject
{
private static final Object ID = "id";
private final Map<String,Object> _attributes = new HashMap<String, Object>();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index aaa6f460d2..e8bacb2712 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -190,7 +190,7 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection
}
else if(name.equals(INCOMING))
{
-
+ return true;
}
else if(name.equals(LOCAL_ADDRESS))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
new file mode 100644
index 0000000000..dbe8bf22a0
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Map;
+import java.util.UUID;
+
+public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements DurableConfiguredObjectRecoverer
+{
+ @Override
+ public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer,
+ final UUID id,
+ final Map<String, Object> attributes)
+ {
+ final UnresolvedObject obj = createUnresolvedObject(id, getType(), attributes);
+ UnresolvedDependency[] dependencies = obj.getUnresolvedDependencies();
+ for(final UnresolvedDependency dependency : dependencies)
+ {
+ Object dep;
+ if((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getId())) != null)
+ {
+ dependency.resolve(dep);
+ }
+ else
+ {
+ durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(),
+ new DependencyListener()
+ {
+
+ @Override
+ public void dependencyResolved(final String depType,
+ final UUID depId,
+ final Object o)
+ {
+ dependency.resolve(o);
+ if(obj.getUnresolvedDependencies().length == 0)
+ {
+ durableConfigurationRecoverer.resolve(getType(), id, obj.resolve());
+ }
+ }
+ });
+ }
+ }
+ if(obj.getUnresolvedDependencies().length == 0)
+ {
+ durableConfigurationRecoverer.resolve(getType(), id, obj.resolve());
+ }
+ else
+ {
+ durableConfigurationRecoverer.addUnresolvedObject(getType(), id, obj);
+ }
+
+ }
+
+ public abstract UnresolvedObject<T> createUnresolvedObject(final UUID id,
+ final String type,
+ final Map<String, Object> attributes);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index ae6b4b0154..e2fea8f50b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -35,6 +35,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -1978,15 +1980,35 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn = newAutoCommitConnection();
try
{
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
- try
- {
- stmt.setString(1, id.toString());
- results = stmt.executeUpdate();
- }
- finally
+ results = removeConfiguredObject(id, conn);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ }
+ return results;
+ }
+
+ public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException
+ {
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ try
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ for(UUID id : objects)
{
- stmt.close();
+ if(removeConfiguredObject(id, conn) != 0)
+ {
+ removed.add(id);
+ }
}
}
finally
@@ -1996,7 +2018,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
catch (SQLException e)
{
- throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ throw new AMQStoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
+ {
+ final int results;PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
}
return results;
}
@@ -2010,7 +2047,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn = newAutoCommitConnection();
try
{
- updateConfiguredObject(configuredObject, conn);
+ updateConfiguredObject(configuredObject, false, conn);
}
finally
{
@@ -2027,6 +2064,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void update(ConfiguredObjectRecord... records) throws AMQStoreException
{
+ update(false, records);
+ }
+
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+ {
if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
{
try
@@ -2036,7 +2078,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
for(ConfiguredObjectRecord record : records)
{
- updateConfiguredObject(record, conn);
+ updateConfiguredObject(record, createIfNecessary, conn);
}
conn.commit();
}
@@ -2054,7 +2096,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, Connection conn)
+ private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
+ boolean createIfNecessary,
+ Connection conn)
throws SQLException, AMQStoreException
{
PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
@@ -2089,6 +2133,31 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt2.close();
}
}
+ else if(createIfNecessary)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
}
finally
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DependencyListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DependencyListener.java
new file mode 100644
index 0000000000..120c904cf7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DependencyListener.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.UUID;
+
+interface DependencyListener
+{
+ void dependencyResolved(String type, UUID id, Object o);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
new file mode 100644
index 0000000000..7fb80bde96
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
@@ -0,0 +1,242 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
+public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandler
+{
+ private static final Logger _logger = Logger.getLogger(DurableConfigurationRecoverer.class);
+
+ private final Map<String, Map<UUID, Object>> _resolvedObjects = new HashMap<String, Map<UUID, Object>>();
+
+ private final Map<String, Map<UUID, UnresolvedObject>> _unresolvedObjects =
+ new HashMap<String, Map<UUID, UnresolvedObject>>();
+
+ private final Map<String, Map<UUID, List<DependencyListener>>> _dependencyListeners =
+ new HashMap<String, Map<UUID, List<DependencyListener>>>();
+ private final Map<String, DurableConfiguredObjectRecoverer> _recoverers;
+ private final UpgraderProvider _upgraderProvider;
+
+ private DurableConfigurationStoreUpgrader _upgrader;
+
+ private DurableConfigurationStore _store;
+ private final String _name;
+
+ private MessageStoreLogSubject _logSubject;
+
+ public DurableConfigurationRecoverer(final String name,
+ Map<String, DurableConfiguredObjectRecoverer> recoverers,
+ UpgraderProvider upgraderProvider)
+ {
+ _recoverers = recoverers;
+ _name = name;
+ _upgraderProvider = upgraderProvider;
+ }
+
+ @Override
+ public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ {
+ _logSubject = new MessageStoreLogSubject(_name, store.getClass().getSimpleName());
+
+ _store = store;
+ _upgrader = _upgraderProvider.getUpgrader(configVersion, this);
+ }
+
+ @Override
+ public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes)
+ {
+ _upgrader.configuredObject(id, type, attributes);
+ }
+
+ void onConfiguredObject(final UUID id, final String type, final Map<String, Object> attributes)
+ {
+ DurableConfiguredObjectRecoverer recoverer = getRecoverer(type);
+ if(recoverer == null)
+ {
+ throw new IllegalConfigurationException("Unkown type for configured object: " + type);
+ }
+ recoverer.load(this, id, attributes);
+ }
+
+ private DurableConfiguredObjectRecoverer getRecoverer(final String type)
+ {
+ DurableConfiguredObjectRecoverer recoverer = _recoverers.get(type);
+ return recoverer;
+ }
+
+ @Override
+ public int completeConfigurationRecovery()
+ {
+ _upgrader.complete();
+ checkUnresolvedDependencies();
+ applyUpgrade();
+
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+ return CURRENT_CONFIG_VERSION;
+ }
+
+ private void applyUpgrade()
+ {
+
+ final Collection<ConfiguredObjectRecord> updates = new ArrayList<ConfiguredObjectRecord>();
+ final Collection<UUID> deletes = new ArrayList<UUID>();
+ for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _upgrader.getUpdatedRecords().entrySet())
+ {
+ if(entry.getValue() != null)
+ {
+ updates.add(entry.getValue());
+ }
+ else
+ {
+ deletes.add(entry.getKey());
+ }
+ }
+
+ try
+ {
+ if(!updates.isEmpty())
+ {
+ _store.update(true,updates.toArray(new ConfiguredObjectRecord[updates.size()]));
+ }
+ if(!deletes.isEmpty())
+ {
+ _store.removeConfiguredObjects(deletes.toArray(new UUID[deletes.size()]));
+ }
+ }
+ catch (AMQStoreException e)
+ {
+ // TODO better exception
+ throw new RuntimeException("Unable to update config store when upgrading");
+ }
+
+ }
+
+ private void checkUnresolvedDependencies()
+ {
+ if(_unresolvedObjects != null && !_unresolvedObjects.isEmpty())
+ {
+ boolean unresolvedObjectsExist = false;
+ for(Map.Entry<String, Map<UUID, UnresolvedObject>>entry : _unresolvedObjects.entrySet())
+ {
+ for(Map.Entry<UUID,UnresolvedObject> obj : entry.getValue().entrySet())
+ {
+ unresolvedObjectsExist = true;
+ StringBuilder errorMessage = new StringBuilder("Durable configured object of type ");
+ errorMessage.append(entry.getKey()).append(" with id ").append(obj.getKey())
+ .append(" has unresolved dependencies: ");
+ for(UnresolvedDependency dep : obj.getValue().getUnresolvedDependencies())
+ {
+ errorMessage.append(dep.getType()).append(" with id ").append(dep.getId()).append("; ");
+ }
+ _logger.error(errorMessage);
+ }
+ }
+ if(unresolvedObjectsExist)
+ {
+ throw new IllegalConfigurationException("Durable configuration has unresolved dependencies");
+ }
+ }
+ }
+
+ void addResolutionListener(final String type,
+ final UUID id,
+ final DependencyListener dependencyListener)
+ {
+ Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type);
+ if(typeListeners == null)
+ {
+ typeListeners = new HashMap<UUID, List<DependencyListener>>();
+ _dependencyListeners.put(type, typeListeners);
+ }
+ List<DependencyListener> objectListeners = typeListeners.get(id);
+ if(objectListeners == null)
+ {
+ objectListeners = new ArrayList<DependencyListener>();
+ typeListeners.put(id, objectListeners);
+ }
+ objectListeners.add(dependencyListener);
+
+ }
+
+ Object getResolvedObject(final String type, final UUID id)
+ {
+ Map<UUID, Object> objects = _resolvedObjects.get(type);
+ return objects == null ? null : objects.get(id);
+ }
+
+ void resolve(final String type, final UUID id, final Object object)
+ {
+ Map<UUID, Object> typeObjects = _resolvedObjects.get(type);
+ if(typeObjects == null)
+ {
+ typeObjects = new HashMap<UUID, Object>();
+ _resolvedObjects.put(type, typeObjects);
+ }
+ typeObjects.put(id, object);
+ Map<UUID, UnresolvedObject> unresolved = _unresolvedObjects.get(type);
+ if(unresolved != null)
+ {
+ unresolved.remove(id);
+ }
+
+ Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type);
+ if(typeListeners != null)
+ {
+ List<DependencyListener> listeners = typeListeners.remove(id);
+ if(listeners != null)
+ {
+ for(DependencyListener listener : listeners)
+ {
+ listener.dependencyResolved(type, id, object);
+ }
+ }
+ }
+ }
+
+ void addUnresolvedObject(final String type,
+ final UUID id,
+ final UnresolvedObject obj)
+ {
+ Map<UUID, UnresolvedObject> typeObjects = _unresolvedObjects.get(type);
+ if(typeObjects == null)
+ {
+ typeObjects = new HashMap<UUID, UnresolvedObject>();
+ _unresolvedObjects.put(type, typeObjects);
+ }
+ typeObjects.put(id, obj);
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 7ce761af18..08f3d83c4e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -55,7 +55,6 @@ public interface DurableConfigurationStore
VirtualHost virtualHost) throws Exception;
-
/**
* Makes the specified object persistent.
*
@@ -77,6 +76,8 @@ public interface DurableConfigurationStore
*/
void remove(UUID id, String type) throws AMQStoreException;
+ public UUID[] removeConfiguredObjects(UUID... objects) throws AMQStoreException;
+
/**
* Updates the specified object in the persistent store, IF it is already present. If the object
@@ -92,6 +93,7 @@ public interface DurableConfigurationStore
public void update(ConfiguredObjectRecord... records) throws AMQStoreException;
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index 9fab29fea6..efb1e95e99 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -36,6 +36,10 @@ import org.apache.qpid.server.queue.AMQQueue;
public class DurableConfigurationStoreHelper
{
+ private static final String BINDING = Binding.class.getSimpleName();
+ private static final String EXCHANGE = Exchange.class.getSimpleName();
+ private static final String QUEUE = Queue.class.getSimpleName();
+
public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
{
Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
@@ -60,7 +64,7 @@ public class DurableConfigurationStoreHelper
{
attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
}
- store.update(queue.getId(), Queue.class.getName(), attributesMap);
+ store.update(queue.getId(), QUEUE, attributesMap);
}
public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments)
@@ -80,12 +84,12 @@ public class DurableConfigurationStoreHelper
{
attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
}
- store.create(queue.getId(),Queue.class.getName(),attributesMap);
+ store.create(queue.getId(), QUEUE,attributesMap);
}
public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
{
- store.remove(queue.getId(), Queue.class.getName());
+ store.remove(queue.getId(), QUEUE);
}
public static void createExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
@@ -96,7 +100,7 @@ public class DurableConfigurationStoreHelper
attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString()));
attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
: LifetimePolicy.PERMANENT.name());
- store.create(exchange.getId(), Exchange.class.getName(), attributesMap);
+ store.create(exchange.getId(), EXCHANGE, attributesMap);
}
@@ -104,7 +108,7 @@ public class DurableConfigurationStoreHelper
public static void removeExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
throws AMQStoreException
{
- store.remove(exchange.getId(),Exchange.class.getName());
+ store.remove(exchange.getId(), EXCHANGE);
}
public static void createBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
@@ -119,14 +123,14 @@ public class DurableConfigurationStoreHelper
{
attributesMap.put(Binding.ARGUMENTS, arguments);
}
- store.create(binding.getId(), Binding.class.getName(), attributesMap);
+ store.create(binding.getId(), BINDING, attributesMap);
}
public static void removeBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
throws AMQStoreException
{
- store.remove(binding.getId(), Binding.class.getName());
+ store.remove(binding.getId(), BINDING);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java
new file mode 100644
index 0000000000..1d3e4cc672
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreUpgrader.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Map;
+import java.util.UUID;
+
+public interface DurableConfigurationStoreUpgrader
+{
+ void configuredObject(UUID id, String type, Map<String, Object> attributes);
+
+ void complete();
+
+ void setNextUpgrader(DurableConfigurationStoreUpgrader upgrader);
+
+ Map<UUID, ConfiguredObjectRecord> getUpdatedRecords();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
new file mode 100644
index 0000000000..e065728bd3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Map;
+import java.util.UUID;
+
+public interface DurableConfiguredObjectRecoverer
+{
+ public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer,
+ final UUID id,
+ final Map<String, Object> attributes);
+
+ public String getType();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java
new file mode 100644
index 0000000000..a671e93b26
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NonNullUpgrader.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public abstract class NonNullUpgrader implements DurableConfigurationStoreUpgrader
+{
+ private DurableConfigurationStoreUpgrader _nextUpgrader;
+ private final Map<UUID, ConfiguredObjectRecord> _updates = new HashMap<UUID, ConfiguredObjectRecord>();
+
+ public final void setNextUpgrader(final DurableConfigurationStoreUpgrader upgrader)
+ {
+ if(_nextUpgrader == null)
+ {
+ _nextUpgrader = upgrader;
+ }
+ else
+ {
+ _nextUpgrader.setNextUpgrader(upgrader);
+ }
+ }
+
+ protected DurableConfigurationStoreUpgrader getNextUpgrader()
+ {
+ return _nextUpgrader;
+ }
+
+ protected Map<UUID, ConfiguredObjectRecord> getUpdateMap()
+ {
+ return _updates;
+ }
+
+ @Override
+ public final Map<UUID, ConfiguredObjectRecord> getUpdatedRecords()
+ {
+ final Map<UUID, ConfiguredObjectRecord> updates = new HashMap<UUID, ConfiguredObjectRecord>(_updates);
+ updates.putAll(_nextUpgrader.getUpdatedRecords());
+ return updates;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 078a0d3752..9eb0d85914 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -44,11 +44,23 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ }
+
+
+ @Override
public void remove(UUID id, String type)
{
}
@Override
+ public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException
+ {
+ return objects;
+ }
+
+ @Override
public void create(UUID id, String type, Map<String, Object> attributes)
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullUpgrader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullUpgrader.java
new file mode 100644
index 0000000000..c8a812fa89
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullUpgrader.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+
+public final class NullUpgrader implements DurableConfigurationStoreUpgrader
+{
+ private DurableConfigurationRecoverer _durableConfigurationRecoverer;
+
+ public NullUpgrader(final DurableConfigurationRecoverer durableConfigurationRecoverer)
+ {
+ _durableConfigurationRecoverer = durableConfigurationRecoverer;
+ }
+
+ @Override
+ public void configuredObject(final UUID id, final String type, final Map<String, Object> attributes)
+ {
+ _durableConfigurationRecoverer.onConfiguredObject(id, type, attributes);
+ }
+
+ @Override
+ public void complete()
+ {
+ }
+
+ @Override
+ public void setNextUpgrader(final DurableConfigurationStoreUpgrader upgrader)
+ {
+ throw new UnsupportedOperationException("NullUpgrader must always be the last upgrader");
+ }
+
+ @Override
+ public Map<UUID, ConfiguredObjectRecord> getUpdatedRecords()
+ {
+ return Collections.emptyMap();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java
new file mode 100644
index 0000000000..98348efbd2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.UUID;
+
+public interface UnresolvedDependency<T>
+{
+ public UUID getId();
+ public String getType();
+
+ public void resolve(final T dependency);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java
new file mode 100644
index 0000000000..7ebebadae7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public interface UnresolvedObject<T>
+{
+ public UnresolvedDependency[] getUnresolvedDependencies();
+
+ T resolve();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java
new file mode 100644
index 0000000000..c2ea0745ff
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public interface UpgraderProvider
+{
+ DurableConfigurationStoreUpgrader getUpgrader(int configVersion, DurableConfigurationRecoverer recoverer);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 58f4ed48ca..4e27a008dd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -59,6 +59,7 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -743,6 +744,22 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
}
+ protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
+ {
+ DurableConfiguredObjectRecoverer[] recoverers = {
+ new QueueRecoverer(this, getExchangeRegistry()),
+ new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()),
+ new BindingRecoverer(this, getExchangeRegistry())
+ };
+
+ final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>();
+ for(DurableConfiguredObjectRecoverer recoverer : recoverers)
+ {
+ recovererMap.put(recoverer.getType(), recoverer);
+ }
+ return recovererMap;
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
new file mode 100644
index 0000000000..7cfadbcadf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.UnresolvedDependency;
+import org.apache.qpid.server.store.UnresolvedObject;
+
+public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<Binding>
+{
+ private static final Logger _logger = Logger.getLogger(BindingRecoverer.class);
+
+ private final ExchangeRegistry _exchangeRegistry;
+ private final VirtualHost _virtualHost;
+
+ public BindingRecoverer(final VirtualHost virtualHost,
+ final ExchangeRegistry exchangeRegistry)
+ {
+ _exchangeRegistry = exchangeRegistry;
+ _virtualHost = virtualHost;
+ }
+
+ @Override
+ public UnresolvedObject<Binding> createUnresolvedObject(final UUID id,
+ final String type,
+ final Map<String, Object> attributes)
+ {
+ return new UnresolvedBinding(id, attributes);
+ }
+
+ @Override
+ public String getType()
+ {
+ return org.apache.qpid.server.model.Binding.class.getSimpleName();
+ }
+
+ private class UnresolvedBinding implements UnresolvedObject<Binding>
+ {
+ private final Map<String, Object> _bindingArgumentsMap;
+ private final String _bindingName;
+ private final UUID _queueId;
+ private final UUID _exchangeId;
+ private final UUID _bindingId;
+
+ private List<UnresolvedDependency> _unresolvedDependencies =
+ new ArrayList<UnresolvedDependency>();
+
+ private Exchange _exchange;
+ private AMQQueue _queue;
+
+ public UnresolvedBinding(final UUID id,
+ final Map<String, Object> attributeMap)
+ {
+ _bindingId = id;
+ _exchangeId = UUID.fromString((String)attributeMap.get(org.apache.qpid.server.model.Binding.EXCHANGE));
+ _queueId = UUID.fromString((String) attributeMap.get(org.apache.qpid.server.model.Binding.QUEUE));
+ _exchange = _exchangeRegistry.getExchange(_exchangeId);
+ if(_exchange == null)
+ {
+ _unresolvedDependencies.add(new ExchangeDependency());
+ }
+ _queue = _virtualHost.getQueueRegistry().getQueue(_queueId);
+ if(_queue == null)
+ {
+ _unresolvedDependencies.add(new QueueDependency());
+ }
+
+ _bindingName = (String) attributeMap.get(org.apache.qpid.server.model.Binding.NAME);
+ _bindingArgumentsMap = (Map<String, Object>) attributeMap.get(org.apache.qpid.server.model.Binding.ARGUMENTS);
+ }
+
+ @Override
+ public UnresolvedDependency[] getUnresolvedDependencies()
+ {
+ return _unresolvedDependencies.toArray(new UnresolvedDependency[_unresolvedDependencies.size()]);
+ }
+
+ @Override
+ public Binding resolve()
+ {
+ try
+ {
+ if(_exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap) == null)
+ {
+ _logger.info("Restoring binding: (Exchange: " + _exchange.getNameShortString() + ", Queue: " + _queue.getName()
+ + ", Routing Key: " + _bindingName + ", Arguments: " + _bindingArgumentsMap + ")");
+
+ _exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap);
+ }
+ return _exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private class QueueDependency implements UnresolvedDependency<AMQQueue>
+ {
+
+ @Override
+ public UUID getId()
+ {
+ return _queueId;
+ }
+
+ @Override
+ public String getType()
+ {
+ return Queue.class.getSimpleName();
+ }
+
+ @Override
+ public void resolve(final AMQQueue dependency)
+ {
+ _queue = dependency;
+ _unresolvedDependencies.remove(this);
+ }
+
+ }
+
+ private class ExchangeDependency implements UnresolvedDependency<Exchange>
+ {
+
+ @Override
+ public UUID getId()
+ {
+ return _exchangeId;
+ }
+
+ @Override
+ public String getType()
+ {
+ return org.apache.qpid.server.model.Exchange.class.getSimpleName();
+ }
+
+ @Override
+ public void resolve(final Exchange dependency)
+ {
+ _exchange = dependency;
+ _unresolvedDependencies.remove(this);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
new file mode 100644
index 0000000000..3526551073
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -0,0 +1,227 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.FilterSupport;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
+import org.apache.qpid.server.store.NonNullUpgrader;
+import org.apache.qpid.server.store.NullUpgrader;
+import org.apache.qpid.server.store.UpgraderProvider;
+
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
+public class DefaultUpgraderProvider implements UpgraderProvider
+{
+ private final ExchangeRegistry _exchangeRegistry;
+ private final VirtualHost _virtualHost;
+
+ public DefaultUpgraderProvider(final VirtualHost virtualHost,
+ final ExchangeRegistry exchangeRegistry)
+ {
+ _virtualHost = virtualHost;
+ _exchangeRegistry = exchangeRegistry;
+ }
+
+ public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
+ {
+ DurableConfigurationStoreUpgrader currentUpgrader = null;
+ switch(configVersion)
+ {
+ case 0:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader());
+ case 1:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
+ case CURRENT_CONFIG_VERSION:
+ currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown configuration model version: " + configVersion
+ + ". Attempting to run an older instance against an upgraded configuration?");
+ }
+ return currentUpgrader;
+ }
+
+ private DurableConfigurationStoreUpgrader addUpgrader(DurableConfigurationStoreUpgrader currentUpgrader,
+ final DurableConfigurationStoreUpgrader nextUpgrader)
+ {
+ if(currentUpgrader == null)
+ {
+ currentUpgrader = nextUpgrader;
+ }
+ else
+ {
+ currentUpgrader.setNextUpgrader(nextUpgrader);
+ }
+ return currentUpgrader;
+ }
+
+ /*
+ * Removes filters from queue bindings to exchanges other than topic exchanges. In older versions of the broker
+ * such bindings would have been ignored, starting from the point at which the config version changed, these
+ * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue.
+ */
+ private class Version0Upgrader extends NonNullUpgrader
+ {
+ private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>();
+
+ public Version0Upgrader()
+ {
+ }
+
+ @Override
+ public void configuredObject(final UUID id, final String type, Map<String, Object> attributes)
+ {
+ _records.put(id, new ConfiguredObjectRecord(id, type, attributes));
+ }
+
+ private void removeSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+
+ FilterSupport.removeFilters(arguments);
+ binding.put(Binding.ARGUMENTS, arguments);
+ }
+
+ private boolean isTopicExchange(Map<String, Object> binding)
+ {
+ UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE));
+
+ if(_records.containsKey(exchangeId))
+ {
+ return "topic".equals(_records.get(exchangeId)
+ .getAttributes()
+ .get(org.apache.qpid.server.model.Exchange.TYPE));
+ }
+ else
+ {
+ return _exchangeRegistry.getExchange(exchangeId) != null
+ && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE;
+ }
+
+ }
+
+ private boolean hasSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+ return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
+ }
+
+
+
+ @Override
+ public void complete()
+ {
+ for(Map.Entry<UUID,ConfiguredObjectRecord> entry : _records.entrySet())
+ {
+ ConfiguredObjectRecord record = entry.getValue();
+ String type = record.getType();
+ Map<String, Object> attributes = record.getAttributes();
+ UUID id = record.getId();
+ if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(attributes))
+ {
+ attributes = new LinkedHashMap<String, Object>(attributes);
+ removeSelectorArguments(attributes);
+
+ record = new ConfiguredObjectRecord(id, type, attributes);
+ getUpdateMap().put(id, record);
+ entry.setValue(record);
+
+ }
+ getNextUpgrader().configuredObject(id, type, attributes);
+ }
+
+ getNextUpgrader().complete();
+ }
+
+ }
+
+ /*
+ * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker
+ * configuration store). Also remove bindings which reference non-existant queues or exchanges.
+ */
+ private class Version1Upgrader extends NonNullUpgrader
+ {
+ @Override
+ public void configuredObject(final UUID id, String type, final Map<String, Object> attributes)
+ {
+ type = type.substring(1+type.lastIndexOf('.'));
+ getUpdateMap().put(id, new ConfiguredObjectRecord(id, type, attributes));
+
+ }
+
+ @Override
+ public void complete()
+ {
+ for(Map.Entry<UUID, ConfiguredObjectRecord> entry : getUpdateMap().entrySet())
+ {
+ final ConfiguredObjectRecord record = entry.getValue();
+ if(isBinding(record.getType()) && (unknownExchange((String) record.getAttributes().get(Binding.EXCHANGE))
+ || unknownQueue((String) record.getAttributes().get(Binding.QUEUE))))
+ {
+ entry.setValue(null);
+ }
+ else
+ {
+ getNextUpgrader().configuredObject(record.getId(), record.getType(), record.getAttributes());
+ }
+ }
+ getNextUpgrader().complete();
+ }
+
+ private boolean unknownExchange(final String exchangeIdString)
+ {
+ UUID exchangeId = UUID.fromString(exchangeIdString);
+ ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
+ return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()))
+ || _exchangeRegistry.getExchange(exchangeId) != null);
+ }
+
+ private boolean unknownQueue(final String queueIdString)
+ {
+ UUID queueId = UUID.fromString(queueIdString);
+ ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
+ return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()))
+ || _virtualHost.getQueueRegistry().getQueue(queueId) != null);
+ }
+
+ private boolean isBinding(final String type)
+ {
+ return Binding.class.getSimpleName().equals(type);
+ }
+
+
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
new file mode 100644
index 0000000000..702f6e1bdf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.UnresolvedDependency;
+import org.apache.qpid.server.store.UnresolvedObject;
+
+public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<Exchange>
+{
+ private final ExchangeRegistry _exchangeRegistry;
+ private final ExchangeFactory _exchangeFactory;
+
+ public ExchangeRecoverer(final ExchangeRegistry exchangeRegistry, final ExchangeFactory exchangeFactory)
+ {
+ _exchangeRegistry = exchangeRegistry;
+ _exchangeFactory = exchangeFactory;
+ }
+
+ @Override
+ public String getType()
+ {
+ return org.apache.qpid.server.model.Exchange.class.getSimpleName();
+ }
+
+ @Override
+ public UnresolvedObject<Exchange> createUnresolvedObject(final UUID id,
+ final String type,
+ final Map<String, Object> attributes)
+ {
+ return new UnresolvedExchange(id, attributes);
+ }
+
+ private class UnresolvedExchange implements UnresolvedObject<Exchange>
+ {
+ private Exchange _exchange;
+
+ public UnresolvedExchange(final UUID id,
+ final Map<String, Object> attributeMap)
+ {
+ String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME);
+ String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
+ String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
+ boolean autoDelete = lifeTimePolicy == null
+ || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+ try
+ {
+ _exchange = _exchangeRegistry.getExchange(id);
+ if(_exchange == null)
+ {
+ _exchange = _exchangeRegistry.getExchange(exchangeName);
+ }
+ if (_exchange == null)
+ {
+ _exchange = _exchangeFactory.createExchange(id, exchangeName, exchangeType, true, autoDelete);
+ _exchangeRegistry.registerExchange(_exchange);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e);
+ }
+ }
+
+ @Override
+ public UnresolvedDependency[] getUnresolvedDependencies()
+ {
+ return new UnresolvedDependency[0];
+ }
+
+ @Override
+ public Exchange resolve()
+ {
+ return _exchange;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
new file mode 100644
index 0000000000..4e06cf3202
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.UnresolvedDependency;
+import org.apache.qpid.server.store.UnresolvedObject;
+
+public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQQueue>
+{
+ private static final Logger _logger = Logger.getLogger(QueueRecoverer.class);
+ private final VirtualHost _virtualHost;
+ private final ExchangeRegistry _exchangeRegistry;
+
+ public QueueRecoverer(final VirtualHost virtualHost, final ExchangeRegistry exchangeRegistry)
+ {
+ _virtualHost = virtualHost;
+ _exchangeRegistry = exchangeRegistry;
+ }
+
+ @Override
+ public String getType()
+ {
+ return Queue.class.getSimpleName();
+ }
+
+ @Override
+ public UnresolvedObject<AMQQueue> createUnresolvedObject(final UUID id,
+ final String type,
+ final Map<String, Object> attributes)
+ {
+ return new UnresolvedQueue(id, type, attributes);
+ }
+
+ private class UnresolvedQueue implements UnresolvedObject<AMQQueue>
+ {
+ private AMQQueue _queue;
+
+ public UnresolvedQueue(final UUID id,
+ final String type,
+ final Map<String, Object> attributeMap)
+ {
+ String queueName = (String) attributeMap.get(Queue.NAME);
+ String owner = (String) attributeMap.get(Queue.OWNER);
+ boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
+ UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
+ @SuppressWarnings("unchecked")
+ Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
+ try
+ {
+ _queue = _virtualHost.getQueueRegistry().getQueue(id);
+ if(_queue == null)
+ {
+ _queue = _virtualHost.getQueueRegistry().getQueue(queueName);
+ }
+
+ if (_queue == null)
+ {
+ _queue = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
+ queueArgumentsMap);
+ _virtualHost.getQueueRegistry().registerQueue(_queue);
+
+ if (alternateExchangeId != null)
+ {
+ Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId);
+ if (altExchange == null)
+ {
+ _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
+ return;
+ }
+ _queue.setAlternateExchange(altExchange);
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
+ }
+ }
+
+ @Override
+ public UnresolvedDependency[] getUnresolvedDependencies()
+ {
+ return new UnresolvedDependency[0];
+ }
+
+ @Override
+ public AMQQueue resolve()
+ {
+ return _queue;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index b34444cb4c..143bdce85b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -23,6 +23,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
@@ -68,7 +69,7 @@ public class StandardVirtualHost extends AbstractVirtualHost
final
MessageStoreLogSubject
- storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName());
+ storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName());
OperationalLoggingListener.listen(messageStore, storeLogSubject);
return messageStore;
@@ -96,10 +97,12 @@ public class StandardVirtualHost extends AbstractVirtualHost
_durableConfigurationStore = initialiseConfigurationStore(virtualHost);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
-
- _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost);
+ DurableConfigurationRecoverer configRecoverer =
+ new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ new DefaultUpgraderProvider(this, getExchangeRegistry()));
+ _durableConfigurationStore.configureConfigStore(getName(), configRecoverer, virtualHost);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getExchangeRegistry(), getExchangeFactory());
_messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler);
initialiseModel(hostConfig);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 605bbe5f45..3738306f6a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -20,11 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
@@ -32,28 +28,16 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.FilterSupport;
-import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
@@ -65,11 +49,8 @@ import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
-import org.apache.qpid.util.ByteBufferInputStream;
-import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
-
-public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
+public class VirtualHostConfigRecoveryHandler implements
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
@@ -84,15 +65,11 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
- private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>();
-
private final ExchangeRegistry _exchangeRegistry;
private final ExchangeFactory _exchangeFactory;
private MessageStoreLogSubject _logSubject;
private MessageStore _store;
- private int _currentConfigVersion;
- private DurableConfigurationStore _configStore;
public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost,
ExchangeRegistry exchangeRegistry,
@@ -103,76 +80,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
_exchangeFactory = exchangeFactory;
}
- @Override
- public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion)
- {
- _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
- _configStore = store;
- _currentConfigVersion = configVersion;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START());
- }
-
public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
- _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
+ _logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName());
_store = store;
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
return this;
}
- public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId)
- {
- try
- {
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
-
- if (q == null)
- {
- q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
- FieldTable.convertToMap(arguments));
- _virtualHost.getQueueRegistry().registerQueue(q);
-
- if (alternateExchangeId != null)
- {
- Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId);
- if (altExchange == null)
- {
- _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
- return;
- }
- q.setAlternateExchange(altExchange);
- }
- }
-
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
-
- //Record that we have a queue for recovery
- _queueRecoveries.put(queueName, 0);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
- }
- }
-
- public void exchange(UUID id, String exchangeName, String type, boolean autoDelete)
- {
- try
- {
- Exchange exchange;
- exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
- {
- exchange = _exchangeFactory.createExchange(id, exchangeName, type, true, autoDelete);
- _exchangeRegistry.registerExchange(exchange);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e);
- }
- }
-
public StoredMessageRecoveryHandler begin()
{
return this;
@@ -347,56 +262,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
- private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
- {
- try
- {
- Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
- if (exchange == null)
- {
- _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId);
- return;
- }
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
- if (queue == null)
- {
- _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName());
- }
- else
- {
- FieldTable argumentsFT = null;
- if(buf != null)
- {
- try
- {
- argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit());
- }
- catch (IOException e)
- {
- throw new RuntimeException("IOException should not be thrown here", e);
- }
- }
-
- Map<String, Object> argumentMap = FieldTable.convertToMap(argumentsFT);
-
- if(exchange.getBinding(bindingKey, queue, argumentMap) == null)
- {
-
- _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName()
- + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
-
- exchange.restoreBinding(bindingId, bindingKey, queue, argumentMap);
- }
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
public void complete()
{
}
@@ -478,201 +343,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
return this;
}
- @Override
- public void configuredObject(UUID id, String type, Map<String, Object> attributes)
- {
- Map<UUID, Map<String, Object>> typeMap = _configuredObjects.get(type);
- if(typeMap == null)
- {
- typeMap = new HashMap<UUID, Map<String, Object>>();
- _configuredObjects.put(type,typeMap);
- }
- typeMap.put(id, attributes);
- }
-
- @Override
- public int completeConfigurationRecovery()
- {
- if(CURRENT_CONFIG_VERSION !=_currentConfigVersion)
- {
- try
- {
- upgrade();
- }
- catch (AMQStoreException e)
- {
- throw new IllegalArgumentException("Unable to upgrade configuration from version " + _currentConfigVersion + " to version " + CURRENT_CONFIG_VERSION);
- }
- }
-
- Map<UUID, Map<String, Object>> exchangeObjects =
- _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName());
-
- if(exchangeObjects != null)
- {
- recoverExchanges(exchangeObjects);
- }
-
- Map<UUID, Map<String, Object>> queueObjects =
- _configuredObjects.remove(org.apache.qpid.server.model.Queue.class.getName());
-
- if(queueObjects != null)
- {
- recoverQueues(queueObjects);
- }
-
-
- Map<UUID, Map<String, Object>> bindingObjects =
- _configuredObjects.remove(Binding.class.getName());
-
- if(bindingObjects != null)
- {
- recoverBindings(bindingObjects);
- }
-
-
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
-
- return CURRENT_CONFIG_VERSION;
- }
-
- private void upgrade() throws AMQStoreException
- {
-
- Map<UUID, String> updates = new HashMap<UUID, String>();
-
- final String bindingType = Binding.class.getName();
-
- switch(_currentConfigVersion)
- {
- case 0:
- Map<UUID, Map<String, Object>> bindingObjects =
- _configuredObjects.get(bindingType);
- if(bindingObjects != null)
- {
- for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingObjects.entrySet())
- {
- Map<String, Object> binding = bindingEntry.getValue();
-
- if(hasSelectorArguments(binding) && !isTopicExchange(binding))
- {
- binding = new LinkedHashMap<String, Object>(binding);
- removeSelectorArguments(binding);
- bindingEntry.setValue(binding);
-
- updates.put(bindingEntry.getKey(), bindingType);
- }
- }
- }
- case CURRENT_CONFIG_VERSION:
- if(!updates.isEmpty())
- {
- ConfiguredObjectRecord[] updateRecords = new ConfiguredObjectRecord[updates.size()];
- int i = 0;
- for(Map.Entry<UUID, String> update : updates.entrySet())
- {
- updateRecords[i++] = new ConfiguredObjectRecord(update.getKey(), update.getValue(), _configuredObjects.get(update.getValue()).get(update.getKey()));
- }
- _configStore.update(updateRecords);
- }
- break;
- default:
- throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?");
- }
- }
-
- private void removeSelectorArguments(Map<String, Object> binding)
- {
- @SuppressWarnings("unchecked")
- Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
-
- FilterSupport.removeFilters(arguments);
- binding.put(Binding.ARGUMENTS, arguments);
- }
-
- private boolean isTopicExchange(Map<String, Object> binding)
- {
- UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE));
- final
- Map<UUID, Map<String, Object>> exchanges =
- _configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName());
-
- if(exchanges != null && exchanges.containsKey(exchangeId))
- {
- return "topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE));
- }
- else
- {
- return _exchangeRegistry.getExchange(exchangeId) != null
- && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE;
- }
-
- }
-
- private boolean hasSelectorArguments(Map<String, Object> binding)
- {
- @SuppressWarnings("unchecked")
- Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
- return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
- }
-
- private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects)
- {
- for(Map.Entry<UUID, Map<String,Object>> entry : exchangeObjects.entrySet())
- {
- Map<String,Object> attributeMap = entry.getValue();
- String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME);
- String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
- String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
- boolean autoDelete = lifeTimePolicy == null
- || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
- exchange(entry.getKey(), exchangeName, exchangeType, autoDelete);
- }
- }
-
- private void recoverQueues(Map<UUID, Map<String, Object>> queueObjects)
- {
- for(Map.Entry<UUID, Map<String,Object>> entry : queueObjects.entrySet())
- {
- Map<String,Object> attributeMap = entry.getValue();
-
- String queueName = (String) attributeMap.get(Queue.NAME);
- String owner = (String) attributeMap.get(Queue.OWNER);
- boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
- UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
- @SuppressWarnings("unchecked")
- Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
- FieldTable arguments = null;
- if (queueArgumentsMap != null)
- {
- arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
- }
- queue(entry.getKey(), queueName, owner, exclusive, arguments, alternateExchangeId);
- }
- }
-
- private void recoverBindings(Map<UUID, Map<String, Object>> bindingObjects)
- {
- for(Map.Entry<UUID, Map<String,Object>> entry : bindingObjects.entrySet())
- {
- Map<String,Object> attributeMap = entry.getValue();
- UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE));
- UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE));
- String bindingName = (String) attributeMap.get(Binding.NAME);
-
- @SuppressWarnings("unchecked")
- Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS);
- FieldTable arguments = null;
- if (bindingArgumentsMap != null)
- {
- arguments = FieldTable.convertToFieldTable(bindingArgumentsMap);
- }
- ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes()));
-
- binding(entry.getKey(), exchangeId, queueId, bindingName, argumentsBB);
- }
- }
-
private static class DummyMessage implements EnqueableMessage
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
index 65fd249d03..3d43ef0f44 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
@@ -37,7 +37,8 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore().getClass().getSimpleName());
+ _subject = new MessageStoreLogSubject(_testVhost.getName(),
+ _testVhost.getMessageStore().getClass().getSimpleName());
}
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index ab1a0f7d0c..67cf0780da 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -56,6 +56,11 @@ import org.apache.qpid.util.FileUtils;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
private static final String EXCHANGE_NAME = "exchangeName";
+
+ private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName();
+ private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName();
+ private static final String QUEUE = Queue.class.getSimpleName();
+
private String _storePath;
private String _storeName;
private MessageStore _messageStore;
@@ -134,7 +139,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()),
+ verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE),
eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString())));
@@ -186,7 +191,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY);
map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs));
- verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()),
+ verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(BINDING),
eq(map));
}
@@ -201,7 +206,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
reopenStore();
verify(_recoveryHandler, never()).configuredObject(any(UUID.class),
- eq(org.apache.qpid.server.model.Binding.class.getName()),
+ eq(BINDING),
anyMap());
}
@@ -215,7 +220,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -238,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
queueAttributes.put(Queue.ARGUMENTS, attributes);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -256,7 +261,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
private Exchange createTestAlternateExchange()
@@ -292,7 +297,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
queueAttributes.put(Queue.ARGUMENTS, attributes);
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
@@ -323,7 +328,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.ARGUMENTS, attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes));
+ verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
public void testRemoveQueue() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
new file mode 100644
index 0000000000..ccc7f6a697
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -0,0 +1,376 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
+public class DurableConfigurationRecovererTest extends QpidTestCase
+{
+ private static final UUID QUEUE_ID = new UUID(0,0);
+ private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
+ private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
+ private static final String CUSTOM_EXCHANGE_NAME = "customExchange";
+
+ private DurableConfigurationRecoverer _durableConfigurationRecoverer;
+ private Exchange _directExchange;
+ private Exchange _topicExchange;
+ private VirtualHost _vhost;
+ private DurableConfigurationStore _store;
+ private ExchangeFactory _exchangeFactory;
+ private ExchangeRegistry _exchangeRegistry;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+
+ _directExchange = mock(Exchange.class);
+ when(_directExchange.getType()).thenReturn(DirectExchange.TYPE);
+
+
+ _topicExchange = mock(Exchange.class);
+ when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE);
+
+ AMQQueue queue = mock(AMQQueue.class);
+
+ _vhost = mock(VirtualHost.class);
+
+ _exchangeRegistry = mock(ExchangeRegistry.class);
+ when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
+ when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
+
+ QueueRegistry queueRegistry = mock(QueueRegistry.class);
+ when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
+
+ when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+
+ _exchangeFactory = mock(ExchangeFactory.class);
+
+ DurableConfiguredObjectRecoverer[] recoverers = {
+ new QueueRecoverer(_vhost, _exchangeRegistry),
+ new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory),
+ new BindingRecoverer(_vhost, _exchangeRegistry)
+ };
+
+ final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>();
+ for(DurableConfiguredObjectRecoverer recoverer : recoverers)
+ {
+ recovererMap.put(recoverer.getType(), recoverer);
+ }
+ _durableConfigurationRecoverer =
+ new DurableConfigurationRecoverer(_vhost.getName(), recovererMap,
+ new DefaultUpgraderProvider(_vhost, _exchangeRegistry));
+
+ _store = mock(DurableConfigurationStore.class);
+
+ CurrentActor.set(mock(LogActor.class));
+ }
+
+ public void testUpgradeEmptyStore() throws Exception
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+ assertEquals("Did not upgrade to the expected version",
+ CURRENT_CONFIG_VERSION,
+ _durableConfigurationRecoverer.completeConfigurationRecovery());
+ }
+
+ public void testUpgradeNewerStoreFails() throws Exception
+ {
+ try
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION + 1);
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ fail("Should not be able to start when config model is newer than current");
+ }
+ catch (IllegalStateException e)
+ {
+ // pass
+ }
+ }
+
+ public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ DIRECT_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+
+
+ public void testUpgradeOnlyRemovesSelectorBindings() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ DIRECT_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble",
+ "not-a-selector",
+ "moo"));
+
+
+ final UUID customExchangeId = new UUID(3,0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(2, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ customExchangeId,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble",
+ "not-a-selector",
+ "moo"));
+
+ _durableConfigurationRecoverer.configuredObject(customExchangeId,
+ "org.apache.qpid.server.model.Exchange",
+ createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
+
+ final Exchange customExchange = mock(Exchange.class);
+
+ when(_exchangeFactory.createExchange(eq(customExchangeId),
+ eq(CUSTOM_EXCHANGE_NAME),
+ eq(HeadersExchange.TYPE.getType()),
+ anyBoolean(),
+ anyBoolean())).thenReturn(customExchange);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange);
+ return null;
+ }
+ }).when(_exchangeRegistry).registerExchange(customExchange);
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")),
+ new ConfiguredObjectRecord(new UUID(2, 0), "org.apache.qpid.server.model.Binding",
+ createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo"))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+
+ public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "org.apache.qpid.server.model.Binding",
+ createBinding("key",
+ TOPIC_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ final ConfiguredObjectRecord[] expected = {
+ new ConfiguredObjectRecord(new UUID(1, 0), "Binding",
+ createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"))
+ };
+
+ verifyCorrectUpdates(expected);
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+ public void testUpgradeDoesNotRecur() throws Exception
+ {
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "Binding",
+ createBinding("key",
+ DIRECT_EXCHANGE_ID,
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class));
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ }
+
+ public void testFailsWithUnresolvedObjects()
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "Binding",
+ createBinding("key",
+ new UUID(3,0),
+ QUEUE_ID,
+ "x-filter-jms-selector",
+ "wibble"));
+
+ try
+ {
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ fail("Expected resolution to fail due to unknown object");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ assertEquals("Durable configuration has unresolved dependencies", e.getMessage());
+ }
+
+ }
+
+ public void testFailsWithUnknownObjectType()
+ {
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+
+ try
+ {
+ final Map<String, Object> emptyArguments = Collections.emptyMap();
+ _durableConfigurationRecoverer.configuredObject(new UUID(1, 0),
+ "Wibble", emptyArguments);
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+ fail("Expected resolution to fail due to unknown object type");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ assertEquals("Unkown type for configured object: Wibble", e.getMessage());
+ }
+
+
+ }
+
+ private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
+ {
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ Object[] args = invocation.getArguments();
+ assertEquals("Updated records are not as expected", new HashSet(Arrays.asList(
+ expected)), new HashSet(Arrays.asList(args)));
+
+ return null;
+ }
+ }).when(_store).update(any(ConfiguredObjectRecord[].class));
+ }
+
+ private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args)
+ {
+ Map<String, Object> binding = new LinkedHashMap<String, Object>();
+
+ binding.put("name", bindingKey);
+ binding.put(Binding.EXCHANGE, exchangeId.toString());
+ binding.put(Binding.QUEUE, queueId.toString());
+ Map<String,String> argumentMap = new LinkedHashMap<String, String>();
+ if(args != null && args.length != 0)
+ {
+ String key = null;
+ for(String arg : args)
+ {
+ if(key == null)
+ {
+ key = arg;
+ }
+ else
+ {
+ argumentMap.put(key, arg);
+ key = null;
+ }
+ }
+ }
+ binding.put(Binding.ARGUMENTS, argumentMap);
+ return binding;
+ }
+
+
+ private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type)
+ {
+ Map<String, Object> exchange = new LinkedHashMap<String, Object>();
+
+ exchange.put(org.apache.qpid.server.model.Exchange.NAME, name);
+ exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
+
+ return exchange;
+
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
deleted file mode 100644
index ac81f5d625..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.HeadersExchange;
-import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
-
-public class VirtualHostConfigRecoveryHandlerTest extends QpidTestCase
-{
- private Exchange _directExchange;
- private Exchange _topicExchange;
- private VirtualHost _vhost;
- private VirtualHostConfigRecoveryHandler _virtualHostConfigRecoveryHandler;
- private DurableConfigurationStore _store;
-
- private static final UUID QUEUE_ID = new UUID(0,0);
- private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
- private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
-
- _directExchange = mock(Exchange.class);
- when(_directExchange.getType()).thenReturn(DirectExchange.TYPE);
-
-
- _topicExchange = mock(Exchange.class);
- when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE);
-
- AMQQueue queue = mock(AMQQueue.class);
-
- _vhost = mock(VirtualHost.class);
-
- ExchangeRegistry exchangeRegistry = mock(ExchangeRegistry.class);
- when(exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
- when(exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
-
- QueueRegistry queueRegistry = mock(QueueRegistry.class);
- when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
-
- when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
-
- ExchangeFactory exchangeFactory = mock(ExchangeFactory.class);
- _virtualHostConfigRecoveryHandler = new VirtualHostConfigRecoveryHandler(_vhost, exchangeRegistry, exchangeFactory);
-
- _store = mock(DurableConfigurationStore.class);
-
- CurrentActor.set(mock(LogActor.class));
- }
-
- public void testUpgradeEmptyStore() throws Exception
- {
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
- assertEquals("Did not upgrade to the expected version", CURRENT_CONFIG_VERSION, _virtualHostConfigRecoveryHandler.completeConfigurationRecovery());
- }
-
- public void testUpgradeNewerStoreFails() throws Exception
- {
- try
- {
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION+1);
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- fail("Should not be able to start when config model is newer than current");
- }
- catch (IllegalStateException e)
- {
- // pass
- }
- }
-
- public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
-
- final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID))
- };
-
- verifyCorrectUpdates(expected);
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
-
-
- public void testUpgradeOnlyRemovesSelectorBindings() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
-
-
- UUID customExchangeId = new UUID(3,0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(2, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", customExchangeId, QUEUE_ID, "x-filter-jms-selector", "wibble", "not-a-selector", "moo"));
-
- _virtualHostConfigRecoveryHandler.configuredObject(customExchangeId,
- "org.apache.qpid.server.model.Exchange",
- createExchange("customExchange", HeadersExchange.TYPE));
-
-
-
- final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "not-a-selector", "moo")),
- new ConfiguredObjectRecord(new UUID(3, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", customExchangeId, QUEUE_ID, "not-a-selector", "moo"))
- };
-
- verifyCorrectUpdates(expected);
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
-
- public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 0);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
-
- final ConfiguredObjectRecord[] expected = {
- new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
- createBinding("key", TOPIC_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"))
- };
-
- verifyCorrectUpdates(expected);
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
- public void testUpgradeDoesNotRecur() throws Exception
- {
-
- _virtualHostConfigRecoveryHandler.beginConfigurationRecovery(_store, 1);
-
- _virtualHostConfigRecoveryHandler.configuredObject(new UUID(1, 0),
- "org.apache.qpid.server.model.Binding",
- createBinding("key", DIRECT_EXCHANGE_ID, QUEUE_ID, "x-filter-jms-selector", "wibble"));
-
- doThrow(new RuntimeException("Update Should not be called")).when(_store).update(any(ConfiguredObjectRecord[].class));
-
- _virtualHostConfigRecoveryHandler.completeConfigurationRecovery();
- }
-
- private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
- {
- doAnswer(new Answer()
- {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable
- {
- Object[] args = invocation.getArguments();
- assertEquals("Updated records are not as expected", new HashSet(Arrays.asList(
- expected)), new HashSet(Arrays.asList(args)));
-
- return null;
- }
- }).when(_store).update(any(ConfiguredObjectRecord[].class));
- }
-
- private Map<String,Object> createBinding(String bindingKey, UUID exchangeId, UUID queueId, String... args)
- {
- Map<String, Object> binding = new LinkedHashMap<String, Object>();
-
- binding.put("name", bindingKey);
- binding.put(Binding.EXCHANGE, exchangeId.toString());
- binding.put(Binding.QUEUE, queueId.toString());
- Map<String,String> argumentMap = new LinkedHashMap<String, String>();
- if(args != null && args.length != 0)
- {
- String key = null;
- for(String arg : args)
- {
- if(key == null)
- {
- key = arg;
- }
- else
- {
- argumentMap.put(key, arg);
- key = null;
- }
- }
- }
- binding.put(Binding.ARGUMENTS, argumentMap);
- return binding;
- }
-
-
- private Map<String, Object> createExchange(String name, ExchangeType<HeadersExchange> type)
- {
- Map<String, Object> exchange = new LinkedHashMap<String, Object>();
-
- exchange.put(org.apache.qpid.server.model.Exchange.NAME, name);
- exchange.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
-
- return exchange;
-
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index 4f8a6ee54a..8822fc5373 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -466,7 +466,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
try
{
- // an implicit recover performed when acknowledge throws an exception due to failover
+ // an implicit recover performed when acknowledge throws an exception due to failover
lastMessage.acknowledge();
fail("JMSException should be thrown");
}
@@ -529,7 +529,7 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
Message lastMessage = consumeMessages();
try
{
- // an implicit recover performed when acknowledge throws an exception due to failover
+ // an implicit recover performed when acknowledge throws an exception due to failover
lastMessage.acknowledge();
fail("JMSException should be thrown");
}
@@ -923,9 +923,9 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
final Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity", capacity);
arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
- ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments);
+ ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
- + "'&autodelete='" + true + "'");
+ + "'&autodelete='" + false + "'");
((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
return queue;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index b541fcc9c6..cec982c2c5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -190,6 +190,15 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
+ public UUID[] removeConfiguredObjects(final UUID... objects) throws AMQStoreException
+ {
+ doPreDelay("remove");
+ UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects);
+ doPostDelay("remove");
+ return removed;
+ }
+
+ @Override
public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
doPreDelay("update");
@@ -205,6 +214,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("update");
}
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ doPreDelay("update");
+ _durableConfigurationStore.update(createIfNecessary, records);
+ doPostDelay("update");
+ }
+
public Transaction newTransaction()
{
doPreDelay("beginTran");