diff options
| author | Keith Wall <kwall@apache.org = kwall = Keith Wall kwall@apache.org@apache.org> | 2014-04-14 08:54:19 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org = kwall = Keith Wall kwall@apache.org@apache.org> | 2014-04-14 08:54:19 +0000 |
| commit | cde1072e86b57286594eb4fdb494576689aa8bca (patch) | |
| tree | 8e0f378d16d5cf564f8ab0d2f93e5ec6f338621f /qpid/java | |
| parent | 981b8f5357355f842a523e4b50a1d5c711095a68 (diff) | |
| download | qpid-python-cde1072e86b57286594eb4fdb494576689aa8bca.tar.gz | |
QPID-5685: Store configuration version as an attribute of virtualhost within configuration store rather than within separate database/table
* ConfiguredObjectRecordHandler begin/end methods no longer take/return config version
* DefaultUpgraderProvider uses the virtualhost record for the config version only and uses this to trigger
the correct upgrade. Note this record is *not* recovered (yet).
* BDB/SQL Upgraders migrate the config version from database/table to be the modelVersion attribute of a virtualhost entry.
* BDB Upgrader tests (7 to 8).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1587165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
23 files changed, 618 insertions, 303 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 652e4c135d..722a3a090d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -70,7 +70,6 @@ import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.util.FileUtils; import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Cursor; @@ -109,8 +108,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static String BRIDGEDB_NAME = "BRIDGES"; private static String LINKDB_NAME = "LINKS"; private static String XID_DB_NAME = "XIDS"; - private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION"; - private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME , CONFIGURED_OBJECT_HIERARCHY_DB_NAME}; + private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIGURED_OBJECT_HIERARCHY_DB_NAME}; private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME }; private EnvironmentFacade _environmentFacade; @@ -182,16 +180,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore try { - int configVersion = getConfigVersion(); - - handler.begin(configVersion); + handler.begin(); doVisitAllConfiguredObjectRecords(handler); - - int newConfigVersion = handler.end(); - if(newConfigVersion != configVersion) - { - updateConfigVersion(newConfigVersion); - } + handler.end(); } catch (DatabaseException e) { @@ -372,70 +363,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - @SuppressWarnings("resource") - private void updateConfigVersion(int newConfigVersion) throws StoreException - { - Transaction txn = null; - Cursor cursor = null; - try - { - txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - cursor = getConfigVersionDb().openCursor(txn, null); - DatabaseEntry key = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0,key); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - IntegerBinding.intToEntry(newConfigVersion, value); - OperationStatus status = cursor.put(key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error setting config version: " + status); - } - } - cursor.close(); - cursor = null; - txn.commit(); - txn = null; - } - finally - { - closeCursorSafely(cursor); - abortTransactionIgnoringException("Error setting config version", txn);; - } - - } - - private int getConfigVersion() throws StoreException - { - Cursor cursor = null; - try - { - cursor = getConfigVersionDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - return IntegerBinding.entryToInt(value); - } - - // Insert 0 as the default config version - IntegerBinding.intToEntry(0,value); - ByteBinding.byteToEntry((byte) 0,key); - OperationStatus status = getConfigVersionDb().put(null, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error initialising config version: " + status); - } - return 0; - } - finally - { - closeCursorSafely(cursor); - } - } - private void closeCursorSafely(Cursor cursor) throws StoreException { if (cursor != null) @@ -1622,11 +1549,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME); } - private Database getConfigVersionDb() - { - return _environmentFacade.getOpenDatabase(CONFIG_VERSION_DB_NAME); - } - private Database getMessageMetaDataDb() { return _environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME); @@ -1644,8 +1566,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore class UpgradeTask implements EnvironmentFacadeTask { - - private ConfiguredObject<?> _parent; + private final ConfiguredObject<?> _parent; public UpgradeTask(ConfiguredObject<?> parent) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java index e5e1201c6a..8f3cf73275 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -20,32 +20,38 @@ */ package org.apache.qpid.server.store.berkeleydb.upgrade; -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.*; +import java.io.IOException; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.model.ConfiguredObject; - +import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord; -import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; -import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; -import java.io.IOException; -import java.io.StringWriter; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Environment; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; public class UpgradeFrom7To8 extends AbstractStoreUpgrade { + private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<HashMap<String,Object>>(){}; @Override public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) @@ -58,9 +64,25 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade Database hierarchyDb = environment.openDatabase(null, "CONFIGURED_OBJECT_HIERARCHY", dbConfig); Database configuredObjectsDb = environment.openDatabase(null, "CONFIGURED_OBJECTS", dbConfig); + Database configVersionDb = environment.openDatabase(null, "CONFIG_VERSION", dbConfig); Cursor objectsCursor = null; + String stringifiedConfigVersion = Model.MODEL_VERSION; + int configVersion = getConfigVersion(configVersionDb); + if (configVersion > -1) + { + stringifiedConfigVersion = "0." + configVersion; + } + configVersionDb.close(); + + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); + + String virtualHostName = parent.getName(); + UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); + ConfiguredObjectRecord virtualHostRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); + Transaction txn = environment.beginTransaction(null, null); try @@ -69,9 +91,6 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - Map<UUID, BDBConfiguredObjectRecord> configuredObjects = - new HashMap<UUID, BDBConfiguredObjectRecord>(); - while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { UUID id = UUIDTupleBinding.getInstance().entryToObject(key); @@ -80,7 +99,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade if(!type.endsWith("Binding")) { - UUIDTupleBinding.getInstance().objectToEntry(parent.getId(),value); + UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value); TupleOutput tupleOutput = new TupleOutput(); tupleOutput.writeLong(id.getMostSignificantBits()); tupleOutput.writeLong(id.getLeastSignificantBits()); @@ -97,7 +116,7 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade DatabaseEntry hierarchyKey = new DatabaseEntry(); DatabaseEntry hierarchyValue = new DatabaseEntry(); - Map<String,Object> attributes = mapper.readValue(json, Map.class); + Map<String,Object> attributes = mapper.readValue(json, MAP_TYPE_REFERENCE); Object queueIdString = attributes.remove("queue"); if(queueIdString instanceof String) { @@ -134,13 +153,8 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade { throw new StoreException(e); } - } - - } - - } finally { @@ -149,13 +163,51 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade objectsCursor.close(); } } + + storeConfiguredObjectEntry(configuredObjectsDb, txn, virtualHostRecord); txn.commit(); hierarchyDb.close(); configuredObjectsDb.close(); + reportFinished(environment, 8); + } + private int getConfigVersion(Database configVersionDb) + { + Cursor cursor = null; + try + { + cursor = configVersionDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + return IntegerBinding.entryToInt(value); + } + return -1; + } + finally + { + cursor.close(); + } + } - reportFinished(environment, 8); + private void storeConfiguredObjectEntry(Database configuredObjectsDb, final Transaction txn, ConfiguredObjectRecord configuredObject) + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(configuredObject.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); + + configuredObjectBinding.objectToEntry(configuredObject, value); + OperationStatus status = configuredObjectsDb.put(txn, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error writing configured object " + configuredObject + " to database: " + + status); + } } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java new file mode 100644 index 0000000000..57adc5afe4 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java @@ -0,0 +1,367 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.util.MapJsonSerializer; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Transaction; + +public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase +{ + private static final String ARGUMENTS = "arguments"; + + private static final String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; + private static final String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; + + @Override + public VirtualHost<?,?,?> getVirtualHost() + { + VirtualHost<?,?,?> virtualHost = mock(VirtualHost.class); + when(virtualHost.getName()).thenReturn("test"); + return virtualHost; + } + + @Override + protected String getStoreDirectoryName() + { + return "bdbstore-v7"; + } + + public void testPerformUpgrade() throws Exception + { + UpgradeFrom7To8 upgrade = new UpgradeFrom7To8(); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); + + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 7); + assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 9); + + assertConfiguredObjects(); + assertConfiguredObjectHierarchy(); + } + + + private void assertConfiguredObjectHierarchy() + { + Map<UpgradeHierarchyKey, UUID> hierarchy = loadConfiguredObjectHierarchy(); + assertEquals("Unexpected number of configured objects", 9, hierarchy.size()); + + UUID vhUuid = UUIDGenerator.generateVhostUUID(getVirtualHost().getName()); + UUID myExchUuid = UUIDGenerator.generateExchangeUUID("myexch", getVirtualHost().getName()); + UUID amqDirectUuid = UUIDGenerator.generateExchangeUUID("amq.direct", getVirtualHost().getName()); + UUID queue1Uuid = UUIDGenerator.generateQueueUUID("queue1", getVirtualHost().getName()); + UUID queue1ToAmqDirectBindingUuid = UUIDGenerator.generateBindingUUID("amq.direct", "queue1", "queue1", getVirtualHost().getName()); + + // myexch -> virtualhost + UpgradeHierarchyKey myExchToVhParent = new UpgradeHierarchyKey(myExchUuid, VirtualHost.class.getSimpleName()); + assertExpectedHierarchyEntry(hierarchy, myExchToVhParent, vhUuid); + + // queue1 -> virtualhost + UpgradeHierarchyKey queue1ToVhParent = new UpgradeHierarchyKey(queue1Uuid, VirtualHost.class.getSimpleName()); + assertExpectedHierarchyEntry(hierarchy, queue1ToVhParent, vhUuid); + + // ! amq.direct -> virtualhost (This will change when the upgrader is changed to create the default exchanges) + UpgradeHierarchyKey amqDirectToVhParent = new UpgradeHierarchyKey(amqDirectUuid, VirtualHost.class.getSimpleName()); + assertFalse("amq.direct should not have a binding to virtualhost", hierarchy.containsKey(amqDirectToVhParent)); + + // queue1binding -> amq.direct + // queue1binding -> queue1 + UpgradeHierarchyKey queue1BindingToAmqDirect = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Exchange.class.getSimpleName()); + UpgradeHierarchyKey queue1BindingToQueue1 = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Queue.class.getSimpleName()); + assertExpectedHierarchyEntry(hierarchy, queue1BindingToAmqDirect, amqDirectUuid); + assertExpectedHierarchyEntry(hierarchy, queue1BindingToQueue1, queue1Uuid); + } + + private void assertExpectedHierarchyEntry( + Map<UpgradeHierarchyKey, UUID> hierarchy, + UpgradeHierarchyKey childHierarchyKey, UUID parentUUID) + { + assertTrue("Expected hierarchy entry does not exist", hierarchy.containsKey(childHierarchyKey)); + assertEquals("Expected hierarchy entry does not exist", parentUUID, hierarchy.get(childHierarchyKey)); + } + + + private void assertConfiguredObjects() + { + Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); + assertEquals("Unexpected number of configured objects", 7, configuredObjects.size()); + + Map<UUID, Map<String, Object>> expected = new HashMap<UUID, Map<String, Object>>(); + + String configVersion = "0.3"; + expected.putAll(createExpectedVirtualHost(configVersion)); + + expected.putAll(createExpectedQueue("queue1", Boolean.FALSE, null, null)); + expected.putAll(createExpectedQueue("queue2", Boolean.FALSE, null, null)); + + expected.putAll(createExpectedExchangeMap("myexch", "direct")); + + expected.putAll(createExpectedBindingMap("queue1", "queue1", "amq.direct", null)); + expected.putAll(createExpectedBindingMap("queue1", "queue1", "myexch", null)); + expected.putAll(createExpectedBindingMap("queue2", "queue2", "amq.fanout", null)); + + MapJsonSerializer jsonSerializer = new MapJsonSerializer(); + for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) + { + UpgradeConfiguredObjectRecord object = entry.getValue(); + + UUID actualKey = entry.getKey(); + String actualType = object.getType(); + String actualJson = object.getAttributes(); + Map<String, Object> actualDeserializedAttributes = jsonSerializer.deserialize(actualJson); + + assertTrue("Entry UUID " + actualKey + " of type " + actualType + " is unexpected", expected.containsKey(actualKey)); + + Map<String, Object> expectedDeserializedAttributes = expected.get(actualKey); + + assertEquals("Entry UUID " + actualKey + " of type " + actualType + " has uenxpected deserialised value, json was: " + actualJson, + expectedDeserializedAttributes, actualDeserializedAttributes); + } + } + + private Map<UUID, Map<String, Object>> createExpectedVirtualHost(String modelVersion) + { + Map<String, Object> expectedVirtualHostEntry = new HashMap<String, Object>(); + expectedVirtualHostEntry.put("modelVersion", modelVersion); + + UUID expectedUUID = UUIDGenerator.generateVhostUUID(getVirtualHost().getName()); + return Collections.singletonMap(expectedUUID, expectedVirtualHostEntry); + } + + private Map<UUID, Map<String, Object>> createExpectedQueue(String queueName, boolean exclusiveFlag, String owner, Map<String, Object> argumentMap) + { + Map<String, Object> expectedQueueEntry = new HashMap<String, Object>(); + expectedQueueEntry.put(Queue.NAME, queueName); + expectedQueueEntry.put(Queue.EXCLUSIVE, exclusiveFlag); + expectedQueueEntry.put(Queue.OWNER, owner); + expectedQueueEntry.put(Queue.TYPE, "standard"); + + if (argumentMap != null) + { + expectedQueueEntry.put(ARGUMENTS, argumentMap); + } + UUID expectedUUID = UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName()); + return Collections.singletonMap(expectedUUID, expectedQueueEntry); + } + + private Map<UUID, Map<String, Object>> createExpectedExchangeMap(String exchangeName, String type) + { + Map<String, Object> expectedExchangeMap = new HashMap<String, Object>(); + expectedExchangeMap.put(Exchange.NAME, exchangeName); + expectedExchangeMap.put(Exchange.TYPE, type); + expectedExchangeMap.put(Exchange.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name()); + UUID expectedUUID = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()); + return Collections.singletonMap(expectedUUID, expectedExchangeMap); + } + + private Map<UUID, Map<String, Object>> createExpectedBindingMap(String queueName, String bindingName, String exchangeName, Map<String, String> argumentMap) + { + Map<String, Object> expectedBinding = new HashMap<String, Object>(); + expectedBinding.put(Binding.NAME, bindingName); + expectedBinding.put(Binding.ARGUMENTS, argumentMap == null ? Collections.emptyMap() : argumentMap); + + UUID expectedUUID = UUIDGenerator.generateBindingUUID(exchangeName, queueName, bindingName, getVirtualHost().getName()); + return Collections.singletonMap(expectedUUID, expectedBinding); + } + + private Map<UUID, UpgradeConfiguredObjectRecord> loadConfiguredObjects() + { + final Map<UUID, UpgradeConfiguredObjectRecord> configuredObjectsRecords = new HashMap<UUID, UpgradeConfiguredObjectRecord>(); + final UpgradeConfiguredObjectBinding binding = new UpgradeConfiguredObjectBinding(); + final UpgradeUUIDBinding uuidBinding = new UpgradeUUIDBinding(); + CursorOperation configuredObjectsCursor = new CursorOperation() + { + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + UUID id = uuidBinding.entryToObject(key); + UpgradeConfiguredObjectRecord object = binding.entryToObject(value); + configuredObjectsRecords.put(id, object); + } + }; + new DatabaseTemplate(_environment, CONFIGURED_OBJECTS_DB_NAME, null).run(configuredObjectsCursor); + return configuredObjectsRecords; + } + + + private Map<UpgradeHierarchyKey, UUID> loadConfiguredObjectHierarchy() + { + final Map<UpgradeHierarchyKey, UUID> hierarchyRecords = new HashMap<UpgradeHierarchyKey, UUID>(); + final UpgradeHierarchyKeyBinding hierarchyKeyBinding = new UpgradeHierarchyKeyBinding(); + final UpgradeUUIDBinding uuidParentBinding = new UpgradeUUIDBinding(); + CursorOperation hierarchyCursor = new CursorOperation() + { + @Override + public void processEntry(Database sourceDatabase, Database targetDatabase, Transaction transaction, + DatabaseEntry key, DatabaseEntry value) + { + UpgradeHierarchyKey hierarchyKey = hierarchyKeyBinding.entryToObject(key); + UUID parentId = uuidParentBinding.entryToObject(value); + hierarchyRecords.put(hierarchyKey, parentId); + } + }; + new DatabaseTemplate(_environment, CONFIGURED_OBJECT_HIERARCHY_DB_NAME, null).run(hierarchyCursor); + return hierarchyRecords; + } + + private static class UpgradeConfiguredObjectBinding extends TupleBinding<UpgradeConfiguredObjectRecord> + { + @Override + public UpgradeConfiguredObjectRecord entryToObject(TupleInput tupleInput) + { + String type = tupleInput.readString(); + String json = tupleInput.readString(); + UpgradeConfiguredObjectRecord configuredObject = new UpgradeConfiguredObjectRecord(type, json); + return configuredObject; + } + + @Override + public void objectToEntry(UpgradeConfiguredObjectRecord object, TupleOutput tupleOutput) + { + throw new UnsupportedOperationException(); + } + } + + private static class UpgradeConfiguredObjectRecord + { + private final String _attributes; + private final String _type; + + public UpgradeConfiguredObjectRecord(String type, String attributes) + { + super(); + _attributes = attributes; + _type = type; + } + + public String getAttributes() + { + return _attributes; + } + + public String getType() + { + return _type; + } + + } + + private static class UpgradeUUIDBinding extends TupleBinding<UUID> + { + @Override + public UUID entryToObject(final TupleInput tupleInput) + { + return new UUID(tupleInput.readLong(), tupleInput.readLong()); + } + + @Override + public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput) + { + throw new UnsupportedOperationException(); + } + } + + private static class UpgradeHierarchyKeyBinding extends TupleBinding<UpgradeHierarchyKey> + { + @Override + public UpgradeHierarchyKey entryToObject(TupleInput tupleInput) + { + UUID childId = new UUID(tupleInput.readLong(), tupleInput.readLong()); + String parentType = tupleInput.readString(); + + return new UpgradeHierarchyKey(childId, parentType); + } + + @Override + public void objectToEntry(UpgradeHierarchyKey hk, TupleOutput tupleOutput) + { + throw new UnsupportedOperationException(); + } + } + + private static class UpgradeHierarchyKey + { + private final UUID _childId; + private final String _parentType; + + public UpgradeHierarchyKey(final UUID childId, final String parentType) + { + _childId = childId; + _parentType = parentType; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final UpgradeHierarchyKey that = (UpgradeHierarchyKey) o; + + if (!_childId.equals(that._childId)) + { + return false; + } + if (!_parentType.equals(that._parentType)) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _childId.hashCode(); + result = 31 * result + _parentType.hashCode(); + return result; + } + + } + +} diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt new file mode 100644 index 0000000000..efb929c944 --- /dev/null +++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/readme.txt @@ -0,0 +1,6 @@ +The bdbstore v7 data was obtained by running 0.26 and: + +* creating an exchange 'myexch' of type direct +* creating queues 'queue1' and 'queue2' +* binding 'queue1' to 'myexch' and 'amq.direct' using binding key 'queue1' +* binding 'queue2' to amq.fanout only using binding key 'queue2'
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb Binary files differnew file mode 100644 index 0000000000..4957f86e1a --- /dev/null +++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v7/test-store/00000000.jdb diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java index e82a92bb83..c900ea047d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/BrokerStoreUpgrader.java @@ -601,7 +601,6 @@ public class BrokerStoreUpgrader private DurableConfigurationStoreUpgrader _upgrader; private DurableConfigurationStore _store; private final Map<UUID, ConfiguredObjectRecord> _records = new HashMap<UUID, ConfiguredObjectRecord>(); - private int _version; private final SystemContext _systemContext; private BrokerStoreRecoveryHandler(final SystemContext systemContext, DurableConfigurationStore store) @@ -612,9 +611,8 @@ public class BrokerStoreUpgrader @Override - public void begin(final int configVersion) + public void begin() { - _version = configVersion; } @Override @@ -625,7 +623,7 @@ public class BrokerStoreUpgrader } @Override - public int end() + public void end() { String version = getCurrentVersion(); @@ -748,7 +746,6 @@ public class BrokerStoreUpgrader }); - return _version; } private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java index 59f248c9f5..d4409e666e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java @@ -126,11 +126,9 @@ public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(); final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler() { - private int _configVersion; @Override - public void begin(final int configVersion) + public void begin() { - _configVersion = configVersion; } @Override @@ -141,9 +139,8 @@ public class JsonConfigurationEntryStore extends MemoryConfigurationEntryStore } @Override - public int end() + public void end() { - return _configVersion; } }; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java index 88817be972..a17bbbff47 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandler.java @@ -89,9 +89,8 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore private boolean _quiesceHttpPort = _options.getManagementModeHttpPortOverride() > 0; @Override - public void begin(final int configVersion) + public void begin() { - _version = configVersion; } @Override @@ -160,9 +159,8 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore @Override - public int end() + public void end() { - return _version; } }; @@ -186,7 +184,7 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore { - recoveryHandler.begin(0); + recoveryHandler.begin(); for(ConfiguredObjectRecord record : _records.values()) { @@ -366,7 +364,7 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore _store.visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler() { @Override - public void begin(final int configVersion) + public void begin() { } @@ -428,9 +426,8 @@ public class ManagementModeStoreHandler implements DurableConfigurationStore @Override - public int end() + public void end() { - return 0; } }); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index b3f91d7ee1..e6e4d0052b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -124,11 +124,9 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore final Collection<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(); final ConfiguredObjectRecordHandler replayHandler = new ConfiguredObjectRecordHandler() { - private int _configVersion; @Override - public void begin(final int configVersion) + public void begin() { - _configVersion = configVersion; } @Override @@ -139,9 +137,8 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore } @Override - public int end() + public void end() { - return _configVersion; } }; @@ -363,7 +360,7 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore public void visitConfiguredObjectRecords(final ConfiguredObjectRecordHandler recoveryHandler) throws StoreException { - recoveryHandler.begin(0); + recoveryHandler.begin(); final Map<UUID,Map<String,UUID>> parentMap = new HashMap<UUID, Map<String, UUID>>(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 33ccee8e71..d7b4868fb4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -49,8 +49,6 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, String CONFIGURATION_STORE_SETTINGS = "configurationStoreSettings"; String MESSAGE_STORE_SETTINGS = "messageStoreSettings"; - int CURRENT_CONFIG_VERSION = 5; - @ManagedAttribute Collection<String> getSupportedExchangeTypes(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 6be5460d5f..6ff959fad8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; @@ -84,7 +86,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private static final int DEFAULT_CONFIG_VERSION = 0; - public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME)); + public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME)); public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME, META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, @@ -100,10 +102,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?"; - private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )"; - private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME; - private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?"; + private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME; private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; @@ -230,16 +230,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC try { - int configVersion = getConfigVersion(); - - handler.begin(configVersion); + handler.begin(); doVisitAllConfiguredObjectRecords(handler); - - int newConfigVersion = handler.end(); - if(newConfigVersion != configVersion) - { - setConfigVersion(newConfigVersion); - } + handler.end(); } catch (SQLException e) { @@ -470,6 +463,32 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Connection connection = newConnection(); try { + UUID virtualHostId = UUIDGenerator.generateVhostUUID(parent.getName()); + + String stringifiedConfigVersion = Model.MODEL_VERSION; + boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); + if(tableExists) + { + int configVersion = getConfigVersion(connection); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader read existing config version " + configVersion); + } + + stringifiedConfigVersion = "0." + configVersion; + } + + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); + + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); + insertConfiguredObject(configuredObject, connection); + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion); + } + Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>(); List<UUID> others = new ArrayList<UUID>(); final ObjectMapper objectMapper = new ObjectMapper(); @@ -525,7 +544,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { stmt.setString(1, id.toString()); stmt.setString(2, "VirtualHost"); - stmt.setString(3, parent.getId().toString()); + stmt.setString(3, virtualHostId.toString()); stmt.execute(); } for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) @@ -586,6 +605,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt.close(); } connection.commit(); + + if (tableExists) + { + dropConfigVersionTable(connection); + } } finally { @@ -643,7 +667,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { Connection conn = newAutoCommitConnection(); - createConfigVersionTable(conn); createConfiguredObjectsTable(conn); createConfiguredObjectHierarchyTable(conn); @@ -677,30 +700,19 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - private void createConfigVersionTable(final Connection conn) throws SQLException + private void dropConfigVersionTable(final Connection conn) throws SQLException { if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { - stmt.execute(CREATE_CONFIG_VERSION_TABLE); + stmt.execute(DROP_CONFIG_VERSION_TABLE); } finally { stmt.close(); } - - PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION); - try - { - pstmt.setInt(1, DEFAULT_CONFIG_VERSION); - pstmt.execute(); - } - finally - { - pstmt.close(); - } } } @@ -872,63 +884,30 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - private void setConfigVersion(int version) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - - PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION); - try - { - stmt.setInt(1, version); - stmt.execute(); - - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - - private int getConfigVersion() throws SQLException + private int getConfigVersion(Connection conn) throws SQLException { - Connection conn = newAutoCommitConnection(); + Statement stmt = conn.createStatement(); try { - - Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); try { - ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); - try - { - if(rs.next()) - { - return rs.getInt(1); - } - return DEFAULT_CONFIG_VERSION; - } - finally + if(rs.next()) { - rs.close(); + return rs.getInt(1); } - + return DEFAULT_CONFIG_VERSION; } finally { - stmt.close(); + rs.close(); } + } finally { - conn.close(); + stmt.close(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java index 99785c48a9..63413fc0de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java @@ -261,7 +261,7 @@ abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfig @Override public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException { - handler.begin(VirtualHost.CURRENT_CONFIG_VERSION); + handler.begin(); for (ConfiguredObjectRecord record : _configuredObjectRecords.values()) { if (!handler.handle(record)) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index c8aef92a95..81d3f7f9da 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.server.store; -import java.util.Map; -import java.util.UUID; public interface ConfigurationRecoveryHandler { - void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion); + void beginConfigurationRecovery(DurableConfigurationStore store); void configuredObject(ConfiguredObjectRecord object); @@ -33,6 +31,6 @@ public interface ConfigurationRecoveryHandler * * @return the model version of the configuration */ - int completeConfigurationRecovery(); + String completeConfigurationRecovery(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java index 2cadb8ac4c..e9e6d88f36 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java @@ -39,9 +39,9 @@ public class ConfiguredObjectRecordRecoveverAndUpgrader implements ConfiguredObj } @Override - public void begin(int configVersion) + public void begin() { - _configRecoverer.beginConfigurationRecovery(_store, configVersion); + _configRecoverer.beginConfigurationRecovery(_store); } @Override @@ -52,9 +52,9 @@ public class ConfiguredObjectRecordRecoveverAndUpgrader implements ConfiguredObj } @Override - public int end() + public void end() { - return _configRecoverer.completeConfigurationRecovery(); + _configRecoverer.completeConfigurationRecovery(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java index 5975bf58b3..f8a8741a37 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java @@ -33,8 +33,7 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; - -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; +import org.apache.qpid.server.model.Model; public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandler { @@ -44,6 +43,7 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl private final Map<String, Map<UUID, UnresolvedObject>> _unresolvedObjects = new HashMap<String, Map<UUID, UnresolvedObject>>(); + private final List<ConfiguredObjectRecord> _records = new ArrayList<ConfiguredObjectRecord>(); private final Map<String, Map<UUID, List<DependencyListener>>> _dependencyListeners = new HashMap<String, Map<UUID, List<DependencyListener>>>(); @@ -69,19 +69,56 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void beginConfigurationRecovery(final DurableConfigurationStore store) { _logSubject = new MessageStoreLogSubject(_name, store.getClass().getSimpleName()); _store = store; - _upgrader = _upgraderProvider.getUpgrader(configVersion, this); _eventLogger.message(_logSubject, ConfigStoreMessages.RECOVERY_START()); } @Override public void configuredObject(ConfiguredObjectRecord record) { - _upgrader.configuredObject(record); + _records.add(record); + } + + @Override + public String completeConfigurationRecovery() + { + String configVersion = getConfigVersionFromRecords(); + + _upgrader = _upgraderProvider.getUpgrader(configVersion, this); + + for (ConfiguredObjectRecord record : _records) + { + // We don't yet recover the VirtualHost record. + if (!"VirtualHost".equals(record.getType())) + { + _upgrader.configuredObject(record); + } + } + _upgrader.complete(); + checkUnresolvedDependencies(); + applyUpgrade(); + + _eventLogger.message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); + return Model.MODEL_VERSION; + } + + private String getConfigVersionFromRecords() + { + String configVersion = Model.MODEL_VERSION; + for (ConfiguredObjectRecord record : _records) + { + if ("VirtualHost".equals(record.getType())) + { + configVersion = (String) record.getAttributes().get("modelVersion"); + _logger.debug("Confifuration has config version : " + configVersion); + break; + } + } + return configVersion; } void onConfiguredObject(ConfiguredObjectRecord record) @@ -94,23 +131,13 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl recoverer.load(this, record); } + private DurableConfiguredObjectRecoverer getRecoverer(final String type) { DurableConfiguredObjectRecoverer recoverer = _recoverers.get(type); return recoverer; } - @Override - public int completeConfigurationRecovery() - { - _upgrader.complete(); - checkUnresolvedDependencies(); - applyUpgrade(); - - _eventLogger.message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); - return CURRENT_CONFIG_VERSION; - } - private void applyUpgrade() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index a5ace16cfa..1cf1d1a36d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -100,7 +100,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore @Override public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { - handler.begin(_configVersion); + handler.begin(); List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values()); for(ConfiguredObjectRecord record : records) { @@ -110,12 +110,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore break; } } - int oldConfigVersion = _configVersion; - _configVersion = handler.end(); - if(oldConfigVersion != _configVersion) - { - save(); - } + handler.end(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java index c2ea0745ff..9785be78a6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderProvider.java @@ -22,5 +22,5 @@ package org.apache.qpid.server.store; public interface UpgraderProvider { - DurableConfigurationStoreUpgrader getUpgrader(int configVersion, DurableConfigurationRecoverer recoverer); + DurableConfigurationStoreUpgrader getUpgrader(String configVersion, DurableConfigurationRecoverer recoverer); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java index 747c735ff1..f251a442c7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java @@ -24,8 +24,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecord; public interface ConfiguredObjectRecordHandler { - // TODO configVersion argument will be removed. - void begin(int configVersion); + void begin(); /** * Handles the given record. @@ -35,7 +34,5 @@ public interface ConfiguredObjectRecordHandler */ boolean handle(ConfiguredObjectRecord record); - //TODO: return should be void - // temporarily returning new config version - int end(); + void end(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 8c3ebaf9be..fe2f867e41 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.virtualhost; -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; - import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -34,6 +32,7 @@ import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.QueueArgumentsConverter; @@ -76,14 +75,16 @@ public class DefaultUpgraderProvider implements UpgraderProvider _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds); } - public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer) + public DurableConfigurationStoreUpgrader getUpgrader(final String configVersion, DurableConfigurationRecoverer recoverer) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Getting upgrader for configVersion: " + configVersion); } DurableConfigurationStoreUpgrader currentUpgrader = null; - switch(configVersion) + + int conigVersionAsInteger = Integer.parseInt(configVersion.replace(".", "")); + switch(conigVersionAsInteger) { case 0: currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader()); @@ -95,7 +96,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader()); case 4: currentUpgrader = addUpgrader(currentUpgrader, new Version4Upgrader()); - case CURRENT_CONFIG_VERSION: + case (Model.MODEL_MAJOR_VERSION * 10) + Model.MODEL_MINOR_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java index 0fe116570a..7859a4110b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java @@ -471,9 +471,8 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase private int _version; @Override - public void begin(final int configVersion) + public void begin() { - _version = configVersion; } @Override @@ -488,9 +487,8 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public int end() + public void end() { - return _version; } public ConfiguredObjectRecord getBrokerRecord() @@ -503,7 +501,6 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase { private final UUID _id; private ConfiguredObjectRecord _foundRecord; - private int _version; private RecordFinder(final UUID id) { @@ -511,9 +508,8 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public void begin(final int configVersion) + public void begin() { - _version = configVersion; } @Override @@ -528,9 +524,8 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public int end() + public void end() { - return _version; } public ConfiguredObjectRecord getFoundRecord() @@ -543,7 +538,6 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase { private final Collection<UUID> _childIds = new HashSet<UUID>(); private final ConfiguredObjectRecord _parent; - private int _version; private ChildFinder(final ConfiguredObjectRecord parent) { @@ -551,9 +545,8 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public void begin(final int configVersion) + public void begin() { - _version = configVersion; } @Override @@ -575,9 +568,8 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public int end() + public void end() { - return _version; } public Collection<UUID> getChildIds() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java index 6907898a6c..cdaab22fed 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -121,35 +121,14 @@ public class JsonFileConfigStoreTest extends QpidTestCase { _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); _store.visitConfiguredObjectRecords(_handler); + InOrder inorder = inOrder(_handler); - inorder.verify(_handler).begin(eq(0)); + inorder.verify(_handler).begin(); inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class)); inorder.verify(_handler).end(); _store.closeConfigurationStore(); } - public void testUpdatedConfigVersionIsRetained() throws Exception - { - final int NEW_CONFIG_VERSION = 42; - when(_handler.end()).thenReturn(NEW_CONFIG_VERSION); - - _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.visitConfiguredObjectRecords(_handler); - _store.closeConfigurationStore(); - - _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.visitConfiguredObjectRecords(_handler); - InOrder inorder = inOrder(_handler); - - // first time the config version should be the initial version - 0 - inorder.verify(_handler).begin(eq(0)); - - // second time the config version should be the updated version - inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION)); - - _store.closeConfigurationStore(); - } - public void testCreateObject() throws Exception { _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 5d5856bf4a..e6b57d8039 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -41,8 +41,10 @@ import org.apache.qpid.server.exchange.FanoutExchange; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueFactory; @@ -63,11 +65,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; public class DurableConfigurationRecovererTest extends QpidTestCase { private static final String VIRTUAL_HOST_NAME = "test"; + private static final UUID VIRTUAL_HOST_ID = UUID.randomUUID(); private static final UUID QUEUE_ID = new UUID(0,0); private static final UUID TOPIC_EXCHANGE_ID = UUIDGenerator.generateExchangeUUID(TopicExchange.TYPE.getDefaultExchangeName(), VIRTUAL_HOST_NAME); private static final UUID DIRECT_EXCHANGE_ID = UUIDGenerator.generateExchangeUUID(DirectExchange.TYPE.getDefaultExchangeName(), VIRTUAL_HOST_NAME); @@ -205,19 +207,22 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public void testUpgradeEmptyStore() throws Exception { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); assertEquals("Did not upgrade to the expected version", - CURRENT_CONFIG_VERSION, + Model.MODEL_VERSION, _durableConfigurationRecoverer.completeConfigurationRecovery()); } public void testUpgradeNewerStoreFails() throws Exception { + String bumpedModelVersion = Model.MODEL_MAJOR_VERSION + "." + (Model.MODEL_MINOR_VERSION + 1); try { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, CURRENT_CONFIG_VERSION + 1); - _durableConfigurationRecoverer.completeConfigurationRecovery(); - fail("Should not be able to start when config model is newer than current"); + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); + _durableConfigurationRecoverer.configuredObject(getVirtualHostModelRecord(bumpedModelVersion)); + String newVersion = _durableConfigurationRecoverer.completeConfigurationRecovery(); + fail("Should not be able to start when config model is newer than current. Actually upgraded to " + newVersion); } catch (IllegalStateException e) { @@ -225,10 +230,20 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } } + private ConfiguredObjectRecordImpl getVirtualHostModelRecord( + String modelVersion) + { + ConfiguredObjectRecordImpl virtualHostRecord = new ConfiguredObjectRecordImpl(VIRTUAL_HOST_ID, + VirtualHost.class.getSimpleName(), + Collections.<String,Object>singletonMap("modelVersion", modelVersion)); + return virtualHostRecord; + } + public void testUpgradeRemovesBindingsToNonTopicExchanges() throws Exception { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); + _durableConfigurationRecoverer.configuredObject(getVirtualHostModelRecord("0.0")); _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", @@ -252,7 +267,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public void testUpgradeOnlyRemovesSelectorBindings() throws Exception { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); + _durableConfigurationRecoverer.configuredObject(getVirtualHostModelRecord("0.0")); _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", @@ -336,7 +352,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public void testUpgradeKeepsBindingsToTopicExchanges() throws Exception { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); + _durableConfigurationRecoverer.configuredObject(getVirtualHostModelRecord("0.0")); _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", @@ -358,7 +375,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public void testUpgradeDoesNotRecur() throws Exception { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); + _durableConfigurationRecoverer.configuredObject(getVirtualHostModelRecord("0.0")); _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), "Binding", @@ -375,7 +393,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public void testFailsWithUnresolvedObjects() { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(new UUID(1, 0), @@ -400,7 +418,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public void testFailsWithUnknownObjectType() { - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); try @@ -468,7 +486,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } }); - _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + _durableConfigurationRecoverer.beginConfigurationRecovery(_store); _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(queueId, Queue.class.getSimpleName(), createQueue("testQueue", exchangeId))); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java index 99c0e351d4..fd62ce75b9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java @@ -241,7 +241,6 @@ public class TestBrokerConfiguration private final Class<? extends ConfiguredObject> _category; private final String _objectName; public ConfiguredObjectRecord _foundRecord; - private int _version; public RecordFindingVisitor(final Class<? extends ConfiguredObject> category, final String objectName) { @@ -250,9 +249,8 @@ public class TestBrokerConfiguration } @Override - public void begin(final int configVersion) + public void begin() { - _version = configVersion; } @Override @@ -269,9 +267,8 @@ public class TestBrokerConfiguration } @Override - public int end() + public void end() { - return _version; } public ConfiguredObjectRecord getFoundRecord() |
