diff options
Diffstat (limited to 'java/bdbstore')
44 files changed, 0 insertions, 6974 deletions
diff --git a/java/bdbstore/bin/backup.sh b/java/bdbstore/bin/backup.sh deleted file mode 100755 index 0e2f0fda09..0000000000 --- a/java/bdbstore/bin/backup.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Parse arguments taking all - prefixed args as JAVA_OPTS -for arg in "$@"; do - if [[ $arg == -java:* ]]; then - JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " - else - ARGS="${ARGS}$arg " - fi -done - -WHEREAMI=`dirname $0` -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=`cd $WHEREAMI/../ && pwd` -fi -VERSION=0.13 - -LIBS=$QPID_HOME/lib/je-4.0.103.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar - - -echo "Starting Hot Backup Script" -java -Dlog4j.configuration=backup-log4j.xml ${JAVA_OPTS} -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBBackup ${ARGS} diff --git a/java/bdbstore/bin/storeUpgrade.sh b/java/bdbstore/bin/storeUpgrade.sh deleted file mode 100755 index 076b9d3f7e..0000000000 --- a/java/bdbstore/bin/storeUpgrade.sh +++ /dev/null @@ -1,43 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Parse arguements taking all - prefixed args as JAVA_OPTS -for arg in "$@"; do - if [[ $arg == -java:* ]]; then - JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " - else - ARGS="${ARGS}$arg " - fi -done - -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) - export PATH=${PATH}:${QPID_HOME}/bin -fi - -if [ -z "$BDB_HOME" ]; then - export BDB_HOME=$(dirname $(dirname $(readlink -f $0))) -fi - -VERSION=0.13 - -LIBS=$BDB_HOME/lib/je-4.0.103.jar:$BDB_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar - -java -Xms256m -Dlog4j.configuration=BDBStoreUpgrade.log4j.xml -Xmx256m -Damqj.logging.level=warn ${JAVA_OPTS} -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade ${ARGS} diff --git a/java/bdbstore/build.xml b/java/bdbstore/build.xml deleted file mode 100644 index 9355358e6c..0000000000 --- a/java/bdbstore/build.xml +++ /dev/null @@ -1,84 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - - or more contributor license agreements. See the NOTICE file - - distributed with this work for additional information - - regarding copyright ownership. The ASF licenses this file - - to you under the Apache License, Version 2.0 (the - - "License"); you may not use this file except in compliance - - with the License. You may obtain a copy of the License at - - - - http://www.apache.org/licenses/LICENSE-2.0 - - - - Unless required by applicable law or agreed to in writing, - - software distributed under the License is distributed on an - - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - - KIND, either express or implied. See the License for the - - specific language governing permissions and limitations - - under the License. - --> -<project name="bdbstore" default="build"> - <property name="module.depends" value="common client management/common broker perftests systests" /> - <property name="module.test.depends" value="test common/test broker/test management/common perftests systests" /> - - <import file="../module.xml" /> - - <property name="bdb.lib.dir" value="${project.root}/lib/bdbstore" /> - <property name="bdb.version" value="4.0.103" /> - <property name="bdb.download.url" value="http://download.oracle.com/maven/com/sleepycat/je/${bdb.version}/je-${bdb.version}.jar" /> - <property name="bdb.jar.file" value="${bdb.lib.dir}/je-${bdb.version}.jar" /> - - <!--check whether the BDB jar is present, possibly after download--> - <target name="check-bdb-jar"> - <available file="${bdb.jar.file}" type="file" property="bdb.jar.available"/> - </target> - - <!--echo that BDB is required if it isnt present, with associated licencing note--> - <target name="bdb-jar-required" depends="bdb-licence-note-optional" unless="bdb.jar.available"> - <echo>The BDB JE library is required to use this optional module. - -The jar file may be downloaded by either: - - Seperately running the following command from the qpid/java/bdbstore dir: ant download-bdb - - OR - - Adding -Ddownload-bdb=true to your regular build command.</echo> - <fail>The BDB JE library was not found</fail> - </target> - - <!--issue BDB licencing note if BDB isnt already present--> - <target name="bdb-licence-note-optional" depends="check-bdb-jar" unless="bdb.jar.available"> - <antcall target="bdb-licence-note"/> - </target> - - <!--issue BDB licencing note--> - <target name="bdb-licence-note"> - <echo>*NOTE* The BDB JE library required by this optional module is licensed under the Sleepycat Licence, which is not compatible with the Apache Licence v2.0. - -For a copy of the Sleepycat Licence, please see: -http://www.oracle.com/technetwork/database/berkeleydb/downloads/jeoslicense-086837.html</echo> - </target> - - <!--check if an inline BDB download was requested with the build--> - <target name="check-request-props" if="download-bdb"> - <antcall target="download-bdb"/> - </target> - - <!--download BDB, with licencing note--> - <target name="download-bdb" depends="bdb-licence-note"> - <mkdir dir="${bdb.lib.dir}"/> - <echo>Downloading BDB JE</echo> - <get src="${bdb.download.url}" dest="${bdb.jar.file}" usetimestamp="true" /> - </target> - - <target name="build" depends="check-request-props, bdb-jar-required, module.build" /> - - <target name="postbuild" depends="copy-store-to-upgrade" /> - - <target name="copy-store-to-upgrade" description="copy the upgrade tool resource folder contents into the build tree"> - <copy todir="${qpid.home}" failonerror="true"> - <fileset dir="src/test/resources/upgrade"/> - </copy> - </target> - -</project> diff --git a/java/bdbstore/etc/scripts/bdbbackuptest.sh b/java/bdbstore/etc/scripts/bdbbackuptest.sh deleted file mode 100755 index 2a0b72e5ad..0000000000 --- a/java/bdbstore/etc/scripts/bdbbackuptest.sh +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) - export PATH=${PATH}:${QPID_HOME}/bin -fi - -# Parse arguements taking all - prefixed args as JAVA_OPTS -for arg in "$@"; do - if [[ $arg == -java:* ]]; then - JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " - else - ARGS="${ARGS}$arg " - fi -done - -VERSION=0.5 - -# Set classpath to include Qpid jar with all required jars in manifest -QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.4.0.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar - -# Set other variables used by the qpid-run script before calling -export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS - -. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS} - diff --git a/java/bdbstore/etc/scripts/bdbtest.sh b/java/bdbstore/etc/scripts/bdbtest.sh deleted file mode 100755 index eafdae9710..0000000000 --- a/java/bdbstore/etc/scripts/bdbtest.sh +++ /dev/null @@ -1,43 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) - export PATH=${PATH}:${QPID_HOME}/bin -fi - -# Parse arguements taking all - prefixed args as JAVA_OPTS -for arg in "$@"; do - if [[ $arg == -java:* ]]; then - JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " - else - ARGS="${ARGS}$arg " - fi -done - -VERSION=0.5 - -# Set classpath to include Qpid jar with all required jars in manifest -QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.4.0.jar - -# Set other variables used by the qpid-run script before calling -export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS - -. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.ping.PingDurableClient -o $QPID_WORK/results ${ARGS} diff --git a/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml b/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml deleted file mode 100644 index 4d71963ea7..0000000000 --- a/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml +++ /dev/null @@ -1,52 +0,0 @@ -<?xml version="1.0"?> -<!-- - - - - Licensed to the Apache Software Foundation (ASF) under one - - or more contributor license agreements. See the NOTICE file - - distributed with this work for additional information - - regarding copyright ownership. The ASF licenses this file - - to you under the Apache License, Version 2.0 (the - - "License"); you may not use this file except in compliance - - with the License. You may obtain a copy of the License at - - - - http://www.apache.org/licenses/LICENSE-2.0 - - - - Unless required by applicable law or agreed to in writing, - - software distributed under the License is distributed on an - - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - - KIND, either express or implied. See the License for the - - specific language governing permissions and limitations - - under the License. - - - --> -<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> -<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> - - <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> - - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p - %m%n"/> - </layout> - </appender> - - <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade"> - <priority value="info"/> - </category> - - <!-- Only show errors from the BDB Store --> - <category name="org.apache.qpid.server.store.berkeleydb.berkeleydb.BDBMessageStore"> - <priority value="error"/> - </category> - - <!-- Provide warnings to standard output --> - <category name="org.apache.qpid"> - <priority value="error"/> - </category> - - <!-- Log all info events to file --> - <root> - <priority value="info"/> - <appender-ref ref="STDOUT"/> - </root> - -</log4j:configuration> diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java deleted file mode 100644 index 8b887b1876..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.framing.AMQShortString; - -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class AMQShortStringEncoding -{ - public static AMQShortString readShortString(TupleInput tupleInput) - { - int length = (int) tupleInput.readShort(); - if (length < 0) - { - return null; - } - else - { - byte[] stringBytes = new byte[length]; - tupleInput.readFast(stringBytes); - return new AMQShortString(stringBytes); - } - - } - - public static void writeShortString(AMQShortString shortString, TupleOutput tupleOutput) - { - - if (shortString == null) - { - tupleOutput.writeShort(-1); - } - else - { - tupleOutput.writeShort(shortString.length()); - tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length()); - } - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java deleted file mode 100644 index 81ae315fe2..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQShortString; - -public class AMQShortStringTB extends TupleBinding -{ - private static final Logger _log = Logger.getLogger(AMQShortStringTB.class); - - - public AMQShortStringTB() - { - } - - public Object entryToObject(TupleInput tupleInput) - { - return AMQShortStringEncoding.readShortString(tupleInput); - } - - public void objectToEntry(Object object, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput); - } - -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java deleted file mode 100644 index c515ca5d8e..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.util.DbBackup; - -import org.apache.log4j.Logger; - -import org.apache.qpid.util.CommandLineParser; -import org.apache.qpid.util.FileUtils; - -import java.io.*; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -/** - * BDBBackup is a utility for taking hot backups of the current state of a BDB transaction log database. - * - * <p/>This utility makes the following assumptions/performs the following actions: - * - * <p/><ul> <li>The from and to directory locations will already exist. This scripts does not create them. <li>If this - * script fails to complete in one minute it will terminate. <li>This script always exits with code 1 on error, code 0 - * on success (standard unix convention). <li>This script will log out at info level, when it starts and ends and a list - * of all files backed up. <li>This script logs all errors at error level. <li>This script does not perform regular - * backups, wrap its calling script in a cron job or similar to do this. </ul> - * - * <p/>This utility is build around the BDB provided backup helper utility class, DbBackup. This utility class provides - * an ability to force BDB to stop writing to the current log file set, whilst the backup is taken, to ensure that a - * consistent snapshot is acquired. Preventing BDB from writing to the current log file set, does not stop BDB from - * continuing to run concurrently while the backup is running, it simply moves onto a new set of log files; this - * provides a 'hot' backup facility. - * - * <p/>DbBackup can also help with incremental backups, by providing the number of the last log file backed up. - * Subsequent backups can be taken, from later log files only. In a messaging application, messages are not expected to - * be long-lived in most cases, so the log files will usually have been completely turned over between backups. This - * utility does not support incremental backups for this reason. - * - * <p/>If the database is locked by BDB, as is required when using transactions, and therefore will always be the case - * in Qpid, this utility cannot make use of the DbBackup utility in a seperate process. DbBackup, needs to ensure that - * the BDB envinronment used to take the backup has exclusive write access to the log files. This utility can take a - * backup as a standalone utility against log files, when a broker is not running, using the {@link #takeBackup(String, - *String,com.sleepycat.je.Environment)} method. - * - * <p/>A seperate backup machanism is provided by the {@link #takeBackupNoLock(String,String)} method which can take a - * hot backup against a running broker. This works by finding out the set of files to copy, and then opening them all to - * read, and repeating this process until a consistent set of open files is obtained. This is done to avoid the - * situation where the BDB cleanup thread deletes a file, between the directory listing and opening of the file to copy. - * All consistently opened files are copied. This is the default mechanism the the {@link #main} method of this utility - * uses. - * - * <p/><table id="crc><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Hot copy all - * BDB log files from one directory to another. </table> - */ -public class BDBBackup -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(BDBBackup.class); - - /** Used for communicating with the user. */ - private static final Logger console = Logger.getLogger("Console"); - - /** Defines the suffix used to identify BDB log files. */ - private static final String LOG_FILE_SUFFIX = ".jdb"; - - /** Defines the command line format for this utility. */ - public static final String[][] COMMAND_LINE_SPEC = - new String[][] - { - { "fromdir", "The path to the directory to back the bdb log file from.", "dir", "true" }, - { "todir", "The path to the directory to save the backed up bdb log files to.", "dir", "true" } - }; - - /** Defines the timeout to terminate the backup operation on if it fails to complete. One minte. */ - public static final long TIMEOUT = 60000; - - /** - * Runs a backup of the BDB log files in a specified directory, copying the backed up files to another specified - * directory. - * - * <p/>The following arguments must be specified: - * - * <p/><table><caption>Command Line</caption> <tr><th> Option <th> Comment <tr><td> -fromdir <td> The path to the - * directory to back the bdb log file from. <tr><td> -todir <td> The path to the directory to save the backed up - * bdb log files to. </table> - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - // Process the command line using standard handling (errors and usage followed by System.exit when it is wrong). - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(COMMAND_LINE_SPEC), System.getProperties()); - - // Extract the from and to directory locations and perform a backup between them. - try - { - String fromDir = options.getProperty("fromdir"); - String toDir = options.getProperty("todir"); - - log.info("BDBBackup Utility: Starting Hot Backup."); - - BDBBackup bdbBackup = new BDBBackup(); - String[] backedUpFiles = bdbBackup.takeBackupNoLock(fromDir, toDir); - - if (log.isInfoEnabled()) - { - log.info("BDBBackup Utility: Hot Backup Completed. Files backed up: " + backedUpFiles); - } - } - catch (Exception e) - { - console.info("Backup script encountered an error and has failed: " + e.getMessage()); - log.error("Backup script got exception: " + e.getMessage(), e); - System.exit(1); - } - } - - /** - * Creates a backup of the BDB log files in the source directory, copying them to the destination directory. - * - * @param fromdir The source directory path. - * @param todir The destination directory path. - * @param environment An open BDB environment to perform the back up. - * - * @throws DatabaseException Any underlying execeptions from BDB are allowed to fall through. - */ - public void takeBackup(String fromdir, String todir, Environment environment) throws DatabaseException - { - DbBackup backupHelper = null; - - try - { - backupHelper = new DbBackup(environment); - - // Prevent BDB from writing to its log files while the backup it taken. - backupHelper.startBackup(); - - // Back up the BDB log files to the destination directory. - String[] filesForBackup = backupHelper.getLogFilesInBackupSet(); - - for (int i = 0; i < filesForBackup.length; i++) - { - File sourceFile = new File(fromdir + File.separator + filesForBackup[i]); - File destFile = new File(todir + File.separator + filesForBackup[i]); - FileUtils.copy(sourceFile, destFile); - } - } - finally - { - // Remember to exit backup mode, or all log files won't be cleaned and disk usage will bloat. - if (backupHelper != null) - { - backupHelper.endBackup(); - } - } - } - - /** - * Takes a hot backup when another process has locked the BDB database. - * - * @param fromdir The source directory path. - * @param todir The destination directory path. - * - * @return A list of all of the names of the files succesfully backed up. - */ - public String[] takeBackupNoLock(String fromdir, String todir) - { - if (log.isDebugEnabled()) - { - log.debug("public void takeBackupNoLock(String fromdir = " + fromdir + ", String todir = " + todir - + "): called"); - } - - File fromDirFile = new File(fromdir); - - if (!fromDirFile.isDirectory()) - { - throw new IllegalArgumentException("The specified fromdir(" + fromdir - + ") must be the directory containing your bdbstore."); - } - - File toDirFile = new File(todir); - - if (!toDirFile.exists()) - { - // Create directory if it doesn't exist - toDirFile.mkdirs(); - - if (log.isDebugEnabled()) - { - log.debug("Created backup directory:" + toDirFile); - } - } - - if (!toDirFile.isDirectory()) - { - throw new IllegalArgumentException("The specified todir(" + todir + ") must be a directory."); - } - - // Repeat until manage to open consistent set of files for reading. - boolean consistentSet = false; - FileInputStream[] fileInputStreams = new FileInputStream[0]; - File[] fileSet = new File[0]; - long start = System.currentTimeMillis(); - - while (!consistentSet) - { - // List all .jdb files in the directory. - fileSet = fromDirFile.listFiles(new FilenameFilter() - { - public boolean accept(File dir, String name) - { - return name.endsWith(LOG_FILE_SUFFIX); - } - }); - - // Open them all for reading. - fileInputStreams = new FileInputStream[fileSet.length]; - - if (fileSet.length == 0) - { - throw new RuntimeException("There are no BDB log files to backup in the " + fromdir + " directory."); - } - - for (int i = 0; i < fileSet.length; i++) - { - try - { - fileInputStreams[i] = new FileInputStream(fileSet[i]); - } - catch (FileNotFoundException e) - { - // Close any files opened for reading so far. - for (int j = 0; j < i; j++) - { - if (fileInputStreams[j] != null) - { - try - { - fileInputStreams[j].close(); - } - catch (IOException ioEx) - { - // Rethrow this as a runtime exception, as something strange has happened. - throw new RuntimeException(ioEx); - } - } - } - - // Could not open a consistent file set so try again. - break; - } - - // A consistent set has been opened if all files were sucesfully opened for reading. - if (i == (fileSet.length - 1)) - { - consistentSet = true; - } - } - - // Check that the script has not timed out, and raise an error if it has. - long now = System.currentTimeMillis(); - if ((now - start) > TIMEOUT) - { - throw new RuntimeException("Hot backup script failed to complete in " + (TIMEOUT / 1000) + " seconds."); - } - } - - // Copy the consistent set of open files. - List<String> backedUpFileNames = new LinkedList<String>(); - - for (int j = 0; j < fileSet.length; j++) - { - File destFile = new File(todir + File.separator + fileSet[j].getName()); - try - { - FileUtils.copy(fileSet[j], destFile); - } - catch (RuntimeException re) - { - Throwable cause = re.getCause(); - if ((cause != null) && (cause instanceof IOException)) - { - throw new RuntimeException(re.getMessage() + " fromDir:" + fromdir + " toDir:" + toDirFile, cause); - } - else - { - throw re; - } - } - - backedUpFileNames.add(destFile.getName()); - - // Close all of the files. - try - { - fileInputStreams[j].close(); - } - catch (IOException e) - { - // Rethrow this as a runtime exception, as something strange has happened. - throw new RuntimeException(e); - } - } - - return backedUpFileNames.toArray(new String[backedUpFileNames.size()]); - } - - /* - * Creates an environment for the bdb log files in the specified directory. This envrinonment can only be used - * to backup these files, if they are not locked by another database instance. - * - * @param fromdir The path to the directory to create the environment for. - * - * @throws DatabaseException Any underlying exceptions from BDB are allowed to fall through. - */ - private Environment createSourceDirEnvironment(String fromdir) throws DatabaseException - { - // Initialize the BDB backup utility on the source directory. - return new Environment(new File(fromdir), new EnvironmentConfig()); - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java deleted file mode 100644 index f900159808..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ /dev/null @@ -1,2124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.lang.ref.SoftReference; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoredMemoryMessage; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; -import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; -import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; -import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; -import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory; -import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB; -import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory; - -import com.sleepycat.bind.EntryBinding; -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.je.CheckpointConfig; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; -import com.sleepycat.je.TransactionConfig; - -/** - * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept - * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove - * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and - * dequeue messages to queues. <tr><td> Generate message identifiers. </table> - */ -@SuppressWarnings({"unchecked"}) -public class BDBMessageStore implements MessageStore -{ - private static final Logger _log = Logger.getLogger(BDBMessageStore.class); - - static final int DATABASE_FORMAT_VERSION = 5; - private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version"; - public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; - - private Environment _environment; - - private String MESSAGEMETADATADB_NAME = "messageMetaDataDb"; - private String MESSAGECONTENTDB_NAME = "messageContentDb"; - private String QUEUEBINDINGSDB_NAME = "queueBindingsDb"; - private String DELIVERYDB_NAME = "deliveryDb"; - private String EXCHANGEDB_NAME = "exchangeDb"; - private String QUEUEDB_NAME = "queueDb"; - private Database _messageMetaDataDb; - private Database _messageContentDb; - private Database _queueBindingsDb; - private Database _deliveryDb; - private Database _exchangeDb; - private Database _queueDb; - - /* ======= - * Schema: - * ======= - * - * Queue: - * name(AMQShortString) - name(AMQShortString), owner(AMQShortString), - * arguments(FieldTable encoded as binary), exclusive (boolean) - * - * Exchange: - * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean) - * - * Binding: - * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString), - * arguments (FieldTable encoded as binary) - 0 (zero) - * - * QueueEntry: - * queueName(AMQShortString), messageId (long) - 0 (zero) - * - * Message (MetaData): - * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary) - * - * Message (Content): - * messageId (long), byteOffset (integer) - dataLength(integer), data(binary); - */ - - private LogSubject _logSubject; - - private final AtomicLong _messageId = new AtomicLong(0); - - private final CommitThread _commitThread = new CommitThread("Commit-Thread"); - - // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore - private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory; - private QueueTupleBindingFactory _queueTupleBindingFactory; - private BindingTupleBindingFactory _bindingTupleBindingFactory; - - /** The data version this store should run with */ - private int _version; - private enum State - { - INITIAL, - CONFIGURING, - CONFIGURED, - RECOVERING, - STARTED, - CLOSING, - CLOSED - } - - private State _state = State.INITIAL; - - private TransactionConfig _transactionConfig = new TransactionConfig(); - - private boolean _readOnly = false; - - private boolean _configured; - - - public BDBMessageStore() - { - this(DATABASE_FORMAT_VERSION); - } - - public BDBMessageStore(int version) - { - _version = version; - } - - private void setDatabaseNames(int version) - { - if (version > 1) - { - MESSAGEMETADATADB_NAME += "_v" + version; - - MESSAGECONTENTDB_NAME += "_v" + version; - - QUEUEDB_NAME += "_v" + version; - - DELIVERYDB_NAME += "_v" + version; - - EXCHANGEDB_NAME += "_v" + version; - - QUEUEBINDINGSDB_NAME += "_v" + version; - } - } - - public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception - { - _logSubject = logSubject; - CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName())); - - if(_configured) - { - throw new Exception("ConfigStore already configured"); - } - - configure(name,storeConfiguration); - - _configured = true; - stateTransition(State.CONFIGURING, State.CONFIGURED); - - recover(recoveryHandler); - stateTransition(State.RECOVERING, State.STARTED); - } - - public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception - { - CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); - - if(!_configured) - { - throw new Exception("ConfigStore not configured"); - } - - recoverMessages(recoveryHandler); - } - - public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) throws Exception - { - CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); - - if(!_configured) - { - throw new Exception("ConfigStore not configured"); - } - - recoverQueueEntries(recoveryHandler); - - - } - - public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction() - { - return new BDBTransaction(); - } - - - /** - * Called after instantiation in order to configure the message store. - * - * @param name The name of the virtual host using this store - * @return whether a new store environment was created or not (to indicate whether recovery is necessary) - * - * @throws Exception If any error occurs that means the store is unable to configure itself. - */ - public boolean configure(String name, Configuration storeConfig) throws Exception - { - File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, - System.getProperty("QPID_WORK") + "/bdbstore/" + name)); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - - CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath())); - - _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION); - - return configure(environmentPath, false); - } - - /** - * @param environmentPath location for the store to be created in/recovered from - * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists - * @return whether or not a new store environment was created - * @throws AMQStoreException - * @throws DatabaseException - */ - protected boolean configure(File environmentPath, boolean readonly) throws AMQStoreException, DatabaseException - { - _readOnly = readonly; - stateTransition(State.INITIAL, State.CONFIGURING); - - _log.info("Configuring BDB message store"); - - createTupleBindingFactories(_version); - - setDatabaseNames(_version); - - return setupStore(environmentPath, readonly); - } - - private void createTupleBindingFactories(int version) - { - _bindingTupleBindingFactory = new BindingTupleBindingFactory(version); - _queueTupleBindingFactory = new QueueTupleBindingFactory(version); - _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version); - } - - /** - * Move the store state from CONFIGURING to STARTED. - * - * This is required if you do not want to perform recovery of the store data - * - * @throws AMQStoreException if the store is not in the correct state - */ - public void start() throws AMQStoreException - { - stateTransition(State.CONFIGURING, State.STARTED); - } - - private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException - { - checkState(State.CONFIGURING); - - boolean newEnvironment = createEnvironment(storePath, readonly); - - verifyVersionByTables(); - - openDatabases(readonly); - - if (!readonly) - { - _commitThread.start(); - } - - return newEnvironment; - } - - private void verifyVersionByTables() throws DatabaseException - { - for (String s : _environment.getDatabaseNames()) - { - int versionIndex = s.indexOf("_v"); - - // lack of _v index suggests DB is v1 - // so if _version is not v1 then error - if (versionIndex == -1) - { - if (_version != 1) - { - closeEnvironment(); - throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version - + ". Store on disk contains version 1 data."); - } - else // DB is v1 and _version is v1 - { - continue; - } - } - - // Otherwise Check Versions - int version = Integer.parseInt(s.substring(versionIndex + 2)); - - if (version != _version) - { - closeEnvironment(); - throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version - + ". Store on disk contains version " + version + " data."); - } - } - } - - private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException - { - if (_state != requiredState) - { - throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState - + "; currently in state: " + _state); - } - - _state = newState; - } - - private void checkState(State requiredState) throws AMQStoreException - { - if (_state != requiredState) - { - throw new AMQStoreException("Unexpected state: " + _state + "; required state: " + requiredState); - } - } - - private boolean createEnvironment(File environmentPath, boolean readonly) throws DatabaseException - { - _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Restore 500,000 default timeout. - //envConfig.setLockTimeout(15000); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); - - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(readonly); - try - { - _environment = new Environment(environmentPath, envConfig); - return false; - } - catch (DatabaseException de) - { - if (de.getMessage().contains("Environment.setAllowCreate is false")) - { - //Allow the creation this time - envConfig.setAllowCreate(true); - if (_environment != null ) - { - _environment.cleanLog(); - _environment.close(); - } - _environment = new Environment(environmentPath, envConfig); - - return true; - } - else - { - throw de; - } - } - } - - private void openDatabases(boolean readonly) throws DatabaseException - { - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - - //This is required if we are wanting read only access. - dbConfig.setReadOnly(readonly); - - _messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig); - _queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig); - _exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig); - _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig); - _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig); - _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig); - - } - - /** - * Called to close and cleanup any resources used by the message store. - * - * @throws Exception If the close fails. - */ - public void close() throws Exception - { - if (_state != State.STARTED) - { - return; - } - - _state = State.CLOSING; - - _commitThread.close(); - _commitThread.join(); - - if (_messageMetaDataDb != null) - { - _log.info("Closing message metadata database"); - _messageMetaDataDb.close(); - } - - if (_messageContentDb != null) - { - _log.info("Closing message content database"); - _messageContentDb.close(); - } - - if (_exchangeDb != null) - { - _log.info("Closing exchange database"); - _exchangeDb.close(); - } - - if (_queueBindingsDb != null) - { - _log.info("Closing bindings database"); - _queueBindingsDb.close(); - } - - if (_queueDb != null) - { - _log.info("Closing queue database"); - _queueDb.close(); - } - - if (_deliveryDb != null) - { - _log.info("Close delivery database"); - _deliveryDb.close(); - } - - closeEnvironment(); - - _state = State.CLOSED; - - CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); - } - - private void closeEnvironment() throws DatabaseException - { - if (_environment != null) - { - if(!_readOnly) - { - // Clean the log before closing. This makes sure it doesn't contain - // redundant data. Closing without doing this means the cleaner may not - // get a chance to finish. - _environment.cleanLog(); - } - _environment.close(); - } - } - - - public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException - { - stateTransition(State.CONFIGURED, State.RECOVERING); - - CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START()); - - try - { - QueueRecoveryHandler qrh = recoveryHandler.begin(this); - loadQueues(qrh); - - ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); - loadExchanges(erh); - - BindingRecoveryHandler brh = erh.completeExchangeRecovery(); - recoverBindings(brh); - - brh.completeBindingRecovery(); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); - } - - } - - private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException - { - Cursor cursor = null; - - try - { - cursor = _queueDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - TupleBinding binding = _queueTupleBindingFactory.getInstance(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value); - - String queueName = queueRecord.getNameShortString() == null ? null : - queueRecord.getNameShortString().asString(); - String owner = queueRecord.getOwner() == null ? null : - queueRecord.getOwner().asString(); - boolean exclusive = queueRecord.isExclusive(); - - FieldTable arguments = queueRecord.getArguments(); - - qrh.queue(queueName, owner, exclusive, arguments); - } - - } - finally - { - if (cursor != null) - { - cursor.close(); - } - } - } - - - private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException - { - Cursor cursor = null; - - try - { - cursor = _exchangeDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - TupleBinding binding = new ExchangeTB(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value); - - String exchangeName = exchangeRec.getNameShortString() == null ? null : - exchangeRec.getNameShortString().asString(); - String type = exchangeRec.getType() == null ? null : - exchangeRec.getType().asString(); - boolean autoDelete = exchangeRec.isAutoDelete(); - - erh.exchange(exchangeName, type, autoDelete); - } - } - finally - { - if (cursor != null) - { - cursor.close(); - } - } - - } - - private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException - { - Cursor cursor = null; - try - { - cursor = _queueBindingsDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - TupleBinding binding = _bindingTupleBindingFactory.getInstance(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - //yes, this is retrieving all the useful information from the key only. - //For table compatibility it shall currently be left as is - BindingKey bindingRecord = (BindingKey) binding.entryToObject(key); - - String exchangeName = bindingRecord.getExchangeName() == null ? null : - bindingRecord.getExchangeName().asString(); - String queueName = bindingRecord.getQueueName() == null ? null : - bindingRecord.getQueueName().asString(); - String routingKey = bindingRecord.getRoutingKey() == null ? null : - bindingRecord.getRoutingKey().asString(); - ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null : - java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes())); - - brh.binding(exchangeName, queueName, routingKey, argumentsBB); - } - } - finally - { - if (cursor != null) - { - cursor.close(); - } - } - - } - - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException - { - StoredMessageRecoveryHandler mrh = msrh.begin(); - - Cursor cursor = null; - try - { - cursor = _messageMetaDataDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);; - - DatabaseEntry value = new DatabaseEntry(); - EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance(); - - long maxId = 0; - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = (Long) keyBinding.entryToObject(key); - StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value); - - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); - mrh.message(message); - - maxId = Math.max(maxId, messageId); - } - - _messageId.set(maxId); - } - catch (DatabaseException e) - { - _log.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - if (cursor != null) - { - cursor.close(); - } - } - } - - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) - throws DatabaseException - { - QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); - - ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - - Cursor cursor = null; - try - { - cursor = _deliveryDb.openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new QueueEntryTB(); - - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key); - - entries.add(qek); - } - - try - { - cursor.close(); - } - finally - { - cursor = null; - } - - for(QueueEntryKey entry : entries) - { - AMQShortString queueName = entry.getQueueName(); - long messageId = entry.getMessageId(); - - qerh.queueEntry(queueName.asString(),messageId); - } - } - catch (DatabaseException e) - { - _log.error("Database Error: " + e.getMessage(), e); - throw e; - } - finally - { - if (cursor != null) - { - cursor.close(); - } - } - - qerh.completeQueueEntryRecovery(); - } - - /** - * Removes the specified message from the store. - * - * @param messageId Identifies the message to remove. - * - * @throws AMQInternalException If the operation fails for any reason. - */ - public void removeMessage(Long messageId) throws AMQStoreException - { - // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); - - com.sleepycat.je.Transaction tx = null; - - Cursor cursor = null; - try - { - tx = _environment.beginTransaction(null, null); - - //remove the message meta data from the store - DatabaseEntry key = new DatabaseEntry(); - EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class); - metaKeyBindingTuple.objectToEntry(messageId, key); - - if (_log.isDebugEnabled()) - { - _log.debug("Removing message id " + messageId); - } - - - OperationStatus status = _messageMetaDataDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - tx.abort(); - - throw new AMQStoreException("Message metadata not found for message id " + messageId); - } - - if (_log.isDebugEnabled()) - { - _log.debug("Deleted metadata for message " + messageId); - } - - //now remove the content data from the store if there is any. - - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); - - TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); - contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); - - //Use a partial record for the value to prevent retrieving the - //data itself as we only need the key to identify what to remove. - DatabaseEntry value = new DatabaseEntry(); - value.setPartial(0, 0, true); - - cursor = _messageContentDb.openCursor(tx, null); - - status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); - while (status == OperationStatus.SUCCESS) - { - mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); - - if(mck.getMessageId() != messageId) - { - //we have exhausted all chunks for this message id, break - break; - } - else - { - status = cursor.delete(); - - if(status == OperationStatus.NOTFOUND) - { - cursor.close(); - cursor = null; - - tx.abort(); - throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); - } - - if (_log.isDebugEnabled()) - { - _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); - } - } - - status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); - } - - cursor.close(); - cursor = null; - - commit(tx, true); - } - catch (DatabaseException e) - { - e.printStackTrace(); - - if (tx != null) - { - try - { - if(cursor != null) - { - cursor.close(); - cursor = null; - } - - tx.abort(); - } - catch (DatabaseException e1) - { - throw new AMQStoreException("Error aborting transaction " + e1, e1); - } - } - - throw new AMQStoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e); - } - finally - { - if(cursor != null) - { - try - { - cursor.close(); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e); - } - } - } - } - - /** - * @see DurableConfigurationStore#createExchange(Exchange) - */ - public void createExchange(Exchange exchange) throws AMQStoreException - { - if (_state != State.RECOVERING) - { - ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(), - exchange.getTypeShortString(), exchange.isAutoDelete()); - - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new AMQShortStringTB(); - keyBinding.objectToEntry(exchange.getNameShortString(), key); - - DatabaseEntry value = new DatabaseEntry(); - TupleBinding exchangeBinding = new ExchangeTB(); - exchangeBinding.objectToEntry(exchangeRec, value); - - try - { - _exchangeDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e); - } - } - } - - /** - * @see DurableConfigurationStore#removeExchange(Exchange) - */ - public void removeExchange(Exchange exchange) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new AMQShortStringTB(); - keyBinding.objectToEntry(exchange.getNameShortString(), key); - try - { - OperationStatus status = _exchangeDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Exchange " + exchange.getName() + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e); - } - } - - - - - /** - * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) - */ - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException - { - // _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey - // + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called"); - - if (_state != State.RECOVERING) - { - BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), - queue.getNameShortString(), routingKey, args); - - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); - - keyBinding.objectToEntry(bindingRecord, key); - - //yes, this is writing out 0 as a value and putting all the - //useful info into the key, don't ask me why. For table - //compatibility it shall currently be left as is - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - _queueBindingsDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange " - + exchange.getName() + " to database: " + e.getMessage(), e); - } - } - } - - /** - * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) - */ - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); - keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); - - try - { - OperationStatus status = _queueBindingsDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange " - + exchange.getName() + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange " - + exchange.getName() + " from database: " + e.getMessage(), e); - } - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue) - */ - public void createQueue(AMQQueue queue) throws AMQStoreException - { - createQueue(queue, null); - } - - /** - * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable) - */ - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException - { - if (_log.isDebugEnabled()) - { - _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called"); - } - - QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), - queue.getOwner(), queue.isExclusive(), arguments); - - createQueue(queueRecord); - } - - /** - * Makes the specified queue persistent. - * - * Only intended for direct use during store upgrades. - * - * @param queueRecord Details of the queue to store. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - protected void createQueue(QueueRecord queueRecord) throws AMQStoreException - { - if (_state != State.RECOVERING) - { - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new AMQShortStringTB(); - keyBinding.objectToEntry(queueRecord.getNameShortString(), key); - - DatabaseEntry value = new DatabaseEntry(); - TupleBinding queueBinding = _queueTupleBindingFactory.getInstance(); - - queueBinding.objectToEntry(queueRecord, value); - try - { - _queueDb.put(null, key, value); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString() - + " to database: " + e.getMessage(), e); - } - } - } - - /** - * Updates the specified queue in the persistent store, IF it is already present. If the queue - * is not present in the store, it will not be added. - * - * NOTE: Currently only updates the exclusivity. - * - * @param queue The queue to update the entry for. - * @throws AMQStoreException If the operation fails for any reason. - */ - public void updateQueue(final AMQQueue queue) throws AMQStoreException - { - if (_log.isDebugEnabled()) - { - _log.debug("Updating queue: " + queue.getName()); - } - - try - { - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new AMQShortStringTB(); - keyBinding.objectToEntry(queue.getNameShortString(), key); - - DatabaseEntry value = new DatabaseEntry(); - DatabaseEntry newValue = new DatabaseEntry(); - TupleBinding queueBinding = _queueTupleBindingFactory.getInstance(); - - OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT); - if(status == OperationStatus.SUCCESS) - { - //read the existing record and apply the new exclusivity setting - QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value); - queueRecord.setExclusive(queue.isExclusive()); - - //write the updated entry to the store - queueBinding.objectToEntry(queueRecord, newValue); - - _queueDb.put(null, key, newValue); - } - else if(status != OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Error updating queue details within the store: " + status); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error updating queue details within the store: " + e,e); - } - } - - /** - * Removes the specified queue from the persistent store. - * - * @param queue The queue to remove. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - public void removeQueue(final AMQQueue queue) throws AMQStoreException - { - AMQShortString name = queue.getNameShortString(); - - if (_log.isDebugEnabled()) - { - _log.debug("public void removeQueue(AMQShortString name = " + name + "): called"); - } - - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new AMQShortStringTB(); - keyBinding.objectToEntry(name, key); - try - { - OperationStatus status = _queueDb.delete(null, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Queue " + name + " not found"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e); - } - } - - /** - * Places a message onto a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The the queue to place the message on. - * @param messageId The message to enqueue. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException - { - // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called"); - - AMQShortString name = new AMQShortString(queue.getResourceName()); - - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new QueueEntryTB(); - QueueEntryKey dd = new QueueEntryKey(name, messageId); - keyBinding.objectToEntry(dd, key); - DatabaseEntry value = new DatabaseEntry(); - ByteBinding.byteToEntry((byte) 0, value); - - try - { - if (_log.isDebugEnabled()) - { - _log.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]"); - } - _deliveryDb.put(tx, key, value); - } - catch (DatabaseException e) - { - _log.error("Failed to enqueue: " + e.getMessage(), e); - throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name - + " to database", e); - } - } - - /** - * Extracts a message from a specified queue, in a given transaction. - * - * @param tx The transaction for the operation. - * @param queue The name queue to take the message from. - * @param messageId The message to dequeue. - * - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException - { - AMQShortString name = new AMQShortString(queue.getResourceName()); - - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = new QueueEntryTB(); - QueueEntryKey dd = new QueueEntryKey(name, messageId); - - keyBinding.objectToEntry(dd, key); - - if (_log.isDebugEnabled()) - { - _log.debug("Dequeue message id " + messageId); - } - - try - { - - OperationStatus status = _deliveryDb.delete(tx, key); - if (status == OperationStatus.NOTFOUND) - { - throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); - } - else if (status != OperationStatus.SUCCESS) - { - throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name); - } - - if (_log.isDebugEnabled()) - { - _log.debug("Removed message " + messageId + ", " + name + " from delivery db"); - - } - } - catch (DatabaseException e) - { - - _log.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e); - _log.error(tx); - - throw new AMQStoreException("Error accessing database while dequeuing message: " + e.getMessage(), e); - } - } - - /** - * Commits all operations performed within a given transaction. - * - * @param tx The transaction to commit all operations for. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException - { - //if (_log.isDebugEnabled()) - //{ - // _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")"); - //} - - if (tx == null) - { - throw new AMQStoreException("Fatal internal error: transactional is null at commitTran"); - } - - StoreFuture result; - try - { - result = commit(tx, syncCommit); - - if (_log.isDebugEnabled()) - { - _log.debug("commitTranImpl completed for [Transaction:" + tx + "]"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error commit tx: " + e.getMessage(), e); - } - - return result; - } - - /** - * Abandons all operations performed within a given transaction. - * - * @param tx The transaction to abandon. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException - { - if (_log.isDebugEnabled()) - { - _log.debug("abortTran called for [Transaction:" + tx + "]"); - } - - try - { - tx.abort(); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e); - } - } - - /** - * Primarily for testing purposes. - * - * @param queueName - * - * @return a list of message ids for messages enqueued for a particular queue - */ - List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException - { - Cursor cursor = null; - try - { - cursor = _deliveryDb.openCursor(null, null); - - DatabaseEntry key = new DatabaseEntry(); - - QueueEntryKey dd = new QueueEntryKey(queueName, 0); - - EntryBinding keyBinding = new QueueEntryTB(); - keyBinding.objectToEntry(dd, key); - - DatabaseEntry value = new DatabaseEntry(); - - LinkedList<Long> messageIds = new LinkedList<Long>(); - - OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); - dd = (QueueEntryKey) keyBinding.entryToObject(key); - - while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName)) - { - - messageIds.add(dd.getMessageId()); - status = cursor.getNext(key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS) - { - dd = (QueueEntryKey) keyBinding.entryToObject(key); - } - } - - return messageIds; - } - catch (DatabaseException e) - { - throw new AMQStoreException("Database error: " + e.getMessage(), e); - } - finally - { - if (cursor != null) - { - try - { - cursor.close(); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e); - } - } - } - } - - /** - * Return a valid, currently unused message id. - * - * @return A fresh message id. - */ - public Long getNewMessageId() - { - return _messageId.incrementAndGet(); - } - - /** - * Stores a chunk of message data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param offset The offset of the data chunk in the message. - * @param contentBody The content of the data chunk. - * - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset, - ByteBuffer contentBody) throws AMQStoreException - { - DatabaseEntry key = new DatabaseEntry(); - TupleBinding<MessageContentKey> keyBinding = new MessageContentKeyTB_5(); - keyBinding.objectToEntry(new MessageContentKey_5(messageId, offset), key); - DatabaseEntry value = new DatabaseEntry(); - TupleBinding<ByteBuffer> messageBinding = new ContentTB(); - messageBinding.objectToEntry(contentBody, value); - try - { - OperationStatus status = _messageContentDb.put(tx, key, value); - if (status != OperationStatus.SUCCESS) - { - throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": " - + status); - } - - if (_log.isDebugEnabled()) - { - _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Stores message meta-data. - * - * @param tx The transaction for the operation. - * @param messageId The message to store the data for. - * @param messageMetaData The message meta data to store. - * - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData) - throws AMQStoreException - { - if (_log.isDebugEnabled()) - { - _log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = " - + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called"); - } - - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class); - keyBinding.objectToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - - TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance(); - messageBinding.objectToEntry(messageMetaData, value); - try - { - _messageMetaDataDb.put(tx, key, value); - if (_log.isDebugEnabled()) - { - _log.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]"); - } - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e); - } - } - - /** - * Retrieves message meta-data. - * - * @param messageId The message to get the meta-data for. - * - * @return The message meta data. - * - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException - { - if (_log.isDebugEnabled()) - { - _log.debug("public MessageMetaData getMessageMetaData(Long messageId = " - + messageId + "): called"); - } - - DatabaseEntry key = new DatabaseEntry(); - EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class); - keyBinding.objectToEntry(messageId, key); - DatabaseEntry value = new DatabaseEntry(); - TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance(); - - try - { - OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED); - if (status != OperationStatus.SUCCESS) - { - throw new AMQStoreException("Metadata not found for message with id " + messageId); - } - - StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value); - - return mdd; - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e); - } - } - - /** - * Fills the provided ByteBuffer with as much content for the specified message as possible, starting - * from the specified offset in the message. - * - * @param messageId The message to get the data for. - * @param offset The offset of the data within the message. - * @param dst The destination of the content read back - * - * @return The number of bytes inserted into the destination - * - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException - { - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - - //Start from 0 offset and search for the starting chunk. - MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0); - TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5(); - contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); - DatabaseEntry value = new DatabaseEntry(); - TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB(); - - if (_log.isDebugEnabled()) - { - _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset); - } - - int written = 0; - int seenSoFar = 0; - - Cursor cursor = null; - try - { - cursor = _messageContentDb.openCursor(null, null); - - OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED); - - while (status == OperationStatus.SUCCESS) - { - mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); - long id = mck.getMessageId(); - - if(id != messageId) - { - //we have exhausted all chunks for this message id, break - break; - } - - int offsetInMessage = mck.getOffset(); - ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value); - - final int size = (int) buf.limit(); - - seenSoFar += size; - - if(seenSoFar >= offset) - { - byte[] dataAsBytes = buf.array(); - - int posInArray = offset + written - offsetInMessage; - int count = size - posInArray; - if(count > dst.remaining()) - { - count = dst.remaining(); - } - dst.put(dataAsBytes,posInArray,count); - written+=count; - - if(dst.remaining() == 0) - { - break; - } - } - - status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); - } - - return written; - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - finally - { - if(cursor != null) - { - try - { - cursor.close(); - } - catch (DatabaseException e) - { - throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e); - } - } - } - } - - public boolean isPersistent() - { - return true; - } - - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) - { - if(metaData.isPersistent()) - { - return new StoredBDBMessage(getNewMessageId(), metaData); - } - else - { - return new StoredMemoryMessage(getNewMessageId(), metaData); - } - } - - - //protected getters for the TupleBindingFactories - - protected QueueTupleBindingFactory getQueueTupleBindingFactory() - { - return _queueTupleBindingFactory; - } - - protected BindingTupleBindingFactory getBindingTupleBindingFactory() - { - return _bindingTupleBindingFactory; - } - - protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory() - { - return _metaDataTupleBindingFactory; - } - - //Package getters for the various databases used by the Store - - Database getMetaDataDb() - { - return _messageMetaDataDb; - } - - Database getContentDb() - { - return _messageContentDb; - } - - Database getQueuesDb() - { - return _queueDb; - } - - Database getDeliveryDb() - { - return _deliveryDb; - } - - Database getExchangesDb() - { - return _exchangeDb; - } - - Database getBindingsDb() - { - return _queueBindingsDb; - } - - void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException - { - visitDatabase(_messageMetaDataDb, visitor); - } - - void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException - { - visitDatabase(_messageContentDb, visitor); - } - - void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException - { - visitDatabase(_queueDb, visitor); - } - - void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException - { - visitDatabase(_deliveryDb, visitor); - } - - void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException - { - visitDatabase(_exchangeDb, visitor); - } - - void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException - { - visitDatabase(_queueBindingsDb, visitor); - } - - /** - * Generic visitDatabase allows iteration through the specified database. - * - * @param database The database to visit - * @param visitor The visitor to give each entry to. - * - * @throws DatabaseException If there is a problem with the Database structure - * @throws AMQStoreException If there is a problem with the Database contents - */ - void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException - { - Cursor cursor = database.openCursor(null, null); - - try - { - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - visitor.visit(key, value); - } - } - finally - { - if (cursor != null) - { - cursor.close(); - } - } - } - - private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException - { - // _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called"); - - tx.commitNoSync(); - - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); - commitFuture.commit(); - - return commitFuture; - } - - public void startCommitThread() - { - _commitThread.start(); - } - - private static final class BDBCommitFuture implements StoreFuture - { - // private static final Logger _log = Logger.getLogger(BDBCommitFuture.class); - - private final CommitThread _commitThread; - private final com.sleepycat.je.Transaction _tx; - private DatabaseException _databaseException; - private boolean _complete; - private boolean _syncCommit; - - public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit) - { - // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx - // + "): called"); - - _commitThread = commitThread; - _tx = tx; - _syncCommit = syncCommit; - } - - public synchronized void complete() - { - if (_log.isDebugEnabled()) - { - _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); - } - - _complete = true; - - notifyAll(); - } - - public synchronized void abort(DatabaseException databaseException) - { - // _log.debug("public synchronized void abort(DatabaseException databaseException = " + databaseException - // + "): called"); - - _complete = true; - _databaseException = databaseException; - - notifyAll(); - } - - public void commit() throws DatabaseException - { - //_log.debug("public void commit(): called"); - - _commitThread.addJob(this); - - if(!_syncCommit) - { - _log.debug("CommitAsync was requested, returning immediately."); - return; - } - - synchronized (BDBCommitFuture.this) - { - while (!_complete) - { - try - { - wait(250); - } - catch (InterruptedException e) - { - // _log.error("Unexpected thread interruption: " + e, e); - throw new RuntimeException(e); - } - } - - // _log.debug("Commit completed, _databaseException = " + _databaseException); - - if (_databaseException != null) - { - throw _databaseException; - } - } - } - - public synchronized boolean isComplete() - { - return _complete; - } - - public void waitForCompletion() - { - while (!isComplete()) - { - try - { - wait(250); - } - catch (InterruptedException e) - { - //TODO Should we ignore, or throw a 'StoreException'? - throw new RuntimeException(e); - } - } - } - } - - /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations - * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before - * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collarations </table> - */ - private class CommitThread extends Thread - { - // private final Logger _log = Logger.getLogger(CommitThread.class); - - private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>()); - private final CheckpointConfig _config = new CheckpointConfig(); - private final Object _lock = new Object(); - - public CommitThread(String name) - { - super(name); - _config.setForce(true); - - } - - public void run() - { - while (!_stopped.get()) - { - synchronized (_lock) - { - while (!_stopped.get() && !hasJobs()) - { - try - { - // RHM-7 Periodically wake up and check, just in case we - // missed a notification. Don't want to lock the broker hard. - _lock.wait(250); - } - catch (InterruptedException e) - { - // _log.info(getName() + " interrupted. "); - } - } - } - processJobs(); - } - } - - private void processJobs() - { - // _log.debug("private void processJobs(): called"); - - // we replace the old queue atomically with a new one and this avoids any need to - // copy elements out of the queue - Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>()); - - try - { - // _environment.checkpoint(_config); - _environment.sync(); - - for (BDBCommitFuture commit : jobs) - { - commit.complete(); - } - } - catch (DatabaseException e) - { - for (BDBCommitFuture commit : jobs) - { - commit.abort(e); - } - } - - } - - private boolean hasJobs() - { - return !_jobQueue.get().isEmpty(); - } - - public void addJob(BDBCommitFuture commit) - { - synchronized (_lock) - { - _jobQueue.get().add(commit); - _lock.notifyAll(); - } - } - - public void close() - { - synchronized (_lock) - { - _stopped.set(true); - _lock.notifyAll(); - } - } - } - - - private class StoredBDBMessage implements StoredMessage - { - - private final long _messageId; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; - private com.sleepycat.je.Transaction _txn; - - StoredBDBMessage(long messageId, StorableMessageMetaData metaData) - { - this(messageId, metaData, true); - } - - - StoredBDBMessage(long messageId, - StorableMessageMetaData metaData, boolean persist) - { - try - { - _messageId = messageId; - - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - if(persist) - { - _txn = _environment.beginTransaction(null, null); - storeMetaData(_txn, messageId, metaData); - } - } - catch (DatabaseException e) - { - throw new RuntimeException(e); - } - catch (AMQStoreException e) - { - throw new RuntimeException(e); - } - - } - - public StorableMessageMetaData getMetaData() - { - StorableMessageMetaData metaData = _metaDataRef.get(); - if(metaData == null) - { - try - { - metaData = BDBMessageStore.this.getMessageMetaData(_messageId); - } - catch (AMQStoreException e) - { - throw new RuntimeException(e); - } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - } - - return metaData; - } - - public long getMessageNumber() - { - return _messageId; - } - - public void addContent(int offsetInMessage, java.nio.ByteBuffer src) - { - try - { - BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src); - } - catch (AMQStoreException e) - { - throw new RuntimeException(e); - } - } - - public int getContent(int offsetInMessage, java.nio.ByteBuffer dst) - { - try - { - return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } - catch (AMQStoreException e) - { - throw new RuntimeException(e); - } - } - - public StoreFuture flushToStore() - { - try - { - if(_txn != null) - { - //if(_log.isDebugEnabled()) - //{ - // _log.debug("Flushing message " + _messageId + " to store"); - //} - BDBMessageStore.this.commitTranImpl(_txn, true); - } - } - catch (AMQStoreException e) - { - throw new RuntimeException(e); - } - finally - { - _txn = null; - } - return IMMEDIATE_FUTURE; - } - - public void remove() - { - flushToStore(); - try - { - BDBMessageStore.this.removeMessage(_messageId); - } - catch (AMQStoreException e) - { - throw new RuntimeException(e); - } - } - } - - private class BDBTransaction implements Transaction - { - private com.sleepycat.je.Transaction _txn; - - private BDBTransaction() - { - try - { - _txn = _environment.beginTransaction(null, null); - } - catch (DatabaseException e) - { - throw new RuntimeException(e); - } - } - - public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException - { - BDBMessageStore.this.enqueueMessage(_txn, queue, messageId); - } - - public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException - { - BDBMessageStore.this.dequeueMessage(_txn, queue, messageId); - - } - - public void commitTran() throws AMQStoreException - { - BDBMessageStore.this.commitTranImpl(_txn, true); - } - - public StoreFuture commitTranAsync() throws AMQStoreException - { - return BDBMessageStore.this.commitTranImpl(_txn, false); - } - - public void abortTran() throws AMQStoreException - { - BDBMessageStore.this.abortTran(_txn); - } - } - -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java deleted file mode 100644 index 211c025dcd..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java +++ /dev/null @@ -1,1125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; -import org.apache.qpid.server.store.berkeleydb.BindingKey; -import org.apache.qpid.server.store.berkeleydb.ContentTB; -import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor; -import org.apache.qpid.server.store.berkeleydb.ExchangeTB; -import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; -import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; -import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; -import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5; -import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.BrokerActor; -import org.apache.qpid.server.logging.NullRootMessageLogger; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.util.FileUtils; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; - -import java.io.File; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.IOException; -import java.io.FileNotFoundException; -import java.nio.ByteBuffer; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map.Entry; - -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Database; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.bind.tuple.TupleBinding; - -/** - * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store. - * - * Currently upgrade is fixed from v4 -> v5 - * - * Improvments: - * - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added. - * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade. - * - Add process logging and disable all Store and Qpid logging. - */ -public class BDBStoreUpgrade -{ - private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class); - /** The Store Directory that needs upgrading */ - File _fromDir; - /** The Directory that will be made to contain the upgraded store */ - File _toDir; - /** The Directory that will be made to backup the original store if required */ - File _backupDir; - - /** The Old Store */ - BDBMessageStore _oldMessageStore; - /** The New Store */ - BDBMessageStore _newMessageStore; - /** The file ending that is used by BDB Store Files */ - private static final String BDB_FILE_ENDING = ".jdb"; - - static final Options _options = new Options(); - static CommandLine _commandLine; - private boolean _interactive; - private boolean _force; - - private static final String VERSION = "3.0"; - private static final String USER_ABORTED_PROCESS = "User aborted process"; - private static final int LOWEST_SUPPORTED_STORE_VERSION = 4; - private static final String PREVIOUS_STORE_VERSION_UNSUPPORTED = "Store upgrade from version {0} is not supported." - + " You must first run the previous store upgrade tool."; - private static final String FOLLOWING_STORE_VERSION_UNSUPPORTED = "Store version {0} is newer than this tool supports. " - + "You must use a newer version of the store upgrade tool"; - private static final String STORE_ALREADY_UPGRADED = "Store has already been upgraded to version {0}."; - - private static final String OPTION_INPUT_SHORT = "i"; - private static final String OPTION_INPUT = "input"; - private static final String OPTION_OUTPUT_SHORT = "o"; - private static final String OPTION_OUTPUT = "output"; - private static final String OPTION_BACKUP_SHORT = "b"; - private static final String OPTION_BACKUP = "backup"; - private static final String OPTION_QUIET_SHORT = "q"; - private static final String OPTION_QUIET = "quiet"; - private static final String OPTION_FORCE_SHORT = "f"; - private static final String OPTION_FORCE = "force"; - private boolean _inplace = false; - - public BDBStoreUpgrade(String fromDir, String toDir, String backupDir, boolean interactive, boolean force) - { - _interactive = interactive; - _force = force; - - _fromDir = new File(fromDir); - if (!_fromDir.exists()) - { - throw new IllegalArgumentException("BDBStore path '" + fromDir + "' could not be read. " - + "Ensure the path is correct and that the permissions are correct."); - } - - if (!isDirectoryAStoreDir(_fromDir)) - { - throw new IllegalArgumentException("Specified directory '" + fromDir + "' does not contain a valid BDBMessageStore."); - } - - if (toDir == null) - { - _inplace = true; - _toDir = new File(fromDir+"-Inplace"); - } - else - { - _toDir = new File(toDir); - } - - if (_toDir.exists()) - { - if (_interactive) - { - if (toDir == null) - { - System.out.println("Upgrading in place:" + fromDir); - } - else - { - System.out.println("Upgrade destination: '" + toDir + "'"); - } - - if (userInteract("Upgrade destination exists do you wish to replace it?")) - { - if (!FileUtils.delete(_toDir, true)) - { - throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'"); - } - } - else - { - throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. "); - } - } - else - { - if (_force) - { - if (!FileUtils.delete(_toDir, true)) - { - throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'"); - } - } - else - { - throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. "); - } - } - } - - if (!_toDir.mkdirs()) - { - throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' could not be created. " - + "Ensure the path is correct and that the permissions are correct."); - } - - if (backupDir != null) - { - if (backupDir.equals("")) - { - _backupDir = new File(_fromDir.getAbsolutePath().toString() + "-Backup"); - } - else - { - _backupDir = new File(backupDir); - } - } - else - { - _backupDir = null; - } - } - - private static String ANSWER_OPTIONS = " Yes/No/Abort? "; - private static String ANSWER_NO = "no"; - private static String ANSWER_N = "n"; - private static String ANSWER_YES = "yes"; - private static String ANSWER_Y = "y"; - private static String ANSWER_ABORT = "abort"; - private static String ANSWER_A = "a"; - - /** - * Interact with the user via System.in and System.out. If the user wishes to Abort then a RuntimeException is thrown. - * Otherwise the method will return based on their response true=yes false=no. - * - * @param message Message to print out - * - * @return boolean response from user if they wish to proceed - */ - private boolean userInteract(String message) - { - System.out.print(message + ANSWER_OPTIONS); - BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); - - String input = ""; - try - { - input = br.readLine(); - } - catch (IOException e) - { - input = ""; - } - - if (input.equalsIgnoreCase(ANSWER_Y) || input.equalsIgnoreCase(ANSWER_YES)) - { - return true; - } - else - { - if (input.equalsIgnoreCase(ANSWER_N) || input.equalsIgnoreCase(ANSWER_NO)) - { - return false; - } - else - { - if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT)) - { - throw new RuntimeException(USER_ABORTED_PROCESS); - } - } - } - - return userInteract(message); - } - - /** - * Upgrade a Store of a specified version to the latest version. - * - * @param version the version of the current store - * - * @throws Exception - */ - public void upgradeFromVersion(int version) throws Exception - { - upgradeFromVersion(version, _fromDir, _toDir, _backupDir, _force, - _inplace); - } - - /** - * Upgrade a Store of a specified version to the latest version. - * - * @param version the version of the current store - * @param fromDir the directory with the old Store - * @param toDir the directrory to hold the newly Upgraded Store - * @param backupDir the directrory to backup to if required - * @param force suppress all questions - * @param inplace replace the from dir with the upgraded result in toDir - * - * @throws Exception due to Virtualhost/MessageStore.close() being - * rather poor at exception handling - * @throws DatabaseException if there is a problem with the store formats - * @throws AMQException if there is an issue creating Qpid data structures - */ - public void upgradeFromVersion(int version, File fromDir, File toDir, - File backupDir, boolean force, - boolean inplace) throws Exception - { - _logger.info("Located store to upgrade at '" + fromDir + "'"); - - // Verify user has created a backup, giving option to perform backup - if (_interactive) - { - if (!userInteract("Have you performed a DB backup of this store.")) - { - File backup; - if (backupDir == null) - { - backup = new File(fromDir.getAbsolutePath().toString() + "-Backup"); - } - else - { - backup = backupDir; - } - - if (userInteract("Do you wish to perform a DB backup now? " + - "(Store will be backed up to '" + backup.getName() + "')")) - { - performDBBackup(fromDir, backup, force); - } - else - { - if (!userInteract("Are you sure wish to proceed with DB migration without backup? " + - "(For more details of the consequences check the Qpid/BDB Message Store Wiki).")) - { - throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed."); - } - } - } - else - { - if (!inplace) - { - _logger.info("Upgrade will create a new store at '" + toDir + "'"); - } - - _logger.info("Using the contents in the Message Store '" + fromDir + "'"); - - if (!userInteract("Do you wish to proceed?")) - { - throw new IllegalArgumentException("Upgrade stopped as did not wish to proceed"); - } - } - } - else - { - if (backupDir != null) - { - performDBBackup(fromDir, backupDir, force); - } - } - - CurrentActor.set(new BrokerActor(new NullRootMessageLogger())); - - //Create a new messageStore - _newMessageStore = new BDBMessageStore(); - _newMessageStore.configure(toDir, false); - _newMessageStore.start(); - - try - { - //Load the old MessageStore - switch (version) - { - default: - case 4: - _oldMessageStore = new BDBMessageStore(4); - _oldMessageStore.configure(fromDir, true); - _oldMessageStore.start(); - upgradeFromVersion_4(); - break; - case 3: - case 2: - case 1: - throw new IllegalArgumentException(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED, - new Object[] { Integer.toString(version) })); - } - } - finally - { - _newMessageStore.close(); - if (_oldMessageStore != null) - { - _oldMessageStore.close(); - } - // if we are running inplace then swap fromDir and toDir - if (inplace) - { - // Remove original copy - if (FileUtils.delete(fromDir, true)) - { - // Rename upgraded store - toDir.renameTo(fromDir); - } - else - { - throw new RuntimeException("Unable to upgrade inplace as " + - "unable to delete source '" - +fromDir+"', Store upgrade " + - "successfully performed to :" - +toDir); - } - } - } - } - - private void upgradeFromVersion_4() throws AMQException, DatabaseException - { - _logger.info("Starting store upgrade from version 4"); - - //Migrate _exchangeDb; - _logger.info("Exchanges"); - - moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange"); - - final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>(); - final TupleBinding exchangeTB = new ExchangeTB(); - - DatabaseVisitor exchangeListVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value); - AMQShortString type = exchangeRec.getType(); - - if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type)) - { - topicExchanges.add(exchangeRec.getNameShortString()); - } - } - }; - _oldMessageStore.visitExchanges(exchangeListVisitor); - - - //Migrate _queueBindingsDb; - _logger.info("Queue Bindings"); - moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding"); - - //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those - //which have a colon in their name and are bound to the Topic exchanges above - final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>(); - final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance(); - - DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key); - AMQShortString queueName = bindingRec.getQueueName(); - AMQShortString exchangeName = bindingRec.getExchangeName(); - - if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":")) - { - durableSubQueues.add(queueName); - } - } - }; - _oldMessageStore.visitBindings(durSubQueueListVisitor); - - - //Migrate _queueDb; - _logger.info("Queues"); - - // hold the list of existing queue names - final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>(); - - final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance(); - - DatabaseVisitor queueVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException - { - QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value); - AMQShortString queueName = queueRec.getNameShortString(); - - //if the queue name is in the gathered list then set its exclusivity true - if (durableSubQueues.contains(queueName)) - { - _logger.info("Marking as possible DurableSubscription backing queue: " + queueName); - queueRec.setExclusive(true); - } - - //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as - //the extra 'exclusive' property in v3 will be defaulted to false in the record creation. - _newMessageStore.createQueue(queueRec); - - _count++; - existingQueues.add(queueName); - } - }; - _oldMessageStore.visitQueues(queueVisitor); - - logCount(queueVisitor.getVisitedCount(), "Queue"); - - - // Look for persistent messages stored for non-durable queues - _logger.info("Checking for messages previously sent to non-durable queues"); - - // track all message delivery to existing queues - final HashSet<Long> queueMessages = new HashSet<Long>(); - - // hold all non existing queues and their messages IDs - final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>(); - - // delivery DB visitor to check message delivery and identify non existing queues - final QueueEntryTB queueEntryTB = new QueueEntryTB(); - DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key); - Long messageId = entryKey.getMessageId(); - AMQShortString queueName = entryKey.getQueueName(); - if (!existingQueues.contains(queueName)) - { - String name = queueName.asString(); - HashSet<Long> messages = phantomMessageQueues.get(name); - if (messages == null) - { - messages = new HashSet<Long>(); - phantomMessageQueues.put(name, messages); - } - messages.add(messageId); - _count++; - } - else - { - queueMessages.add(messageId); - } - } - }; - _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor); - - if (phantomMessageQueues.isEmpty()) - { - _logger.info("No such messages were found"); - } - else - { - _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total"); - - for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet()) - { - String queueName = phantomQueue.getKey(); - HashSet<Long> messages = phantomQueue.getValue(); - - _logger.info(MessageFormat.format("There are {0} messages which were previously delivered to non-durable queue ''{1}''",messages.size(), queueName)); - - boolean createQueue; - if(!_interactive) - { - createQueue = true; - _logger.info("Running in batch-mode, marking queue as durable to ensure retention of the messages."); - } - else - { - createQueue = userInteract("Do you want to make this queue durable?\n" - + "NOTE: Answering No will result in these messages being discarded!"); - } - - if (createQueue) - { - for (Long messageId : messages) - { - queueMessages.add(messageId); - } - AMQShortString name = new AMQShortString(queueName); - existingQueues.add(name); - QueueRecord record = new QueueRecord(name, null, false, null); - _newMessageStore.createQueue(record); - } - } - } - - - //Migrate _messageMetaDataDb; - _logger.info("Message MetaData"); - - final Database newMetaDataDB = _newMessageStore.getMetaDataDb(); - final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance(); - final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance(); - - DatabaseVisitor metaDataVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - _count++; - MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value); - - // get message id - Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key); - - // ONLY copy data if message is delivered to existing queue - if (!queueMessages.contains(messageId)) - { - return; - } - DatabaseEntry newValue = new DatabaseEntry(); - newMetaDataTupleBinding.objectToEntry(metaData, newValue); - - newMetaDataDB.put(null, key, newValue); - } - }; - _oldMessageStore.visitMetaDataDb(metaDataVisitor); - - logCount(metaDataVisitor.getVisitedCount(), "Message MetaData"); - - - //Migrate _messageContentDb; - _logger.info("Message Contents"); - final Database newContentDB = _newMessageStore.getContentDb(); - - final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new MessageContentKeyTB_4(); - final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5(); - final TupleBinding contentTB = new ContentTB(); - - DatabaseVisitor contentVisitor = new DatabaseVisitor() - { - long _prevMsgId = -1; //Initialise to invalid value - int _bytesSeenSoFar = 0; - - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - _count++; - - //determine the msgId of the current entry - MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key); - long msgId = contentKey.getMessageId(); - - // ONLY copy data if message is delivered to existing queue - if (!queueMessages.contains(msgId)) - { - return; - } - //if this is a new message, restart the byte offset count. - if(_prevMsgId != msgId) - { - _bytesSeenSoFar = 0; - } - - //determine the content size - ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value); - int contentSize = content.limit(); - - //create the new key: id + previously seen data count - MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar); - DatabaseEntry newKeyEntry = new DatabaseEntry(); - newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry); - - DatabaseEntry newValueEntry = new DatabaseEntry(); - contentTB.objectToEntry(content, newValueEntry); - - newContentDB.put(null, newKeyEntry, newValueEntry); - - _prevMsgId = msgId; - _bytesSeenSoFar += contentSize; - } - }; - _oldMessageStore.visitContentDb(contentVisitor); - - logCount(contentVisitor.getVisitedCount(), "Message Content"); - - - //Migrate _deliveryDb; - _logger.info("Delivery Records"); - final Database deliveryDb =_newMessageStore.getDeliveryDb(); - DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor() - { - - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - _count++; - - // get message id from entry key - QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key); - AMQShortString queueName = entryKey.getQueueName(); - - // ONLY copy data if message queue exists - if (existingQueues.contains(queueName)) - { - deliveryDb.put(null, key, value); - } - } - }; - _oldMessageStore.visitDelivery(deliveryDbVisitor); - logCount(contentVisitor.getVisitedCount(), "Delivery Record"); - } - - /** - * Log the specified count for item in a user friendly way. - * - * @param count of items to log - * @param item description of what is being logged. - */ - private void logCount(int count, String item) - { - _logger.info(" " + count + " " + item + " " + (count == 1 ? "entry" : "entries")); - } - - /** - * @param oldDatabase The old MessageStoreDB to perform the visit on - * @param newDatabase The new MessageStoreDB to copy the data to. - * @param contentName The string name of the content for display purposes. - * - * @throws AMQException Due to createQueue thorwing AMQException - * @throws DatabaseException If there is a problem with the loading of the data - */ - private void moveContents(Database oldDatabase, final Database newDatabase, String contentName) throws AMQException, DatabaseException - { - - DatabaseVisitor moveVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - _count++; - newDatabase.put(null, key, value); - } - }; - - _oldMessageStore.visitDatabase(oldDatabase, moveVisitor); - - logCount(moveVisitor.getVisitedCount(), contentName); - } - - private static void usage() - { - System.out.println("usage: BDBStoreUpgrade:\n [-h|--help] [-q|--quiet] [-f|--force] [-b|--backup <Path to backup-db>] " + - "-i|--input <Path to input-db> [-o|--output <Path to upgraded-db>]"); - } - - private static void help() - { - System.out.println("usage: BDBStoreUpgrade:"); - System.out.println("Required:"); - for (Object obj : _options.getOptions()) - { - Option option = (Option) obj; - if (option.isRequired()) - { - System.out.println("-" + option.getOpt() + "|--" + option.getLongOpt() + "\t\t-\t" + option.getDescription()); - } - } - - System.out.println("\nOptions:"); - for (Object obj : _options.getOptions()) - { - Option option = (Option) obj; - if (!option.isRequired()) - { - System.out.println("--" + option.getLongOpt() + "|-" + option.getOpt() + "\t\t-\t" + option.getDescription()); - } - } - } - - static boolean isDirectoryAStoreDir(File directory) - { - if (directory.isFile()) - { - return false; - } - - for (File file : directory.listFiles()) - { - if (file.isFile()) - { - if (file.getName().endsWith(BDB_FILE_ENDING)) - { - return true; - } - } - } - return false; - } - - static File[] discoverDBStores(File fromDir) - { - if (!fromDir.exists()) - { - throw new IllegalArgumentException("'" + fromDir + "' does not exist unable to upgrade."); - } - - // Ensure we are given a directory - if (fromDir.isFile()) - { - throw new IllegalArgumentException("'" + fromDir + "' is not a directory unable to upgrade."); - } - - // Check to see if we have been given a single directory - if (isDirectoryAStoreDir(fromDir)) - { - return new File[]{fromDir}; - } - - // Check to see if we have been give a directory containing stores. - List<File> stores = new LinkedList<File>(); - - for (File directory : fromDir.listFiles()) - { - if (directory.isDirectory()) - { - if (isDirectoryAStoreDir(directory)) - { - stores.add(directory); - } - } - } - - return stores.toArray(new File[stores.size()]); - } - - private static void performDBBackup(File source, File backup, boolean force) throws Exception - { - if (backup.exists()) - { - if (force) - { - _logger.info("Backup location exists. Forced to remove."); - FileUtils.delete(backup, true); - } - else - { - throw new IllegalArgumentException("Unable to perform backup a backup already exists."); - } - } - - try - { - _logger.info("Backing up '" + source + "' to '" + backup + "'"); - FileUtils.copyRecursive(source, backup); - } - catch (FileNotFoundException e) - { - //Throwing IAE here as this will be reported as a Backup not started - throw new IllegalArgumentException("Unable to perform backup:" + e.getMessage()); - } - catch (FileUtils.UnableToCopyException e) - { - //Throwing exception here as this will be reported as a Failed Backup - throw new Exception("Unable to perform backup due to:" + e.getMessage()); - } - } - - public static void main(String[] args) throws ParseException - { - setOptions(_options); - - final Options helpOptions = new Options(); - setHelpOptions(helpOptions); - - //Display help - boolean displayHelp = false; - try - { - if (new PosixParser().parse(helpOptions, args).hasOption("h")) - { - showHelp(); - } - } - catch (ParseException pe) - { - displayHelp = true; - } - - //Parse commandline for required arguments - try - { - _commandLine = new PosixParser().parse(_options, args); - } - catch (ParseException mae) - { - if (displayHelp) - { - showHelp(); - } - else - { - fatalError(mae.getMessage()); - } - } - - String fromDir = _commandLine.getOptionValue(OPTION_INPUT_SHORT); - String toDir = _commandLine.getOptionValue(OPTION_OUTPUT_SHORT); - String backupDir = _commandLine.getOptionValue(OPTION_BACKUP_SHORT); - - if (backupDir == null && _commandLine.hasOption(OPTION_BACKUP_SHORT)) - { - backupDir = ""; - } - - //Attempt to locate possible Store to upgrade on input path - File[] stores = new File[0]; - try - { - stores = discoverDBStores(new File(fromDir)); - } - catch (IllegalArgumentException iae) - { - fatalError(iae.getMessage()); - } - - boolean interactive = !_commandLine.hasOption(OPTION_QUIET_SHORT); - boolean force = _commandLine.hasOption(OPTION_FORCE_SHORT); - - try{ - for (File store : stores) - { - - // if toDir is null then we are upgrading inplace so we don't need - // to provide an upgraded toDir when upgrading multiple stores. - if (toDir == null || - // Check to see if we are upgrading a store specified in - // fromDir or if the directories are nested. - (stores.length > 0 - && stores[0].toString().length() == fromDir.length())) - { - upgrade(store, toDir, backupDir, interactive, force); - } - else - { - // Add the extra part of path from store to the toDir - upgrade(store, toDir + File.separator + store.toString().substring(fromDir.length()), backupDir, interactive, force); - } - } - } - catch (RuntimeException re) - { - if (!(USER_ABORTED_PROCESS).equals(re.getMessage())) - { - re.printStackTrace(); - _logger.error("Upgrade Failed: " + re.getMessage()); - } - else - { - _logger.error("Upgrade stopped : User aborted"); - } - } - - } - - @SuppressWarnings("static-access") - private static void setOptions(Options options) - { - Option input = - OptionBuilder.isRequired().hasArg().withDescription("Location (Path) of store to upgrade.").withLongOpt(OPTION_INPUT) - .create(OPTION_INPUT_SHORT); - - Option output = - OptionBuilder.hasArg().withDescription("Location (Path) for the upgraded-db to be written.").withLongOpt(OPTION_OUTPUT) - .create(OPTION_OUTPUT_SHORT); - - Option quiet = new Option(OPTION_QUIET_SHORT, OPTION_QUIET, false, "Disable interactive options."); - - Option force = new Option(OPTION_FORCE_SHORT, OPTION_FORCE, false, "Force upgrade removing any existing upgrade target."); - Option backup = - OptionBuilder.hasOptionalArg().withDescription("Location (Path) for the backup-db to be written.").withLongOpt(OPTION_BACKUP) - .create(OPTION_BACKUP_SHORT); - - options.addOption(input); - options.addOption(output); - options.addOption(quiet); - options.addOption(force); - options.addOption(backup); - setHelpOptions(options); - } - - private static void setHelpOptions(Options options) - { - options.addOption(new Option("h", "help", false, "Show this help.")); - } - - static void upgrade(File fromDir, String toDir, String backupDir, boolean interactive, boolean force) - { - - _logger.info("Running BDB Message Store upgrade tool: v" + VERSION); - int version = getStoreVersion(fromDir); - if (!isVersionUpgradable(version)) - { - return; - } - try - { - new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(version); - - _logger.info("Upgrade complete."); - } - catch (IllegalArgumentException iae) - { - _logger.error("Upgrade not started due to: " + iae.getMessage()); - } - catch (DatabaseException de) - { - de.printStackTrace(); - _logger.error("Upgrade Failed: " + de.getMessage()); - } - catch (RuntimeException re) - { - if (!(USER_ABORTED_PROCESS).equals(re.getMessage())) - { - re.printStackTrace(); - _logger.error("Upgrade Failed: " + re.getMessage()); - } - else - { - throw re; - } - } - catch (Exception e) - { - e.printStackTrace(); - _logger.error("Upgrade Failed: " + e.getMessage()); - } - } - - /** - * Utility method to verify if store of given version can be upgraded. - * - * @param version - * store version to verify - * @return true if store can be upgraded, false otherwise - */ - protected static boolean isVersionUpgradable(int version) - { - boolean storeUpgradable = false; - if (version == 0) - { - _logger.error("Existing store version is undefined!"); - } - else if (version < LOWEST_SUPPORTED_STORE_VERSION) - { - _logger.error(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED, - new Object[] { Integer.toString(version) })); - } - else if (version == BDBMessageStore.DATABASE_FORMAT_VERSION) - { - _logger.error(MessageFormat.format(STORE_ALREADY_UPGRADED, new Object[] { Integer.toString(version) })); - } - else if (version > BDBMessageStore.DATABASE_FORMAT_VERSION) - { - _logger.error(MessageFormat.format(FOLLOWING_STORE_VERSION_UNSUPPORTED, - new Object[] { Integer.toString(version) })); - } - else - { - _logger.info("Existing store version is " + version); - storeUpgradable = true; - } - return storeUpgradable; - } - - /** - * Detects existing store version by checking list of database in store - * environment - * - * @param fromDir - * store folder - * @return version - */ - public static int getStoreVersion(File fromDir) - { - int version = 0; - EnvironmentConfig envConfig = new EnvironmentConfig(); - envConfig.setAllowCreate(false); - envConfig.setTransactional(false); - envConfig.setReadOnly(true); - Environment environment = null; - try - { - - environment = new Environment(fromDir, envConfig); - List<String> databases = environment.getDatabaseNames(); - for (String name : databases) - { - if (name.startsWith("exchangeDb")) - { - if (name.startsWith("exchangeDb_v")) - { - version = Integer.parseInt(name.substring(12)); - } - else - { - version = 1; - } - break; - } - } - } - catch (Exception e) - { - _logger.error("Failure to open existing database: " + e.getMessage()); - } - finally - { - if (environment != null) - { - try - { - environment.close(); - } - catch (Exception e) - { - // ignoring. It should never happen. - } - } - } - return version; - } - - private static void fatalError(String message) - { - System.out.println(message); - usage(); - System.exit(1); - } - - private static void showHelp() - { - help(); - System.exit(0); - } - -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java deleted file mode 100644 index 396f0ed817..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class BindingKey extends Object -{ - private final AMQShortString _exchangeName; - private final AMQShortString _queueName; - private final AMQShortString _routingKey; - private final FieldTable _arguments; - - public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) - { - _exchangeName = exchangeName; - _queueName = queueName; - _routingKey = routingKey; - _arguments = arguments; - } - - - public AMQShortString getExchangeName() - { - return _exchangeName; - } - - public AMQShortString getQueueName() - { - return _queueName; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public FieldTable getArguments() - { - return _arguments; - } - -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java deleted file mode 100644 index 5ea3e9c2e8..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.nio.ByteBuffer; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class ContentTB extends TupleBinding -{ - public Object entryToObject(TupleInput tupleInput) - { - - final int size = tupleInput.readInt(); - byte[] underlying = new byte[size]; - tupleInput.readFast(underlying); - return ByteBuffer.wrap(underlying); - } - - public void objectToEntry(Object object, TupleOutput tupleOutput) - { - ByteBuffer src = (ByteBuffer) object; - - src = src.slice(); - - byte[] chunkData = new byte[src.limit()]; - src.duplicate().get(chunkData); - - tupleOutput.writeInt(chunkData.length); - tupleOutput.writeFast(chunkData); - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java deleted file mode 100644 index 9bd879025f..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.AMQStoreException; - -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; - -/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */ -public abstract class DatabaseVisitor -{ - protected int _count; - - abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException; - - public int getVisitedCount() - { - return _count; - } - - public void resetVisitCount() - { - _count = 0; - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java deleted file mode 100644 index f9c7828bef..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; - -public class ExchangeTB extends TupleBinding -{ - private static final Logger _log = Logger.getLogger(ExchangeTB.class); - - public ExchangeTB() - { - } - - public Object entryToObject(TupleInput tupleInput) - { - - AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput); - - boolean autoDelete = tupleInput.readBoolean(); - - return new ExchangeRecord(name, typeName, autoDelete); - } - - public void objectToEntry(Object object, TupleOutput tupleOutput) - { - ExchangeRecord exchange = (ExchangeRecord) object; - - AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput); - AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput); - - tupleOutput.writeBoolean(exchange.isAutoDelete()); - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java deleted file mode 100644 index c09498cce3..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.framing.FieldTable; - -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; - -public class FieldTableEncoding -{ - public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException - { - long length = tupleInput.readLong(); - if (length <= 0) - { - return null; - } - else - { - - byte[] data = new byte[(int)length]; - tupleInput.readFast(data); - - try - { - return new FieldTable(new DataInputStream(new ByteArrayInputStream(data)),length); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - - } - - } - - public static void writeFieldTable(FieldTable fieldTable, TupleOutput tupleOutput) - { - - if (fieldTable == null) - { - tupleOutput.writeLong(0); - } - else - { - tupleOutput.writeLong(fieldTable.getEncodedSize()); - tupleOutput.writeFast(fieldTable.getDataAsBytes()); - } - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java deleted file mode 100644 index 005e8d4604..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -public class MessageContentKey -{ - private long _messageId; - - public MessageContentKey(long messageId) - { - _messageId = messageId; - } - - - public long getMessageId() - { - return _messageId; - } - - public void setMessageId(long messageId) - { - this._messageId = messageId; - } -}
\ No newline at end of file diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java deleted file mode 100644 index c274fdec8c..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.framing.AMQShortString; - -public class QueueEntryKey -{ - private AMQShortString _queueName; - private long _messageId; - - - public QueueEntryKey(AMQShortString queueName, long messageId) - { - _queueName = queueName; - _messageId = messageId; - } - - - public AMQShortString getQueueName() - { - return _queueName; - } - - - public long getMessageId() - { - return _messageId; - } - -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java deleted file mode 100644 index 30357c97d4..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.keys; - -import org.apache.qpid.server.store.berkeleydb.MessageContentKey; - -public class MessageContentKey_4 extends MessageContentKey -{ - private int _chunkNum; - - public MessageContentKey_4(long messageId, int chunkNo) - { - super(messageId); - _chunkNum = chunkNo; - } - - public int getChunk() - { - return _chunkNum; - } - - public void setChunk(int chunk) - { - this._chunkNum = chunk; - } -}
\ No newline at end of file diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java deleted file mode 100644 index a1a7fe80b5..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.keys; - -import org.apache.qpid.server.store.berkeleydb.MessageContentKey; - -public class MessageContentKey_5 extends MessageContentKey -{ - private int _offset; - - public MessageContentKey_5(long messageId, int chunkNo) - { - super(messageId); - _offset = chunkNo; - } - - public int getOffset() - { - return _offset; - } - - public void setOffset(int chunk) - { - this._offset = chunk; - } -}
\ No newline at end of file diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java deleted file mode 100644 index f20367e33b..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.records; - -import org.apache.qpid.framing.AMQShortString; - -public class ExchangeRecord extends Object -{ - private final AMQShortString _exchangeName; - private final AMQShortString _exchangeType; - private final boolean _autoDelete; - - public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete) - { - _exchangeName = exchangeName; - _exchangeType = exchangeType; - _autoDelete = autoDelete; - } - - public AMQShortString getNameShortString() - { - return _exchangeName; - } - - public AMQShortString getType() - { - return _exchangeType; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java deleted file mode 100644 index fbe10433ca..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.records; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class QueueRecord extends Object -{ - private final AMQShortString _queueName; - private final AMQShortString _owner; - private final FieldTable _arguments; - private boolean _exclusive; - - public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments) - { - _queueName = queueName; - _owner = owner; - _exclusive = exclusive; - _arguments = arguments; - } - - public AMQShortString getNameShortString() - { - return _queueName; - } - - public AMQShortString getOwner() - { - return _owner; - } - - public boolean isExclusive() - { - return _exclusive; - } - - public void setExclusive(boolean exclusive) - { - _exclusive = exclusive; - } - - public FieldTable getArguments() - { - return _arguments; - } - -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java deleted file mode 100644 index f6344b3d7d..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.testclient; - -import org.apache.log4j.Logger; - -import org.apache.qpid.ping.PingDurableClient; -import org.apache.qpid.server.store.berkeleydb.BDBBackup; -import org.apache.qpid.util.CommandLineParser; - -import java.util.Properties; - -/** - * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable - * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered - * messages were in the backup, in order to check that all are re-delivered when the backup is retored. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Perform BDB Backup on configurable message count. - * </table> - */ -public class BackupTestClient extends PingDurableClient -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(BackupTestClient.class); - - /** Holds the from directory to take backups from. */ - private String fromDir; - - /** Holds the to directory to store backups in. */ - private String toDir; - - /** - * Default constructor, passes all property overrides to the parent. - * - * @param overrides Any property overrides to apply to the defaults. - * - * @throws Exception Any underlying exception is allowed to fall through. - */ - BackupTestClient(Properties overrides) throws Exception - { - super(overrides); - } - - /** - * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified - * on the command line: - * - * <p/><table><caption>Command Line</caption> - * <tr><th> Option <th> Comment - * <tr><td> -fromdir <td> The path to the directory to back the bdb log file from. - * <tr><td> -todir <td> The path to the directory to save the backed up bdb log files to. - * </table> - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Use the same command line format as BDBBackup utility, (compulsory from and to directories). - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC), - System.getProperties()); - BackupTestClient pingProducer = new BackupTestClient(options); - - // Keep the from and to directories for backups. - pingProducer.fromDir = options.getProperty("fromdir"); - pingProducer.toDir = options.getProperty("todir"); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.waitForUser("Press return to begin receiving the pings."); - pingProducer.receive(sent); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup. - */ - public void takeAction() - { - BDBBackup backupUtil = new BDBBackup(); - backupUtil.takeBackupNoLock(fromDir, toDir); - System.out.println("Took backup of BDB log files from directory: " + fromDir); - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java deleted file mode 100644 index 301ee417c5..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -public interface BindingTuple -{ -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java deleted file mode 100644 index 8e17f074d7..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import org.apache.qpid.server.store.berkeleydb.BindingKey; - -import com.sleepycat.bind.tuple.TupleBinding; - -public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey> -{ - public BindingTupleBindingFactory(int version) - { - super(version); - } - - public TupleBinding<BindingKey> getInstance() - { - switch (_version) - { - default: - case 5: - //no change from v4 - case 4: - return new BindingTuple_4(); - } - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java deleted file mode 100644 index 52b131a7f2..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.BindingKey; -import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; - -public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingTuple -{ - protected static final Logger _log = Logger.getLogger(BindingTuple.class); - - public BindingTuple_4() - { - super(); - } - - public BindingKey entryToObject(TupleInput tupleInput) - { - AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); - - FieldTable arguments; - - // Addition for Version 2 of this table - try - { - arguments = FieldTableEncoding.readFieldTable(tupleInput); - } - catch (DatabaseException e) - { - _log.error("Unable to create binding: " + e, e); - return null; - } - - return new BindingKey(exchangeName, queueName, routingKey, arguments); - } - - public void objectToEntry(BindingKey binding, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); - AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); - AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput); - - // Addition for Version 2 of this table - FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput); - } - -}
\ No newline at end of file diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java deleted file mode 100644 index f5ba6bbce3..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import org.apache.qpid.server.store.berkeleydb.MessageContentKey; -import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class MessageContentKeyTB_4 extends TupleBinding<MessageContentKey> -{ - - public MessageContentKey entryToObject(TupleInput tupleInput) - { - long messageId = tupleInput.readLong(); - int chunk = tupleInput.readInt(); - return new MessageContentKey_4(messageId, chunk); - } - - public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput) - { - final MessageContentKey_4 mk = (MessageContentKey_4) object; - tupleOutput.writeLong(mk.getMessageId()); - tupleOutput.writeInt(mk.getChunk()); - } - -}
\ No newline at end of file diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java deleted file mode 100644 index e6a2fd23a8..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import org.apache.qpid.server.store.berkeleydb.MessageContentKey; -import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class MessageContentKeyTB_5 extends TupleBinding<MessageContentKey> -{ - public MessageContentKey entryToObject(TupleInput tupleInput) - { - long messageId = tupleInput.readLong(); - int offset = tupleInput.readInt(); - return new MessageContentKey_5(messageId, offset); - } - - public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput) - { - final MessageContentKey_5 mk = (MessageContentKey_5) object; - tupleOutput.writeLong(mk.getMessageId()); - tupleOutput.writeInt(mk.getOffset()); - } - -}
\ No newline at end of file diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java deleted file mode 100644 index 76ee4f66e4..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import org.apache.qpid.server.store.berkeleydb.MessageContentKey; - -import com.sleepycat.bind.tuple.TupleBinding; - -public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory<MessageContentKey> -{ - public MessageContentKeyTupleBindingFactory(int version) - { - super(version); - } - - public TupleBinding<MessageContentKey> getInstance() - { - switch (_version) - { - default: - case 5: - return new MessageContentKeyTB_5(); - case 4: - return new MessageContentKeyTB_4(); - } - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java deleted file mode 100644 index e26b544e38..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; - -import java.io.*; - -/** - * Handles the mapping to and from 0-8/0-9 message meta data - */ -public class MessageMetaDataTB_4 extends TupleBinding<Object> -{ - private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class); - - public MessageMetaDataTB_4() - { - } - - public Object entryToObject(TupleInput tupleInput) - { - try - { - final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput); - final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput); - final int contentChunkCount = tupleInput.readInt(); - - return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount); - } - catch (Exception e) - { - _log.error("Error converting entry to object: " + e, e); - // annoyingly just have to return null since we cannot throw - return null; - } - } - - public void objectToEntry(Object object, TupleOutput tupleOutput) - { - MessageMetaData message = (MessageMetaData) object; - try - { - writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput); - } - catch (AMQException e) - { - // can't do anything else since the BDB interface precludes throwing any exceptions - // in practice we should never get an exception - throw new RuntimeException("Error converting object to entry: " + e, e); - } - writeContentHeader(message.getContentHeaderBody(), tupleOutput); - tupleOutput.writeInt(message.getContentChunkCount()); - } - - private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput) - { - - final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput); - final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); - final boolean mandatory = tupleInput.readBoolean(); - final boolean immediate = tupleInput.readBoolean(); - - return new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return exchange; - } - - public void setExchange(AMQShortString exchange) - { - - } - - public boolean isImmediate() - { - return immediate; - } - - public boolean isMandatory() - { - return mandatory; - } - - public AMQShortString getRoutingKey() - { - return routingKey; - } - } ; - - } - - private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException - { - int bodySize = tupleInput.readInt(); - byte[] underlying = new byte[bodySize]; - tupleInput.readFast(underlying); - - try - { - return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)), bodySize); - } - catch (IOException e) - { - throw new AMQFrameDecodingException(null, e.getMessage(), e); - } - } - - private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException - { - - AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput); - AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput); - tupleOutput.writeBoolean(publishBody.isMandatory()); - tupleOutput.writeBoolean(publishBody.isImmediate()); - } - - private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput) - { - // write out the content header body - final int bodySize = headerBody.getSize(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(bodySize); - try - { - headerBody.writePayload(new DataOutputStream(baos)); - tupleOutput.writeInt(bodySize); - tupleOutput.writeFast(baos.toByteArray()); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java deleted file mode 100644 index 6dc041cb23..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.MessageMetaDataType; -import org.apache.qpid.server.store.StorableMessageMetaData; - -/** - * Handles the mapping to and from message meta data - */ -public class MessageMetaDataTB_5 extends MessageMetaDataTB_4 -{ - private static final Logger _log = Logger.getLogger(MessageMetaDataTB_5.class); - - @Override - public Object entryToObject(TupleInput tupleInput) - { - try - { - final int bodySize = tupleInput.readInt(); - byte[] dataAsBytes = new byte[bodySize]; - tupleInput.readFast(dataAsBytes); - - java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes); - buf.position(1); - buf = buf.slice(); - MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; - StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); - - return metaData; - } - catch (Exception e) - { - _log.error("Error converting entry to object: " + e, e); - // annoyingly just have to return null since we cannot throw - return null; - } - } - - @Override - public void objectToEntry(Object object, TupleOutput tupleOutput) - { - StorableMessageMetaData metaData = (StorableMessageMetaData) object; - - final int bodySize = 1 + metaData.getStorableSize(); - byte[] underlying = new byte[bodySize]; - underlying[0] = (byte) metaData.getType().ordinal(); - java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying); - buf.position(1); - buf = buf.slice(); - - metaData.writeToBuffer(0, buf); - tupleOutput.writeInt(bodySize); - tupleOutput.writeFast(underlying); - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java deleted file mode 100644 index 40153c13ea..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import com.sleepycat.bind.tuple.TupleBinding; - -public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Object> -{ - public MessageMetaDataTupleBindingFactory(int version) - { - super(version); - } - - public TupleBinding<Object> getInstance() - { - switch (_version) - { - default: - case 5: - return new MessageMetaDataTB_5(); - case 4: - return new MessageMetaDataTB_4(); - } - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java deleted file mode 100644 index 975e558874..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.QueueEntryKey; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -public class QueueEntryTB extends TupleBinding<QueueEntryKey> -{ - public QueueEntryKey entryToObject(TupleInput tupleInput) - { - AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); - Long messageId = tupleInput.readLong(); - - return new QueueEntryKey(queueName, messageId); - } - - public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput); - tupleOutput.writeLong(mk.getMessageId()); - } -}
\ No newline at end of file diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java deleted file mode 100644 index affa9a271d..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -public interface QueueTuple -{ -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java deleted file mode 100644 index 512e319f96..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; - -import com.sleepycat.bind.tuple.TupleBinding; - -public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord> -{ - - public QueueTupleBindingFactory(int version) - { - super(version); - } - - public TupleBinding<QueueRecord> getInstance() - { - switch (_version) - { - default: - case 5: - return new QueueTuple_5(); - case 4: - return new QueueTuple_4(); - } - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java deleted file mode 100644 index 347eecf08e..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; -import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple -{ - protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class); - - protected FieldTable _arguments; - - public QueueTuple_4() - { - super(); - } - - public QueueRecord entryToObject(TupleInput tupleInput) - { - try - { - AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); - // Addition for Version 2 of this table, read the queue arguments - FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); - - return new QueueRecord(name, owner, false, arguments); - } - catch (DatabaseException e) - { - _logger.error("Unable to create binding: " + e, e); - return null; - } - - } - - public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); - AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); - // Addition for Version 2 of this table, store the queue arguments - FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java deleted file mode 100644 index 0f293b79b7..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; -import com.sleepycat.je.DatabaseException; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; -import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class QueueTuple_5 extends QueueTuple_4 -{ - protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class); - - protected FieldTable _arguments; - - public QueueTuple_5() - { - super(); - } - - public QueueRecord entryToObject(TupleInput tupleInput) - { - try - { - AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); - AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); - // Addition for Version 2 of this table, read the queue arguments - FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); - // Addition for Version 3 of this table, read the queue exclusivity - boolean exclusive = tupleInput.readBoolean(); - - return new QueueRecord(name, owner, exclusive, arguments); - } - catch (DatabaseException e) - { - _logger.error("Unable to create binding: " + e, e); - return null; - } - - } - - public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) - { - AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); - AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); - // Addition for Version 2 of this table, store the queue arguments - FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); - // Addition for Version 3 of this table, store the queue exclusivity - tupleOutput.writeBoolean(queue.isExclusive()); - } -} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java deleted file mode 100644 index 2adac1f9a3..0000000000 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.tuples; - -import com.sleepycat.bind.tuple.TupleBinding; - -public abstract class TupleBindingFactory<E> -{ - protected int _version; - - public TupleBindingFactory(int version) - { - _version = version; - } - - public abstract TupleBinding<E> getInstance(); -} diff --git a/java/bdbstore/src/resources/backup-log4j.xml b/java/bdbstore/src/resources/backup-log4j.xml deleted file mode 100644 index 6b0619f0b6..0000000000 --- a/java/bdbstore/src/resources/backup-log4j.xml +++ /dev/null @@ -1,65 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - - - Licensed to the Apache Software Foundation (ASF) under one - - or more contributor license agreements. See the NOTICE file - - distributed with this work for additional information - - regarding copyright ownership. The ASF licenses this file - - to you under the Apache License, Version 2.0 (the - - "License"); you may not use this file except in compliance - - with the License. You may obtain a copy of the License at - - - - http://www.apache.org/licenses/LICENSE-2.0 - - - - Unless required by applicable law or agreed to in writing, - - software distributed under the License is distributed on an - - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - - KIND, either express or implied. See the License for the - - specific language governing permissions and limitations - - under the License. - - - --> -<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> - -<!-- =============================================================================== --> -<!-- This is a Log4j configuration specially created for the BDB Backup utility, --> -<!-- it outputs logging to the console for specifically designated console loggers --> -<!-- at info level or above only. This avoids spamming the user with any internals --> -<!-- of the Qpid code. --> -<!-- Use a different logging set up to capture debugging output to diagnose errors. --> -<!-- =============================================================================== --> - -<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false"> - - <!-- ====================================================== --> - <!-- Append messages to the console at info level or above. --> - <!-- ====================================================== --> - - <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender"> - <param name="Target" value="System.out"/> - <param name="Threshold" value="info"/> - - <layout class="org.apache.log4j.PatternLayout"> - <!-- The default pattern: Date Priority [Category] Message\n --> - <param name="ConversionPattern" value="%m%n"/> - </layout> - - </appender> - - <!-- ================ --> - <!-- Limit categories --> - <!-- ================ --> - - <category name="org.apache.qpid.server.store.berkeleydb.BDBBackup"> - <priority value="info"/> - </category> - - <!-- ======================= --> - <!-- Setup the Root category --> - <!-- ======================= --> - - <root> - <appender-ref ref="CONSOLE"/> - </root> - -</log4j:configuration> diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java deleted file mode 100644 index d076babc61..0000000000 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.framing.AMQShortString; - -import com.sleepycat.bind.tuple.TupleInput; -import com.sleepycat.bind.tuple.TupleOutput; - -import junit.framework.TestCase; - -/** - * Tests for {@code AMQShortStringEncoding} including corner cases when string - * is null or over 127 characters in length - */ -public class AMQShortStringEncodingTest extends TestCase -{ - - public void testWriteReadNullValues() - { - // write into tuple output - TupleOutput tupleOutput = new TupleOutput(); - AMQShortStringEncoding.writeShortString(null, tupleOutput); - byte[] data = tupleOutput.getBufferBytes(); - - // read from tuple input - TupleInput tupleInput = new TupleInput(data); - AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); - assertNull("Expected null but got " + result, result); - } - - public void testWriteReadShortStringWithLengthOver127() - { - AMQShortString value = createString('a', 128); - - // write into tuple output - TupleOutput tupleOutput = new TupleOutput(); - AMQShortStringEncoding.writeShortString(value, tupleOutput); - byte[] data = tupleOutput.getBufferBytes(); - - // read from tuple input - TupleInput tupleInput = new TupleInput(data); - AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); - assertEquals("Expected " + value + " but got " + result, value, result); - } - - public void testWriteReadShortStringWithLengthLess127() - { - AMQShortString value = new AMQShortString("test"); - - // write into tuple output - TupleOutput tupleOutput = new TupleOutput(); - AMQShortStringEncoding.writeShortString(value, tupleOutput); - byte[] data = tupleOutput.getBufferBytes(); - - // read from tuple input - TupleInput tupleInput = new TupleInput(data); - AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); - assertEquals("Expected " + value + " but got " + result, value, result); - } - - private AMQShortString createString(char ch, int length) - { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < length; i++) - { - sb.append(ch); - } - return new AMQShortString(sb.toString()); - } - -} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java deleted file mode 100644 index ef31b78cfe..0000000000 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ /dev/null @@ -1,470 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.MessageMetaData_0_10; -import org.apache.qpid.server.store.MessageMetaDataType; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; - -/** - * Subclass of MessageStoreTest which runs the standard tests from the superclass against - * the BDB Store as well as additional tests specific to the DBB store-implementation. - */ -public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest -{ - /** - * Tests that message metadata and content are successfully read back from a - * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to - * verify their ability to co-exist within the store and be successful retrieved. - */ - public void testBDBMessagePersistence() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(store); - - // Create content ByteBuffers. - // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. - // Use a single chunk for the 0-10 message as per broker behaviour. - String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; - - ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); - ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); - - ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); - int bodySize = completeContentBody_0_10.limit(); - - /* - * Create and insert a 0-8 message (metadata and multi-chunk content) - */ - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); - StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); - - long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - storedMessage_0_8.addContent(0, firstContentBytes_0_8); - storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - storedMessage_0_8.flushToStore(); - - /* - * Create and insert a 0-10 message (metadata and content) - */ - MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); - DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); - Header header_0_10 = new Header(msgProps_0_10, delProps_0_10); - - MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); - - MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); - - long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - - storedMessage_0_10.addContent(0, completeContentBody_0_10); - storedMessage_0_10.flushToStore(); - - /* - * reload the store only (read-only) - */ - bdbStore = reloadStoreReadOnly(bdbStore); - - /* - * Read back and validate the 0-8 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8); - - assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType()); - assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); - MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; - - assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); - - MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); - assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); - assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); - assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); - assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); - - ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); - assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId); - assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight); - assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize); - - BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties(); - assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); - assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); - - ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ; - long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8); - assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8); - String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); - - /* - * Read back and validate the 0-10 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10); - - assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType()); - assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); - MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; - - assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); - - DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class); - assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); - assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); - assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); - assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); - assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); - assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); - - MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class); - assertNotNull("MessageProperties were not returned", returnedMsgProps); - assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); - assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); - assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); - - ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; - long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent); - assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); - - String returnedPayloadString_0_10 = new String(recoveredContent.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); - } - - private DeliveryProperties createDeliveryProperties_0_10() - { - DeliveryProperties delProps_0_10 = new DeliveryProperties(); - - delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); - delProps_0_10.setImmediate(true); - delProps_0_10.setExchange("exchange12345"); - delProps_0_10.setRoutingKey("routingKey12345"); - delProps_0_10.setExpiration(5); - delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); - - return delProps_0_10; - } - - private MessageProperties createMessageProperties_0_10(int bodySize) - { - MessageProperties msgProps_0_10 = new MessageProperties(); - msgProps_0_10.setContentLength(bodySize); - msgProps_0_10.setCorrelationId("qwerty".getBytes()); - msgProps_0_10.setContentType("text/html"); - - return msgProps_0_10; - } - - /** - * Close the provided store and create a new (read-only) store to read back the data. - * - * Use this method instead of reloading the virtual host like other tests in order - * to avoid the recovery handler deleting the message for not being on a queue. - */ - private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception - { - messageStore.close(); - File storePath = new File(String.valueOf(_config.getProperty("store.environment-path"))); - - BDBMessageStore newStore = new BDBMessageStore(); - newStore.configure(storePath, false); - newStore.start(); - - return newStore; - } - - private MessagePublishInfo createPublishInfoBody_0_8() - { - return new MessagePublishInfo() - { - public AMQShortString getExchange() - { - return new AMQShortString("exchange12345"); - } - - public void setExchange(AMQShortString exchange) - { - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return true; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString("routingKey12345"); - } - }; - - } - - private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); - return new ContentHeaderBody(classForBasic, 1, props, length); - } - - private BasicContentHeaderProperties createContentHeaderProperties_0_8() - { - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); - props.setContentType("text/html"); - props.getHeaders().setString("Test", "MST"); - return props; - } - - /** - * Tests that messages which are added to the store and then removed using the - * public MessageStore interfaces are actually removed from the store by then - * interrogating the store with its own implementation methods and verifying - * expected exceptions are thrown to indicate the message is not present. - */ - public void testMessageCreationAndRemoval() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); - - StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - //remove the message in the fashion the broker normally would - storedMessage_0_8.remove(); - - //verify the removal using the BDB store implementation methods directly - try - { - // the next line should throw since the message id should not be found - bdbStore.getMessageMetaData(messageid_0_8); - fail("No exception thrown when message id not found getting metadata"); - } - catch (AMQStoreException e) - { - // pass since exception expected - } - - //expecting no content, allocate a 1 byte - ByteBuffer dst = ByteBuffer.allocate(1); - - assertEquals("Retrieved content when none was expected", - 0, bdbStore.getContent(messageid_0_8, 0, dst)); - } - - private BDBMessageStore assertBDBStore(Object store) - { - if(!(store instanceof BDBMessageStore)) - { - fail("Test requires an instance of BDBMessageStore to proceed"); - } - - return (BDBMessageStore) store; - } - - private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store) - { - byte[] body10Bytes = "0123456789".getBytes(); - byte[] body5Bytes = "01234".getBytes(); - - ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes); - ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes); - - int bodySize = body10Bytes.length + body5Bytes.length; - - //create and store the message using the MessageStore interface - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); - StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - - storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.addContent(chunk1.limit(), chunk2); - storedMessage_0_8.flushToStore(); - - return storedMessage_0_8; - } - - /** - * Tests transaction commit by utilising the enqueue and dequeue methods available - * in the TransactionLog interface implemented by the store, and verifying the - * behaviour using BDB implementation methods. - */ - public void testTranCommit() throws Exception - { - TransactionLog log = getVirtualHost().getTransactionLog(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final AMQShortString mockQueueName = new AMQShortString("queueName"); - - TransactionLogResource mockQueue = new TransactionLogResource() - { - public String getResourceName() - { - return mockQueueName.asString(); - } - }; - - TransactionLog.Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, 1L); - txn.enqueueMessage(mockQueue, 5L); - txn.commitTran(); - - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 1L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 5L, val.longValue()); - } - - - /** - * Tests transaction rollback before a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackBeforeCommit() throws Exception - { - TransactionLog log = getVirtualHost().getTransactionLog(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final AMQShortString mockQueueName = new AMQShortString("queueName"); - - TransactionLogResource mockQueue = new TransactionLogResource() - { - public String getResourceName() - { - return mockQueueName.asString(); - } - }; - - TransactionLog.Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, 21L); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 22L); - txn.enqueueMessage(mockQueue, 23L); - txn.commitTran(); - - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 22L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 23L, val.longValue()); - } - - /** - * Tests transaction rollback after a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackAfterCommit() throws Exception - { - TransactionLog log = getVirtualHost().getTransactionLog(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final AMQShortString mockQueueName = new AMQShortString("queueName"); - - TransactionLogResource mockQueue = new TransactionLogResource() - { - public String getResourceName() - { - return mockQueueName.asString(); - } - }; - - TransactionLog.Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, 30L); - txn.commitTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 31L); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, 32L); - txn.commitTran(); - - List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 30L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 32L, val.longValue()); - } - -} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java deleted file mode 100644 index cc19bcf5d8..0000000000 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - -import junit.framework.TestCase; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.url.URLSyntaxException; - -/** - * Prepares an older version brokers BDB store with the required - * contents for use in the BDBStoreUpgradeTest. - * - * The store will then be used to verify that the upgraded is - * completed properly and that once upgraded it functions as - * expected with the new broker. - */ -public class BDBStoreUpgradeTestPreparer extends TestCase -{ - public static final String TOPIC_NAME="myUpgradeTopic"; - public static final String SUB_NAME="myDurSubName"; - public static final String QUEUE_NAME="myUpgradeQueue"; - - private static AMQConnectionFactory _connFac; - private static final String CONN_URL = - "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'"; - - /** - * Create a BDBStoreUpgradeTestPreparer instance - */ - public BDBStoreUpgradeTestPreparer () throws URLSyntaxException - { - _connFac = new AMQConnectionFactory(CONN_URL); - } - - /** - * Utility test method to allow running the preparation tool - * using the test framework - */ - public void testPrepareBroker() throws Exception - { - prepareBroker(); - } - - private void prepareBroker() throws Exception - { - prepareQueues(); - prepareDurableSubscription(); - } - - /** - * Prepare a queue for use in testing message and binding recovery - * after the upgrade is performed. - * - * - Create a transacted session on the connection. - * - Use a consumer to create the (durable by default) queue. - * - Send 5 large messages to test (multi-frame) content recovery. - * - Send 1 small message to test (single-frame) content recovery. - * - Commit the session. - * - Send 5 small messages to test that uncommitted messages are not recovered. - * following the upgrade. - * - Close the session. - */ - private void prepareQueues() throws Exception - { - // Create a connection - Connection connection = _connFac.createConnection(); - connection.start(); - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - e.printStackTrace(); - } - }); - // Create a session on the connection, transacted to confirm delivery - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue(QUEUE_NAME); - // Create a consumer to ensure the queue gets created - // (and enter it into the store, as queues are made durable by default) - MessageConsumer messageConsumer = session.createConsumer(queue); - messageConsumer.close(); - - // Create a Message producer - MessageProducer messageProducer = session.createProducer(queue); - - // Publish 5 persistent messages, 256k chars to ensure they are multi-frame - sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5); - // Publish 5 persistent messages, 1k chars to ensure they are single-frame - sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); - - session.commit(); - - // Publish 5 persistent messages which will NOT be committed and so should be 'lost' - sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); - - session.close(); - connection.close(); - } - - /** - * Prepare a DurableSubscription backing queue for use in testing selector - * recovery and queue exclusivity marking during the upgrade process. - * - * - Create a transacted session on the connection. - * - Open and close a DurableSubscription with selector to create the backing queue. - * - Send a message which matches the selector. - * - Send a message which does not match the selector. - * - Send a message which matches the selector but will remain uncommitted. - * - Close the session. - */ - private void prepareDurableSubscription() throws Exception - { - - // Create a connection - TopicConnection connection = _connFac.createTopicConnection(); - connection.start(); - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - e.printStackTrace(); - } - }); - // Create a session on the connection, transacted to confirm delivery - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Topic topic = session.createTopic(TOPIC_NAME); - - // Create and register a durable subscriber with selector and then close it - TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false); - durSub1.close(); - - // Create a publisher and send a persistent message which matches the selector - // followed by one that does not match, and another which matches but is not - // committed and so should be 'lost' - TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); - TopicPublisher publisher = pubSession.createPublisher(topic); - - publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); - publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); - pubSession.commit(); - publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); - - publisher.close(); - pubSession.close(); - - } - - public static void sendMessages(Session session, MessageProducer messageProducer, - Destination dest, int deliveryMode, int length, int numMesages) throws JMSException - { - for (int i = 1; i <= numMesages; i++) - { - Message message = session.createTextMessage(generateString(length)); - message.setIntProperty("ID", i); - messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } - } - - public static void publishMessages(Session session, TopicPublisher publisher, - Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException - { - for (int i = 1; i <= numMesages; i++) - { - Message message = session.createTextMessage(generateString(length)); - message.setIntProperty("ID", i); - message.setStringProperty("testprop", selectorProperty); - publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - } - } - - /** - * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. - * - * @param length number of characters in the string - * @return string sequence of the given length - */ - public static String generateString(int length) - { - char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; - char[] chars = new char[length]; - for (int i = 0; i < (length); i++) - { - chars[i] = base_chars[i % 10]; - } - return new String(chars); - } - - /** - * Run the preparation tool. - * @param args Command line arguments. - */ - public static void main(String[] args) throws Exception - { - BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer(); - producer.prepareBroker(); - } -}
\ No newline at end of file diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java deleted file mode 100644 index 4861e007af..0000000000 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ /dev/null @@ -1,540 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME; -import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory; -import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.sleepycat.bind.tuple.TupleBinding; -import com.sleepycat.je.DatabaseEntry; - -/** - * Tests upgrading a BDB store and using it with the new broker - * after the required contents are entered into the store using - * an old broker with the BDBStoreUpgradeTestPreparer. The store - * will then be used to verify that the upgraded is completed - * properly and that once upgraded it functions as expected with - * the new broker. - */ -public class BDBUpgradeTest extends QpidBrokerTestCase -{ - protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class); - - private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024); - private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256); - private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); - private static final String QPID_HOME = System.getProperty("QPID_HOME"); - private static final int VERSION_4 = 4; - - private String _fromDir; - private String _toDir; - private String _toDirTwice; - - @Override - public void setUp() throws Exception - { - assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); - assertNotNull("QPID_HOME must be set", QPID_HOME); - - if(! isExternalBroker()) - { - //override QPID_WORK to add the InVM port used so the store - //output from the upgrade tool can be found by the broker - setSystemProperty("QPID_WORK", QPID_WORK_ORIG + "/" + getPort()); - } - - _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store"; - _toDir = getWorkDirBaseDir() + "/bdbstore/test-store"; - _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice"; - - //Clear the two target directories if they exist. - File directory = new File(_toDir); - if (directory.exists() && directory.isDirectory()) - { - FileUtils.delete(directory, true); - } - directory = new File(_toDirTwice); - if (directory.exists() && directory.isDirectory()) - { - FileUtils.delete(directory, true); - } - - //Upgrade the test store. - upgradeBrokerStore(_fromDir, _toDir); - - //override the broker config used and then start the broker with the updated store - _configFile = new File("build/etc/config-systests-bdb.xml"); - setConfigurationProperty("management.enabled", "true"); - - super.setUp(); - } - - private String getWorkDirBaseDir() - { - return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort()); - } - - /** - * Tests that the core upgrade method of the store upgrade tool passes through the exception - * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous - * version because it has already been upgraded. - * @throws Exception - */ - public void testMultipleUpgrades() throws Exception - { - //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed - stopBroker(); - - try - { - new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4); - fail("Second Upgrade Succeeded"); - } - catch (Exception e) - { - System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error"); - e.printStackTrace(); - assertTrue("Incorrect Exception Thrown:" + e.getMessage(), - e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data")); - } - } - - /** - * Test that the selector applied to the DurableSubscription was successfully - * transfered to the new store, and functions as expected with continued use - * by monitoring message count while sending new messages to the topic. - */ - public void testSelectorDurability() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); - assertEquals("DurableSubscription backing queue should have 1 message on it initially", - new Integer(1), dursubQueue.getMessageCount()); - - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - // Send messages which don't match and do match the selector, checking message count - TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED); - Topic topic = pubSession.createTopic(TOPIC_NAME); - TopicPublisher publisher = pubSession.createPublisher(topic); - - BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); - pubSession.commit(); - assertEquals("DurableSubscription backing queue should still have 1 message on it", - new Integer(1), dursubQueue.getMessageCount()); - - BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); - pubSession.commit(); - assertEquals("DurableSubscription backing queue should now have 2 messages on it", - new Integer(2), dursubQueue.getMessageCount()); - - dursubQueue.clearQueue(); - pubSession.close(); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the backing queue for the durable subscription created was successfully - * detected and set as being exclusive during the upgrade process, and that the - * regular queue was not. - */ - public void testQueueExclusivity() throws Exception - { - JMXTestUtils jmxUtils = null; - try - { - jmxUtils = new JMXTestUtils(this, "guest", "guest"); - jmxUtils.open(); - } - catch (Exception e) - { - fail("Unable to establish JMX connection, test cannot proceed"); - } - - try - { - ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME); - assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive()); - - ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); - assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive()); - } - finally - { - jmxUtils.close(); - } - } - - /** - * Test that the upgraded queue continues to function properly when used - * for persistent messaging and restarting the broker. - * - * Sends the new messages to the queue BEFORE consuming those which were - * sent before the upgrade. In doing so, this also serves to test that - * the queue bindings were successfully transitioned during the upgrade. - */ - public void testBindingAndMessageDurabability() throws Exception - { - // Create a connection and start it - TopicConnection connection = (TopicConnection) getConnection(); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(QUEUE_NAME); - MessageProducer messageProducer = session.createProducer(queue); - - // Send a new message - BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1); - - session.close(); - - // Restart the broker - restartBroker(); - - // Drain the queue of all messages - connection = (TopicConnection) getConnection(); - connection.start(); - consumeQueueMessages(connection, true); - } - - /** - * Test that all of the committed persistent messages previously sent to - * the broker are properly received following update of the MetaData and - * Content entries during the store upgrade process. - */ - public void testConsumptionOfUpgradedMessages() throws Exception - { - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - consumeDurableSubscriptionMessages(connection); - consumeQueueMessages(connection, false); - } - - /** - * Tests store migration containing messages for non-existing queue. - * - * @throws Exception - */ - public void testMigrationOfMessagesForNonExistingQueues() throws Exception - { - stopBroker(); - - // copy store data into a new location for adding of phantom message - File storeLocation = new File(_fromDir); - File target = new File(_toDirTwice); - if (!target.exists()) - { - target.mkdirs(); - } - FileUtils.copyRecursive(storeLocation, target); - - // delete migrated data - File directory = new File(_toDir); - if (directory.exists() && directory.isDirectory()) - { - FileUtils.delete(directory, true); - } - - // test data - String nonExistingQueueName = getTestQueueName(); - String messageText = "Test Phantom Message"; - - // add message - addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText); - - String[] inputs = { "Yes", "Yes", "Yes" }; - upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs); - - // start broker - startBroker(); - - // Create a connection and start it - Connection connection = getConnection(); - connection.start(); - - // consume a message for non-existing store - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(nonExistingQueueName); - MessageConsumer messageConsumer = session.createConsumer(queue); - Message message = messageConsumer.receive(1000); - - // assert consumed message - assertNotNull("Message was not migrated!", message); - assertTrue("Unexpected message received!", message instanceof TextMessage); - String text = ((TextMessage) message).getText(); - assertEquals("Message migration failed!", messageText, text); - } - - /** - * An utility method to upgrade broker with simulation user interactions - * - * @param fromDir - * location of the store to migrate - * @param toDir - * location of where migrated data will be stored - * @param inputs - * user answers on upgrade tool questions - * @throws Exception - */ - private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs) - throws Exception - { - // save to restore system.in after data migration - InputStream stdin = System.in; - - // set fake system in to simulate user interactions - // FIXME: it is a quite dirty simulator of system input but it does the job - System.setIn(new InputStream() - { - - int counter = 0; - - public synchronized int read(byte b[], int off, int len) - { - byte[] src = (inputs[counter] + "\n").getBytes(); - System.arraycopy(src, 0, b, off, src.length); - counter++; - return src.length; - } - - @Override - public int read() throws IOException - { - return -1; - } - }); - - try - { - // Upgrade the test store. - new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4); - } - finally - { - // restore system in - System.setIn(stdin); - } - } - - @SuppressWarnings("unchecked") - private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName, - String messageText) throws Exception - { - final AMQShortString queueName = new AMQShortString(nonExistingQueueName); - BDBMessageStore store = new BDBMessageStore(storeVersion); - store.configure(storeLocation, false); - try - { - store.start(); - - // store message objects - ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8")); - long bodySize = completeContentBody.limit(); - MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false, - false, queueName); - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); - props.setContentType("text/plain"); - props.setType("text/plain"); - props.setMessageId("whatever"); - props.setEncoding("UTF-8"); - props.getHeaders().setString("Test", "MST"); - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); - ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize); - - // add content entry to database - long messageId = store.getNewMessageId(); - TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance(); - MessageContentKey contentKey = null; - if (storeVersion == VERSION_4) - { - contentKey = new MessageContentKey_4(messageId, 0); - } - else - { - throw new Exception(storeVersion + " is not supported"); - } - DatabaseEntry key = new DatabaseEntry(); - contentKeyTB.objectToEntry(contentKey, key); - DatabaseEntry data = new DatabaseEntry(); - ContentTB contentTB = new ContentTB(); - contentTB.objectToEntry(completeContentBody, data); - store.getContentDb().put(null, key, data); - - // add meta data entry to database - TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class); - TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance(); - key = new DatabaseEntry(); - data = new DatabaseEntry(); - longTB.objectToEntry(new Long(messageId), key); - MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1); - metaDataTB.objectToEntry(metaData, data); - store.getMetaDataDb().put(null, key, data); - - // add delivery entry to database - TransactionLogResource mockQueue = new TransactionLogResource() - { - public String getResourceName() - { - return queueName.asString(); - } - }; - TransactionLog log = (TransactionLog) store; - TransactionLog.Transaction txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, messageId); - txn.commitTran(); - } - finally - { - // close store - store.close(); - } - } - - private void consumeDurableSubscriptionMessages(Connection connection) throws Exception - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(TOPIC_NAME); - - TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false); - - // Retrieve the matching message - Message m = durSub.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("Selector property did not match", "true", m.getStringProperty("testprop")); - assertEquals("ID property did not match", 1, m.getIntProperty("ID")); - assertEquals("Message content was not as expected",BDBStoreUpgradeTestPreparer.generateString(1024) , ((TextMessage)m).getText()); - - // Verify that neither the non-matching or uncommitted message are received - m = durSub.receive(1000); - assertNull("No more messages should have been recieved", m); - - durSub.close(); - session.close(); - } - - private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(QUEUE_NAME); - - MessageConsumer consumer = session.createConsumer(queue); - Message m; - - // Retrieve the initial pre-upgrade messages - for (int i=1; i <= 5 ; i++) - { - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", i, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); - } - for (int i=1; i <= 5 ; i++) - { - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", i, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText()); - } - - if(extraMessage) - { - //verify that the extra message is received - m = consumer.receive(2000); - assertNotNull("Failed to receive an expected message", m); - assertEquals("ID property did not match", 1, m.getIntProperty("ID")); - assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); - } - - // Verify that no more messages are received - m = consumer.receive(1000); - assertNull("No more messages should have been recieved", m); - - consumer.close(); - session.close(); - } - - private void upgradeBrokerStore(String fromDir, String toDir) throws Exception - { - new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4); - } -} diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb Binary files differdeleted file mode 100644 index c4e4e6c306..0000000000 --- a/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb +++ /dev/null |
