summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-19 21:36:14 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-19 21:36:14 +0000
commitd2f3caa7409b95cfcd93de3057b07eecf8b12548 (patch)
treefb14d5c73ce6f1d3c24d0f486ee8d7376386aded
parent416d56e96fe84fa6d99b2301b48d3071b4fe4700 (diff)
downloadqpid-python-d2f3caa7409b95cfcd93de3057b07eecf8b12548.tar.gz
QPID-5009 : Update broker store to revision 7
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1505029 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java66
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java37
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java109
-rw-r--r--qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdbbin0 -> 2576 bytes
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java79
6 files changed, 282 insertions, 11 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index e2cbabef65..f07e5b4312 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -66,7 +66,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private static final int LOCK_RETRY_ATTEMPTS = 5;
- public static final int VERSION = 6;
+ public static final int VERSION = 7;
private static final Map<String, String> ENVCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
new file mode 100644
index 0000000000..e318c3586e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.OperationStatus;
+import org.apache.qpid.AMQStoreException;
+
+public class UpgradeFrom6To7 extends AbstractStoreUpgrade
+{
+
+ private static final int DEFAULT_CONFIG_VERSION = 0;
+
+ @Override
+ public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName)
+ throws DatabaseException, AMQStoreException
+ {
+ reportStarting(environment, 6);
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+
+ Database versionDb = environment.openDatabase(null, "CONFIG_VERSION", dbConfig);
+
+ if(versionDb.count() == 0L)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ IntegerBinding.intToEntry(DEFAULT_CONFIG_VERSION, value);
+ ByteBinding.byteToEntry((byte) 0, key);
+ OperationStatus status = versionDb.put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error initialising config version: " + status);
+ }
+ }
+
+ versionDb.close();
+
+ reportFinished(environment, 7);
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index f1ab012efc..e769bfae81 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store.berkeleydb.upgrade;
+import com.sleepycat.je.Cursor;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -73,7 +74,12 @@ public class Upgrader
}
int version = getSourceVersion(versionDb);
-
+ if(version > AbstractBDBMessageStore.VERSION)
+ {
+ throw new AMQStoreException("Database version " + version
+ + " is higher than the most recent known version: "
+ + AbstractBDBMessageStore.VERSION);
+ }
performUpgradeFromVersion(version, versionDb);
}
finally
@@ -87,19 +93,34 @@ public class Upgrader
int getSourceVersion(Database versionDb)
{
- int version = AbstractBDBMessageStore.VERSION + 1;
- OperationStatus result;
+ int version = -1;
- do
+ Cursor cursor = null;
+ try
{
- version--;
+ cursor = versionDb.openCursor(null, null);
+
DatabaseEntry key = new DatabaseEntry();
- IntegerBinding.intToEntry(version, key);
DatabaseEntry value = new DatabaseEntry();
- result = versionDb.get(null, key, value, LockMode.READ_COMMITTED);
+ while(cursor.getNext(key, value, null) == OperationStatus.SUCCESS)
+ {
+ int ver = IntegerBinding.entryToInt(key);
+ if(ver > version)
+ {
+ version = ver;
+ }
+ }
+ }
+ finally
+ {
+ if(cursor != null)
+ {
+ cursor.close();
+ }
}
- while(result == OperationStatus.NOTFOUND);
+
+
return version;
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
new file mode 100644
index 0000000000..75717120b3
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.upgrade;
+
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+
+public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase
+{
+ private Upgrader _upgrader;
+
+ @Override
+ protected String getStoreDirectoryName()
+ {
+ return "bdbstore-v999";
+ }
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _upgrader = new Upgrader(_environment, getVirtualHostName());
+ }
+
+ private int getStoreVersion()
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ int storeVersion = -1;
+ Database versionDb = null;
+ Cursor cursor = null;
+ try
+ {
+ versionDb = _environment.openDatabase(null, Upgrader.VERSION_DB_NAME, dbConfig);
+ cursor = versionDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS)
+ {
+ int version = IntegerBinding.entryToInt(key);
+ if (storeVersion < version)
+ {
+ storeVersion = version;
+ }
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ if (versionDb != null)
+ {
+ versionDb.close();
+ }
+ }
+ return storeVersion;
+ }
+
+ public void testUpgrade() throws Exception
+ {
+ assertEquals("Unexpected store version", 999, getStoreVersion());
+ try
+ {
+ _upgrader.upgradeIfNecessary();
+ fail("Store should not be able to be upgraded");
+ }
+ catch(AMQStoreException ex)
+ {
+ assertEquals("Incorrect exception thrown", "Database version 999 is higher than the most recent known version: "
+ + AbstractBDBMessageStore.VERSION, ex.getMessage());
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb
new file mode 100644
index 0000000000..991367019f
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v999/test-store/00000000.jdb
Binary files differ
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e97c0d662d..ae6b4b0154 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -74,13 +74,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
- private static final int DB_VERSION = 6;
+ private static final int DB_VERSION = 7;
private final AtomicLong _messageId = new AtomicLong(0);
private AtomicBoolean _closed = new AtomicBoolean(false);
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String SELECT_FROM_DB_VERSION = "SELECT version FROM " + DB_VERSION_TABLE_NAME;
+ private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?";
+
private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
@@ -208,11 +211,83 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
private void commonConfiguration(String name, VirtualHost virtualHost)
- throws ClassNotFoundException, SQLException
+ throws ClassNotFoundException, SQLException, AMQStoreException
{
implementationSpecificConfiguration(name, virtualHost);
createOrOpenDatabase();
+ upgradeIfNecessary();
+ }
+
+ protected void upgradeIfNecessary() throws SQLException, AMQStoreException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement statement = conn.prepareStatement(SELECT_FROM_DB_VERSION);
+ try
+ {
+ ResultSet rs = statement.executeQuery();
+ try
+ {
+ if(!rs.next())
+ {
+ throw new AMQStoreException(DB_VERSION_TABLE_NAME + " does not contain the database version");
+ }
+ int version = rs.getInt(1);
+ switch (version)
+ {
+ case 6:
+ upgradeFromV6();
+ case DB_VERSION:
+ return;
+ default:
+ throw new AMQStoreException("Unknown database version: " + version);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ statement.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
+ private void upgradeFromV6() throws SQLException
+ {
+ updateDbVersion(7);
+ }
+
+ private void updateDbVersion(int newVersion) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement statement = conn.prepareStatement(UPDATE_DB_VERSION);
+ try
+ {
+ statement.setInt(1,newVersion);
+ statement.execute();
+ }
+ finally
+ {
+ statement.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
}
protected abstract void implementationSpecificConfiguration(String name,