diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-19 21:36:14 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-19 21:36:14 +0000 |
| commit | d2f3caa7409b95cfcd93de3057b07eecf8b12548 (patch) | |
| tree | fb14d5c73ce6f1d3c24d0f486ee8d7376386aded | |
| parent | 416d56e96fe84fa6d99b2301b48d3071b4fe4700 (diff) | |
| download | qpid-python-d2f3caa7409b95cfcd93de3057b07eecf8b12548.tar.gz | |
QPID-5009 : Update broker store to revision 7
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1505029 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 282 insertions, 11 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index e2cbabef65..f07e5b4312 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -66,7 +66,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private static final int LOCK_RETRY_ATTEMPTS = 5; - public static final int VERSION = 6; + public static final int VERSION = 7; private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java new file mode 100644 index 0000000000..e318c3586e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Environment; +import com.sleepycat.je.OperationStatus; +import org.apache.qpid.AMQStoreException; + +public class UpgradeFrom6To7 extends AbstractStoreUpgrade +{ + + private static final int DEFAULT_CONFIG_VERSION = 0; + + @Override + public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName) + throws DatabaseException, AMQStoreException + { + reportStarting(environment, 6); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + + Database versionDb = environment.openDatabase(null, "CONFIG_VERSION", dbConfig); + + if(versionDb.count() == 0L) + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + IntegerBinding.intToEntry(DEFAULT_CONFIG_VERSION, value); + ByteBinding.byteToEntry((byte) 0, key); + OperationStatus status = versionDb.put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error initialising config version: " + status); + } + } + + versionDb.close(); + + reportFinished(environment, 7); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index f1ab012efc..e769bfae81 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store.berkeleydb.upgrade; +import com.sleepycat.je.Cursor; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -73,7 +74,12 @@ public class Upgrader } int version = getSourceVersion(versionDb); - + if(version > AbstractBDBMessageStore.VERSION) + { + throw new AMQStoreException("Database version " + version + + " is higher than the most recent known version: " + + AbstractBDBMessageStore.VERSION); + } performUpgradeFromVersion(version, versionDb); } finally @@ -87,19 +93,34 @@ public class Upgrader int getSourceVersion(Database versionDb) { - int version = AbstractBDBMessageStore.VERSION + 1; - OperationStatus result; + int version = -1; - do + Cursor cursor = null; + try { - version--; + cursor = versionDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); - IntegerBinding.intToEntry(version, key); DatabaseEntry value = new DatabaseEntry(); - result = versionDb.get(null, key, value, LockMode.READ_COMMITTED); + while(cursor.getNext(key, value, null) == OperationStatus.SUCCESS) + { + int ver = IntegerBinding.entryToInt(key); + if(ver > version) + { + version = ver; + } + } + } + finally + { + if(cursor != null) + { + cursor.close(); + } } - while(result == OperationStatus.NOTFOUND); + + return version; } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java new file mode 100644 index 0000000000..75717120b3 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; +import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; + +public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase +{ + private Upgrader _upgrader; + + @Override + protected String getStoreDirectoryName() + { + return "bdbstore-v999"; + } + + @Override + public void setUp() throws Exception + { + super.setUp(); + _upgrader = new Upgrader(_environment, getVirtualHostName()); + } + + private int getStoreVersion() + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + int storeVersion = -1; + Database versionDb = null; + Cursor cursor = null; + try + { + versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig); + cursor = versionDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) + { + int version = IntegerBinding.entryToInt(key); + if (storeVersion < version) + { + storeVersion = version; + } + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + if (versionDb != null) + { + versionDb.close(); + } + } + return storeVersion; + } + + public void testUpgrade() throws Exception + { + assertEquals("Unexpected store version", 999, getStoreVersion()); + try + { + _upgrader.upgradeIfNecessary(); + fail("Store should not be able to be upgraded"); + } + catch(AMQStoreException ex) + { + assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: " + + AbstractBDBMessageStore.VERSION, ex.getMessage()); + } + } + +} diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb Binary files differnew file mode 100644 index 0000000000..991367019f --- /dev/null +++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index e97c0d662d..ae6b4b0154 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -74,13 +74,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME, XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME }; - private static final int DB_VERSION = 6; + private static final int DB_VERSION = 7; private final AtomicLong _messageId = new AtomicLong(0); private AtomicBoolean _closed = new AtomicBoolean(false); private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; + private static final String SELECT_FROM_DB_VERSION = "SELECT version FROM " + DB_VERSION_TABLE_NAME; + private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?"; + private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )"; private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; @@ -208,11 +211,83 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } private void commonConfiguration(String name, VirtualHost virtualHost) - throws ClassNotFoundException, SQLException + throws ClassNotFoundException, SQLException, AMQStoreException { implementationSpecificConfiguration(name, virtualHost); createOrOpenDatabase(); + upgradeIfNecessary(); + } + + protected void upgradeIfNecessary() throws SQLException, AMQStoreException + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement statement = conn.prepareStatement(SELECT_FROM_DB_VERSION); + try + { + ResultSet rs = statement.executeQuery(); + try + { + if(!rs.next()) + { + throw new AMQStoreException(DB_VERSION_TABLE_NAME + " does not contain the database version"); + } + int version = rs.getInt(1); + switch (version) + { + case 6: + upgradeFromV6(); + case DB_VERSION: + return; + default: + throw new AMQStoreException("Unknown database version: " + version); + } + } + finally + { + rs.close(); + } + } + finally + { + statement.close(); + } + } + finally + { + conn.close(); + } + + } + + private void upgradeFromV6() throws SQLException + { + updateDbVersion(7); + } + + private void updateDbVersion(int newVersion) throws SQLException + { + Connection conn = newAutoCommitConnection(); + try + { + + PreparedStatement statement = conn.prepareStatement(UPDATE_DB_VERSION); + try + { + statement.setInt(1,newVersion); + statement.execute(); + } + finally + { + statement.close(); + } + } + finally + { + conn.close(); + } } protected abstract void implementationSpecificConfiguration(String name, |
