summaryrefslogtreecommitdiff
path: root/java/bdbstore
diff options
context:
space:
mode:
Diffstat (limited to 'java/bdbstore')
-rwxr-xr-xjava/bdbstore/bin/backup.sh40
-rwxr-xr-xjava/bdbstore/bin/storeUpgrade.sh43
-rw-r--r--java/bdbstore/build.xml84
-rwxr-xr-xjava/bdbstore/etc/scripts/bdbbackuptest.sh44
-rwxr-xr-xjava/bdbstore/etc/scripts/bdbtest.sh43
-rw-r--r--java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml52
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java59
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java48
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java344
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java2124
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java1125
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java62
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java52
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java44
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java58
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java74
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java42
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java49
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java44
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java44
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java53
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java66
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java120
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java25
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java45
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java76
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java47
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java46
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java45
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java162
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java77
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java43
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java46
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java25
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java46
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java72
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java75
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java35
-rw-r--r--java/bdbstore/src/resources/backup-log4j.xml65
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java88
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java470
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java232
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java540
-rw-r--r--java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdbbin1330321 -> 0 bytes
44 files changed, 0 insertions, 6974 deletions
diff --git a/java/bdbstore/bin/backup.sh b/java/bdbstore/bin/backup.sh
deleted file mode 100755
index 0e2f0fda09..0000000000
--- a/java/bdbstore/bin/backup.sh
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Parse arguments taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-WHEREAMI=`dirname $0`
-if [ -z "$QPID_HOME" ]; then
- export QPID_HOME=`cd $WHEREAMI/../ && pwd`
-fi
-VERSION=0.13
-
-LIBS=$QPID_HOME/lib/je-4.0.103.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
-
-
-echo "Starting Hot Backup Script"
-java -Dlog4j.configuration=backup-log4j.xml ${JAVA_OPTS} -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBBackup ${ARGS}
diff --git a/java/bdbstore/bin/storeUpgrade.sh b/java/bdbstore/bin/storeUpgrade.sh
deleted file mode 100755
index 076b9d3f7e..0000000000
--- a/java/bdbstore/bin/storeUpgrade.sh
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-if [ -z "$QPID_HOME" ]; then
- export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
- export PATH=${PATH}:${QPID_HOME}/bin
-fi
-
-if [ -z "$BDB_HOME" ]; then
- export BDB_HOME=$(dirname $(dirname $(readlink -f $0)))
-fi
-
-VERSION=0.13
-
-LIBS=$BDB_HOME/lib/je-4.0.103.jar:$BDB_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
-
-java -Xms256m -Dlog4j.configuration=BDBStoreUpgrade.log4j.xml -Xmx256m -Damqj.logging.level=warn ${JAVA_OPTS} -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade ${ARGS}
diff --git a/java/bdbstore/build.xml b/java/bdbstore/build.xml
deleted file mode 100644
index 9355358e6c..0000000000
--- a/java/bdbstore/build.xml
+++ /dev/null
@@ -1,84 +0,0 @@
-<!--
- - Licensed to the Apache Software Foundation (ASF) under one
- - or more contributor license agreements. See the NOTICE file
- - distributed with this work for additional information
- - regarding copyright ownership. The ASF licenses this file
- - to you under the Apache License, Version 2.0 (the
- - "License"); you may not use this file except in compliance
- - with the License. You may obtain a copy of the License at
- -
- - http://www.apache.org/licenses/LICENSE-2.0
- -
- - Unless required by applicable law or agreed to in writing,
- - software distributed under the License is distributed on an
- - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- - KIND, either express or implied. See the License for the
- - specific language governing permissions and limitations
- - under the License.
- -->
-<project name="bdbstore" default="build">
- <property name="module.depends" value="common client management/common broker perftests systests" />
- <property name="module.test.depends" value="test common/test broker/test management/common perftests systests" />
-
- <import file="../module.xml" />
-
- <property name="bdb.lib.dir" value="${project.root}/lib/bdbstore" />
- <property name="bdb.version" value="4.0.103" />
- <property name="bdb.download.url" value="http://download.oracle.com/maven/com/sleepycat/je/${bdb.version}/je-${bdb.version}.jar" />
- <property name="bdb.jar.file" value="${bdb.lib.dir}/je-${bdb.version}.jar" />
-
- <!--check whether the BDB jar is present, possibly after download-->
- <target name="check-bdb-jar">
- <available file="${bdb.jar.file}" type="file" property="bdb.jar.available"/>
- </target>
-
- <!--echo that BDB is required if it isnt present, with associated licencing note-->
- <target name="bdb-jar-required" depends="bdb-licence-note-optional" unless="bdb.jar.available">
- <echo>The BDB JE library is required to use this optional module.
-
-The jar file may be downloaded by either:
-
- Seperately running the following command from the qpid/java/bdbstore dir: ant download-bdb
-
- OR
-
- Adding -Ddownload-bdb=true to your regular build command.</echo>
- <fail>The BDB JE library was not found</fail>
- </target>
-
- <!--issue BDB licencing note if BDB isnt already present-->
- <target name="bdb-licence-note-optional" depends="check-bdb-jar" unless="bdb.jar.available">
- <antcall target="bdb-licence-note"/>
- </target>
-
- <!--issue BDB licencing note-->
- <target name="bdb-licence-note">
- <echo>*NOTE* The BDB JE library required by this optional module is licensed under the Sleepycat Licence, which is not compatible with the Apache Licence v2.0.
-
-For a copy of the Sleepycat Licence, please see:
-http://www.oracle.com/technetwork/database/berkeleydb/downloads/jeoslicense-086837.html</echo>
- </target>
-
- <!--check if an inline BDB download was requested with the build-->
- <target name="check-request-props" if="download-bdb">
- <antcall target="download-bdb"/>
- </target>
-
- <!--download BDB, with licencing note-->
- <target name="download-bdb" depends="bdb-licence-note">
- <mkdir dir="${bdb.lib.dir}"/>
- <echo>Downloading BDB JE</echo>
- <get src="${bdb.download.url}" dest="${bdb.jar.file}" usetimestamp="true" />
- </target>
-
- <target name="build" depends="check-request-props, bdb-jar-required, module.build" />
-
- <target name="postbuild" depends="copy-store-to-upgrade" />
-
- <target name="copy-store-to-upgrade" description="copy the upgrade tool resource folder contents into the build tree">
- <copy todir="${qpid.home}" failonerror="true">
- <fileset dir="src/test/resources/upgrade"/>
- </copy>
- </target>
-
-</project>
diff --git a/java/bdbstore/etc/scripts/bdbbackuptest.sh b/java/bdbstore/etc/scripts/bdbbackuptest.sh
deleted file mode 100755
index 2a0b72e5ad..0000000000
--- a/java/bdbstore/etc/scripts/bdbbackuptest.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-if [ -z "$QPID_HOME" ]; then
- export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
- export PATH=${PATH}:${QPID_HOME}/bin
-fi
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-VERSION=0.5
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.4.0.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS}
-
diff --git a/java/bdbstore/etc/scripts/bdbtest.sh b/java/bdbstore/etc/scripts/bdbtest.sh
deleted file mode 100755
index eafdae9710..0000000000
--- a/java/bdbstore/etc/scripts/bdbtest.sh
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-if [ -z "$QPID_HOME" ]; then
- export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
- export PATH=${PATH}:${QPID_HOME}/bin
-fi
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-VERSION=0.5
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.4.0.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.ping.PingDurableClient -o $QPID_WORK/results ${ARGS}
diff --git a/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml b/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml
deleted file mode 100644
index 4d71963ea7..0000000000
--- a/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0"?>
-<!--
- -
- - Licensed to the Apache Software Foundation (ASF) under one
- - or more contributor license agreements. See the NOTICE file
- - distributed with this work for additional information
- - regarding copyright ownership. The ASF licenses this file
- - to you under the Apache License, Version 2.0 (the
- - "License"); you may not use this file except in compliance
- - with the License. You may obtain a copy of the License at
- -
- - http://www.apache.org/licenses/LICENSE-2.0
- -
- - Unless required by applicable law or agreed to in writing,
- - software distributed under the License is distributed on an
- - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- - KIND, either express or implied. See the License for the
- - specific language governing permissions and limitations
- - under the License.
- -
- -->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
- <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
-
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p - %m%n"/>
- </layout>
- </appender>
-
- <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade">
- <priority value="info"/>
- </category>
-
- <!-- Only show errors from the BDB Store -->
- <category name="org.apache.qpid.server.store.berkeleydb.berkeleydb.BDBMessageStore">
- <priority value="error"/>
- </category>
-
- <!-- Provide warnings to standard output -->
- <category name="org.apache.qpid">
- <priority value="error"/>
- </category>
-
- <!-- Log all info events to file -->
- <root>
- <priority value="info"/>
- <appender-ref ref="STDOUT"/>
- </root>
-
-</log4j:configuration>
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
deleted file mode 100644
index 8b887b1876..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-public class AMQShortStringEncoding
-{
- public static AMQShortString readShortString(TupleInput tupleInput)
- {
- int length = (int) tupleInput.readShort();
- if (length < 0)
- {
- return null;
- }
- else
- {
- byte[] stringBytes = new byte[length];
- tupleInput.readFast(stringBytes);
- return new AMQShortString(stringBytes);
- }
-
- }
-
- public static void writeShortString(AMQShortString shortString, TupleOutput tupleOutput)
- {
-
- if (shortString == null)
- {
- tupleOutput.writeShort(-1);
- }
- else
- {
- tupleOutput.writeShort(shortString.length());
- tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
- }
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
deleted file mode 100644
index 81ae315fe2..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQShortStringTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
-
-
- public AMQShortStringTB()
- {
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- return AMQShortStringEncoding.readShortString(tupleInput);
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
- }
-
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
deleted file mode 100644
index c515ca5d8e..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.util.DbBackup;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.util.CommandLineParser;
-import org.apache.qpid.util.FileUtils;
-
-import java.io.*;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * BDBBackup is a utility for taking hot backups of the current state of a BDB transaction log database.
- *
- * <p/>This utility makes the following assumptions/performs the following actions:
- *
- * <p/><ul> <li>The from and to directory locations will already exist. This scripts does not create them. <li>If this
- * script fails to complete in one minute it will terminate. <li>This script always exits with code 1 on error, code 0
- * on success (standard unix convention). <li>This script will log out at info level, when it starts and ends and a list
- * of all files backed up. <li>This script logs all errors at error level. <li>This script does not perform regular
- * backups, wrap its calling script in a cron job or similar to do this. </ul>
- *
- * <p/>This utility is build around the BDB provided backup helper utility class, DbBackup. This utility class provides
- * an ability to force BDB to stop writing to the current log file set, whilst the backup is taken, to ensure that a
- * consistent snapshot is acquired. Preventing BDB from writing to the current log file set, does not stop BDB from
- * continuing to run concurrently while the backup is running, it simply moves onto a new set of log files; this
- * provides a 'hot' backup facility.
- *
- * <p/>DbBackup can also help with incremental backups, by providing the number of the last log file backed up.
- * Subsequent backups can be taken, from later log files only. In a messaging application, messages are not expected to
- * be long-lived in most cases, so the log files will usually have been completely turned over between backups. This
- * utility does not support incremental backups for this reason.
- *
- * <p/>If the database is locked by BDB, as is required when using transactions, and therefore will always be the case
- * in Qpid, this utility cannot make use of the DbBackup utility in a seperate process. DbBackup, needs to ensure that
- * the BDB envinronment used to take the backup has exclusive write access to the log files. This utility can take a
- * backup as a standalone utility against log files, when a broker is not running, using the {@link #takeBackup(String,
- *String,com.sleepycat.je.Environment)} method.
- *
- * <p/>A seperate backup machanism is provided by the {@link #takeBackupNoLock(String,String)} method which can take a
- * hot backup against a running broker. This works by finding out the set of files to copy, and then opening them all to
- * read, and repeating this process until a consistent set of open files is obtained. This is done to avoid the
- * situation where the BDB cleanup thread deletes a file, between the directory listing and opening of the file to copy.
- * All consistently opened files are copied. This is the default mechanism the the {@link #main} method of this utility
- * uses.
- *
- * <p/><table id="crc><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Hot copy all
- * BDB log files from one directory to another. </table>
- */
-public class BDBBackup
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(BDBBackup.class);
-
- /** Used for communicating with the user. */
- private static final Logger console = Logger.getLogger("Console");
-
- /** Defines the suffix used to identify BDB log files. */
- private static final String LOG_FILE_SUFFIX = ".jdb";
-
- /** Defines the command line format for this utility. */
- public static final String[][] COMMAND_LINE_SPEC =
- new String[][]
- {
- { "fromdir", "The path to the directory to back the bdb log file from.", "dir", "true" },
- { "todir", "The path to the directory to save the backed up bdb log files to.", "dir", "true" }
- };
-
- /** Defines the timeout to terminate the backup operation on if it fails to complete. One minte. */
- public static final long TIMEOUT = 60000;
-
- /**
- * Runs a backup of the BDB log files in a specified directory, copying the backed up files to another specified
- * directory.
- *
- * <p/>The following arguments must be specified:
- *
- * <p/><table><caption>Command Line</caption> <tr><th> Option <th> Comment <tr><td> -fromdir <td> The path to the
- * directory to back the bdb log file from. <tr><td> -todir <td> The path to the directory to save the backed up
- * bdb log files to. </table>
- *
- * @param args The command line arguments.
- */
- public static void main(String[] args)
- {
- // Process the command line using standard handling (errors and usage followed by System.exit when it is wrong).
- Properties options =
- CommandLineParser.processCommandLine(args, new CommandLineParser(COMMAND_LINE_SPEC), System.getProperties());
-
- // Extract the from and to directory locations and perform a backup between them.
- try
- {
- String fromDir = options.getProperty("fromdir");
- String toDir = options.getProperty("todir");
-
- log.info("BDBBackup Utility: Starting Hot Backup.");
-
- BDBBackup bdbBackup = new BDBBackup();
- String[] backedUpFiles = bdbBackup.takeBackupNoLock(fromDir, toDir);
-
- if (log.isInfoEnabled())
- {
- log.info("BDBBackup Utility: Hot Backup Completed. Files backed up: " + backedUpFiles);
- }
- }
- catch (Exception e)
- {
- console.info("Backup script encountered an error and has failed: " + e.getMessage());
- log.error("Backup script got exception: " + e.getMessage(), e);
- System.exit(1);
- }
- }
-
- /**
- * Creates a backup of the BDB log files in the source directory, copying them to the destination directory.
- *
- * @param fromdir The source directory path.
- * @param todir The destination directory path.
- * @param environment An open BDB environment to perform the back up.
- *
- * @throws DatabaseException Any underlying execeptions from BDB are allowed to fall through.
- */
- public void takeBackup(String fromdir, String todir, Environment environment) throws DatabaseException
- {
- DbBackup backupHelper = null;
-
- try
- {
- backupHelper = new DbBackup(environment);
-
- // Prevent BDB from writing to its log files while the backup it taken.
- backupHelper.startBackup();
-
- // Back up the BDB log files to the destination directory.
- String[] filesForBackup = backupHelper.getLogFilesInBackupSet();
-
- for (int i = 0; i < filesForBackup.length; i++)
- {
- File sourceFile = new File(fromdir + File.separator + filesForBackup[i]);
- File destFile = new File(todir + File.separator + filesForBackup[i]);
- FileUtils.copy(sourceFile, destFile);
- }
- }
- finally
- {
- // Remember to exit backup mode, or all log files won't be cleaned and disk usage will bloat.
- if (backupHelper != null)
- {
- backupHelper.endBackup();
- }
- }
- }
-
- /**
- * Takes a hot backup when another process has locked the BDB database.
- *
- * @param fromdir The source directory path.
- * @param todir The destination directory path.
- *
- * @return A list of all of the names of the files succesfully backed up.
- */
- public String[] takeBackupNoLock(String fromdir, String todir)
- {
- if (log.isDebugEnabled())
- {
- log.debug("public void takeBackupNoLock(String fromdir = " + fromdir + ", String todir = " + todir
- + "): called");
- }
-
- File fromDirFile = new File(fromdir);
-
- if (!fromDirFile.isDirectory())
- {
- throw new IllegalArgumentException("The specified fromdir(" + fromdir
- + ") must be the directory containing your bdbstore.");
- }
-
- File toDirFile = new File(todir);
-
- if (!toDirFile.exists())
- {
- // Create directory if it doesn't exist
- toDirFile.mkdirs();
-
- if (log.isDebugEnabled())
- {
- log.debug("Created backup directory:" + toDirFile);
- }
- }
-
- if (!toDirFile.isDirectory())
- {
- throw new IllegalArgumentException("The specified todir(" + todir + ") must be a directory.");
- }
-
- // Repeat until manage to open consistent set of files for reading.
- boolean consistentSet = false;
- FileInputStream[] fileInputStreams = new FileInputStream[0];
- File[] fileSet = new File[0];
- long start = System.currentTimeMillis();
-
- while (!consistentSet)
- {
- // List all .jdb files in the directory.
- fileSet = fromDirFile.listFiles(new FilenameFilter()
- {
- public boolean accept(File dir, String name)
- {
- return name.endsWith(LOG_FILE_SUFFIX);
- }
- });
-
- // Open them all for reading.
- fileInputStreams = new FileInputStream[fileSet.length];
-
- if (fileSet.length == 0)
- {
- throw new RuntimeException("There are no BDB log files to backup in the " + fromdir + " directory.");
- }
-
- for (int i = 0; i < fileSet.length; i++)
- {
- try
- {
- fileInputStreams[i] = new FileInputStream(fileSet[i]);
- }
- catch (FileNotFoundException e)
- {
- // Close any files opened for reading so far.
- for (int j = 0; j < i; j++)
- {
- if (fileInputStreams[j] != null)
- {
- try
- {
- fileInputStreams[j].close();
- }
- catch (IOException ioEx)
- {
- // Rethrow this as a runtime exception, as something strange has happened.
- throw new RuntimeException(ioEx);
- }
- }
- }
-
- // Could not open a consistent file set so try again.
- break;
- }
-
- // A consistent set has been opened if all files were sucesfully opened for reading.
- if (i == (fileSet.length - 1))
- {
- consistentSet = true;
- }
- }
-
- // Check that the script has not timed out, and raise an error if it has.
- long now = System.currentTimeMillis();
- if ((now - start) > TIMEOUT)
- {
- throw new RuntimeException("Hot backup script failed to complete in " + (TIMEOUT / 1000) + " seconds.");
- }
- }
-
- // Copy the consistent set of open files.
- List<String> backedUpFileNames = new LinkedList<String>();
-
- for (int j = 0; j < fileSet.length; j++)
- {
- File destFile = new File(todir + File.separator + fileSet[j].getName());
- try
- {
- FileUtils.copy(fileSet[j], destFile);
- }
- catch (RuntimeException re)
- {
- Throwable cause = re.getCause();
- if ((cause != null) && (cause instanceof IOException))
- {
- throw new RuntimeException(re.getMessage() + " fromDir:" + fromdir + " toDir:" + toDirFile, cause);
- }
- else
- {
- throw re;
- }
- }
-
- backedUpFileNames.add(destFile.getName());
-
- // Close all of the files.
- try
- {
- fileInputStreams[j].close();
- }
- catch (IOException e)
- {
- // Rethrow this as a runtime exception, as something strange has happened.
- throw new RuntimeException(e);
- }
- }
-
- return backedUpFileNames.toArray(new String[backedUpFileNames.size()]);
- }
-
- /*
- * Creates an environment for the bdb log files in the specified directory. This envrinonment can only be used
- * to backup these files, if they are not locked by another database instance.
- *
- * @param fromdir The path to the directory to create the environment for.
- *
- * @throws DatabaseException Any underlying exceptions from BDB are allowed to fall through.
- */
- private Environment createSourceDirEnvironment(String fromdir) throws DatabaseException
- {
- // Initialize the BDB backup utility on the source directory.
- return new Environment(new File(fromdir), new EnvironmentConfig());
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
deleted file mode 100644
index f900159808..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ /dev/null
@@ -1,2124 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
-
-import com.sleepycat.bind.EntryBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
-
-/**
- * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
- * transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
- * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
- * dequeue messages to queues. <tr><td> Generate message identifiers. </table>
- */
-@SuppressWarnings({"unchecked"})
-public class BDBMessageStore implements MessageStore
-{
- private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
-
- static final int DATABASE_FORMAT_VERSION = 5;
- private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
- public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
-
- private Environment _environment;
-
- private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
- private String MESSAGECONTENTDB_NAME = "messageContentDb";
- private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
- private String DELIVERYDB_NAME = "deliveryDb";
- private String EXCHANGEDB_NAME = "exchangeDb";
- private String QUEUEDB_NAME = "queueDb";
- private Database _messageMetaDataDb;
- private Database _messageContentDb;
- private Database _queueBindingsDb;
- private Database _deliveryDb;
- private Database _exchangeDb;
- private Database _queueDb;
-
- /* =======
- * Schema:
- * =======
- *
- * Queue:
- * name(AMQShortString) - name(AMQShortString), owner(AMQShortString),
- * arguments(FieldTable encoded as binary), exclusive (boolean)
- *
- * Exchange:
- * name(AMQShortString) - name(AMQShortString), typeName(AMQShortString), autodelete (boolean)
- *
- * Binding:
- * exchangeName(AMQShortString), queueName(AMQShortString), routingKey(AMQShortString),
- * arguments (FieldTable encoded as binary) - 0 (zero)
- *
- * QueueEntry:
- * queueName(AMQShortString), messageId (long) - 0 (zero)
- *
- * Message (MetaData):
- * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary)
- *
- * Message (Content):
- * messageId (long), byteOffset (integer) - dataLength(integer), data(binary);
- */
-
- private LogSubject _logSubject;
-
- private final AtomicLong _messageId = new AtomicLong(0);
-
- private final CommitThread _commitThread = new CommitThread("Commit-Thread");
-
- // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
- private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
- private QueueTupleBindingFactory _queueTupleBindingFactory;
- private BindingTupleBindingFactory _bindingTupleBindingFactory;
-
- /** The data version this store should run with */
- private int _version;
- private enum State
- {
- INITIAL,
- CONFIGURING,
- CONFIGURED,
- RECOVERING,
- STARTED,
- CLOSING,
- CLOSED
- }
-
- private State _state = State.INITIAL;
-
- private TransactionConfig _transactionConfig = new TransactionConfig();
-
- private boolean _readOnly = false;
-
- private boolean _configured;
-
-
- public BDBMessageStore()
- {
- this(DATABASE_FORMAT_VERSION);
- }
-
- public BDBMessageStore(int version)
- {
- _version = version;
- }
-
- private void setDatabaseNames(int version)
- {
- if (version > 1)
- {
- MESSAGEMETADATADB_NAME += "_v" + version;
-
- MESSAGECONTENTDB_NAME += "_v" + version;
-
- QUEUEDB_NAME += "_v" + version;
-
- DELIVERYDB_NAME += "_v" + version;
-
- EXCHANGEDB_NAME += "_v" + version;
-
- QUEUEBINDINGSDB_NAME += "_v" + version;
- }
- }
-
- public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
- {
- _logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
-
- if(_configured)
- {
- throw new Exception("ConfigStore already configured");
- }
-
- configure(name,storeConfiguration);
-
- _configured = true;
- stateTransition(State.CONFIGURING, State.CONFIGURED);
-
- recover(recoveryHandler);
- stateTransition(State.RECOVERING, State.STARTED);
- }
-
- public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
- {
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
- if(!_configured)
- {
- throw new Exception("ConfigStore not configured");
- }
-
- recoverMessages(recoveryHandler);
- }
-
- public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration, LogSubject logSubject) throws Exception
- {
- CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
-
- if(!_configured)
- {
- throw new Exception("ConfigStore not configured");
- }
-
- recoverQueueEntries(recoveryHandler);
-
-
- }
-
- public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
- {
- return new BDBTransaction();
- }
-
-
- /**
- * Called after instantiation in order to configure the message store.
- *
- * @param name The name of the virtual host using this store
- * @return whether a new store environment was created or not (to indicate whether recovery is necessary)
- *
- * @throws Exception If any error occurs that means the store is unable to configure itself.
- */
- public boolean configure(String name, Configuration storeConfig) throws Exception
- {
- File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
- System.getProperty("QPID_WORK") + "/bdbstore/" + name));
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
-
- CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
-
- _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
-
- return configure(environmentPath, false);
- }
-
- /**
- * @param environmentPath location for the store to be created in/recovered from
- * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
- * @return whether or not a new store environment was created
- * @throws AMQStoreException
- * @throws DatabaseException
- */
- protected boolean configure(File environmentPath, boolean readonly) throws AMQStoreException, DatabaseException
- {
- _readOnly = readonly;
- stateTransition(State.INITIAL, State.CONFIGURING);
-
- _log.info("Configuring BDB message store");
-
- createTupleBindingFactories(_version);
-
- setDatabaseNames(_version);
-
- return setupStore(environmentPath, readonly);
- }
-
- private void createTupleBindingFactories(int version)
- {
- _bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
- _queueTupleBindingFactory = new QueueTupleBindingFactory(version);
- _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version);
- }
-
- /**
- * Move the store state from CONFIGURING to STARTED.
- *
- * This is required if you do not want to perform recovery of the store data
- *
- * @throws AMQStoreException if the store is not in the correct state
- */
- public void start() throws AMQStoreException
- {
- stateTransition(State.CONFIGURING, State.STARTED);
- }
-
- private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException
- {
- checkState(State.CONFIGURING);
-
- boolean newEnvironment = createEnvironment(storePath, readonly);
-
- verifyVersionByTables();
-
- openDatabases(readonly);
-
- if (!readonly)
- {
- _commitThread.start();
- }
-
- return newEnvironment;
- }
-
- private void verifyVersionByTables() throws DatabaseException
- {
- for (String s : _environment.getDatabaseNames())
- {
- int versionIndex = s.indexOf("_v");
-
- // lack of _v index suggests DB is v1
- // so if _version is not v1 then error
- if (versionIndex == -1)
- {
- if (_version != 1)
- {
- closeEnvironment();
- throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
- + ". Store on disk contains version 1 data.");
- }
- else // DB is v1 and _version is v1
- {
- continue;
- }
- }
-
- // Otherwise Check Versions
- int version = Integer.parseInt(s.substring(versionIndex + 2));
-
- if (version != _version)
- {
- closeEnvironment();
- throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
- + ". Store on disk contains version " + version + " data.");
- }
- }
- }
-
- private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
- {
- if (_state != requiredState)
- {
- throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
- + "; currently in state: " + _state);
- }
-
- _state = newState;
- }
-
- private void checkState(State requiredState) throws AMQStoreException
- {
- if (_state != requiredState)
- {
- throw new AMQStoreException("Unexpected state: " + _state + "; required state: " + requiredState);
- }
- }
-
- private boolean createEnvironment(File environmentPath, boolean readonly) throws DatabaseException
- {
- _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allows the creation of the store if it does not already exist.
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam("je.lock.nLockTables", "7");
-
- // Restore 500,000 default timeout.
- //envConfig.setLockTimeout(15000);
-
- // Added to help diagnosis of Deadlock issue
- // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
- if (Boolean.getBoolean("qpid.bdb.lock.debug"))
- {
- envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
- envConfig.setConfigParam("je.txn.dumpLocks", "true");
- }
-
- // Set transaction mode
- _transactionConfig.setReadCommitted(true);
-
- //This prevents background threads running which will potentially update the store.
- envConfig.setReadOnly(readonly);
- try
- {
- _environment = new Environment(environmentPath, envConfig);
- return false;
- }
- catch (DatabaseException de)
- {
- if (de.getMessage().contains("Environment.setAllowCreate is false"))
- {
- //Allow the creation this time
- envConfig.setAllowCreate(true);
- if (_environment != null )
- {
- _environment.cleanLog();
- _environment.close();
- }
- _environment = new Environment(environmentPath, envConfig);
-
- return true;
- }
- else
- {
- throw de;
- }
- }
- }
-
- private void openDatabases(boolean readonly) throws DatabaseException
- {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
-
- //This is required if we are wanting read only access.
- dbConfig.setReadOnly(readonly);
-
- _messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig);
- _queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig);
- _exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig);
- _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig);
- _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig);
- _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig);
-
- }
-
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
- */
- public void close() throws Exception
- {
- if (_state != State.STARTED)
- {
- return;
- }
-
- _state = State.CLOSING;
-
- _commitThread.close();
- _commitThread.join();
-
- if (_messageMetaDataDb != null)
- {
- _log.info("Closing message metadata database");
- _messageMetaDataDb.close();
- }
-
- if (_messageContentDb != null)
- {
- _log.info("Closing message content database");
- _messageContentDb.close();
- }
-
- if (_exchangeDb != null)
- {
- _log.info("Closing exchange database");
- _exchangeDb.close();
- }
-
- if (_queueBindingsDb != null)
- {
- _log.info("Closing bindings database");
- _queueBindingsDb.close();
- }
-
- if (_queueDb != null)
- {
- _log.info("Closing queue database");
- _queueDb.close();
- }
-
- if (_deliveryDb != null)
- {
- _log.info("Close delivery database");
- _deliveryDb.close();
- }
-
- closeEnvironment();
-
- _state = State.CLOSED;
-
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
- }
-
- private void closeEnvironment() throws DatabaseException
- {
- if (_environment != null)
- {
- if(!_readOnly)
- {
- // Clean the log before closing. This makes sure it doesn't contain
- // redundant data. Closing without doing this means the cleaner may not
- // get a chance to finish.
- _environment.cleanLog();
- }
- _environment.close();
- }
- }
-
-
- public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException
- {
- stateTransition(State.CONFIGURED, State.RECOVERING);
-
- CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
-
- try
- {
- QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- loadQueues(qrh);
-
- ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
- loadExchanges(erh);
-
- BindingRecoveryHandler brh = erh.completeExchangeRecovery();
- recoverBindings(brh);
-
- brh.completeBindingRecovery();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
-
- }
-
- private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _queueDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = _queueTupleBindingFactory.getInstance();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
-
- String queueName = queueRecord.getNameShortString() == null ? null :
- queueRecord.getNameShortString().asString();
- String owner = queueRecord.getOwner() == null ? null :
- queueRecord.getOwner().asString();
- boolean exclusive = queueRecord.isExclusive();
-
- FieldTable arguments = queueRecord.getArguments();
-
- qrh.queue(queueName, owner, exclusive, arguments);
- }
-
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
- }
-
-
- private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException
- {
- Cursor cursor = null;
-
- try
- {
- cursor = _exchangeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = new ExchangeTB();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
-
- String exchangeName = exchangeRec.getNameShortString() == null ? null :
- exchangeRec.getNameShortString().asString();
- String type = exchangeRec.getType() == null ? null :
- exchangeRec.getType().asString();
- boolean autoDelete = exchangeRec.isAutoDelete();
-
- erh.exchange(exchangeName, type, autoDelete);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
- private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
- {
- Cursor cursor = null;
- try
- {
- cursor = _queueBindingsDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = _bindingTupleBindingFactory.getInstance();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- //yes, this is retrieving all the useful information from the key only.
- //For table compatibility it shall currently be left as is
- BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
-
- String exchangeName = bindingRecord.getExchangeName() == null ? null :
- bindingRecord.getExchangeName().asString();
- String queueName = bindingRecord.getQueueName() == null ? null :
- bindingRecord.getQueueName().asString();
- String routingKey = bindingRecord.getRoutingKey() == null ? null :
- bindingRecord.getRoutingKey().asString();
- ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
- java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
-
- brh.binding(exchangeName, queueName, routingKey, argumentsBB);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
- private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
- {
- StoredMessageRecoveryHandler mrh = msrh.begin();
-
- Cursor cursor = null;
- try
- {
- cursor = _messageMetaDataDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
-
- DatabaseEntry value = new DatabaseEntry();
- EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
-
- long maxId = 0;
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- long messageId = (Long) keyBinding.entryToObject(key);
- StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
-
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
- mrh.message(message);
-
- maxId = Math.max(maxId, messageId);
- }
-
- _messageId.set(maxId);
- }
- catch (DatabaseException e)
- {
- _log.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
- }
-
- private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
- throws DatabaseException
- {
- QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
- ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
-
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key);
-
- entries.add(qek);
- }
-
- try
- {
- cursor.close();
- }
- finally
- {
- cursor = null;
- }
-
- for(QueueEntryKey entry : entries)
- {
- AMQShortString queueName = entry.getQueueName();
- long messageId = entry.getMessageId();
-
- qerh.queueEntry(queueName.asString(),messageId);
- }
- }
- catch (DatabaseException e)
- {
- _log.error("Database Error: " + e.getMessage(), e);
- throw e;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- qerh.completeQueueEntryRecovery();
- }
-
- /**
- * Removes the specified message from the store.
- *
- * @param messageId Identifies the message to remove.
- *
- * @throws AMQInternalException If the operation fails for any reason.
- */
- public void removeMessage(Long messageId) throws AMQStoreException
- {
- // _log.debug("public void removeMessage(Long messageId = " + messageId): called");
-
- com.sleepycat.je.Transaction tx = null;
-
- Cursor cursor = null;
- try
- {
- tx = _environment.beginTransaction(null, null);
-
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
- metaKeyBindingTuple.objectToEntry(messageId, key);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Removing message id " + messageId);
- }
-
-
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- tx.abort();
-
- throw new AMQStoreException("Message metadata not found for message id " + messageId);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted metadata for message " + messageId);
- }
-
- //now remove the content data from the store if there is any.
-
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
-
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
-
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 0, true);
-
- cursor = _messageContentDb.openCursor(tx, null);
-
- status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
- while (status == OperationStatus.SUCCESS)
- {
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
-
- if(mck.getMessageId() != messageId)
- {
- //we have exhausted all chunks for this message id, break
- break;
- }
- else
- {
- status = cursor.delete();
-
- if(status == OperationStatus.NOTFOUND)
- {
- cursor.close();
- cursor = null;
-
- tx.abort();
- throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
- }
- }
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
- }
-
- cursor.close();
- cursor = null;
-
- commit(tx, true);
- }
- catch (DatabaseException e)
- {
- e.printStackTrace();
-
- if (tx != null)
- {
- try
- {
- if(cursor != null)
- {
- cursor.close();
- cursor = null;
- }
-
- tx.abort();
- }
- catch (DatabaseException e1)
- {
- throw new AMQStoreException("Error aborting transaction " + e1, e1);
- }
- }
-
- throw new AMQStoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
- finally
- {
- if(cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e);
- }
- }
- }
- }
-
- /**
- * @see DurableConfigurationStore#createExchange(Exchange)
- */
- public void createExchange(Exchange exchange) throws AMQStoreException
- {
- if (_state != State.RECOVERING)
- {
- ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
- exchange.getTypeShortString(), exchange.isAutoDelete());
-
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new ExchangeTB();
- exchangeBinding.objectToEntry(exchangeRec, value);
-
- try
- {
- _exchangeDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- /**
- * @see DurableConfigurationStore#removeExchange(Exchange)
- */
- public void removeExchange(Exchange exchange) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getNameShortString(), key);
- try
- {
- OperationStatus status = _exchangeDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Exchange " + exchange.getName() + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e);
- }
- }
-
-
-
-
- /**
- * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
- */
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
- {
- // _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey
- // + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called");
-
- if (_state != State.RECOVERING)
- {
- BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
- queue.getNameShortString(), routingKey, args);
-
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
-
- keyBinding.objectToEntry(bindingRecord, key);
-
- //yes, this is writing out 0 as a value and putting all the
- //useful info into the key, don't ask me why. For table
- //compatibility it shall currently be left as is
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- _queueBindingsDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- /**
- * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
- */
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
-
- try
- {
- OperationStatus status = _queueBindingsDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " from database: " + e.getMessage(), e);
- }
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue)
- */
- public void createQueue(AMQQueue queue) throws AMQStoreException
- {
- createQueue(queue, null);
- }
-
- /**
- * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
- */
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
- }
-
- QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
- queue.getOwner(), queue.isExclusive(), arguments);
-
- createQueue(queueRecord);
- }
-
- /**
- * Makes the specified queue persistent.
- *
- * Only intended for direct use during store upgrades.
- *
- * @param queueRecord Details of the queue to store.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- protected void createQueue(QueueRecord queueRecord) throws AMQStoreException
- {
- if (_state != State.RECOVERING)
- {
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
-
- queueBinding.objectToEntry(queueRecord, value);
- try
- {
- _queueDb.put(null, key, value);
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString()
- + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- /**
- * Updates the specified queue in the persistent store, IF it is already present. If the queue
- * is not present in the store, it will not be added.
- *
- * NOTE: Currently only updates the exclusivity.
- *
- * @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void updateQueue(final AMQQueue queue) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Updating queue: " + queue.getName());
- }
-
- try
- {
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queue.getNameShortString(), key);
-
- DatabaseEntry value = new DatabaseEntry();
- DatabaseEntry newValue = new DatabaseEntry();
- TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
-
- OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
- if(status == OperationStatus.SUCCESS)
- {
- //read the existing record and apply the new exclusivity setting
- QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value);
- queueRecord.setExclusive(queue.isExclusive());
-
- //write the updated entry to the store
- queueBinding.objectToEntry(queueRecord, newValue);
-
- _queueDb.put(null, key, newValue);
- }
- else if(status != OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Error updating queue details within the store: " + status);
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error updating queue details within the store: " + e,e);
- }
- }
-
- /**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void removeQueue(final AMQQueue queue) throws AMQStoreException
- {
- AMQShortString name = queue.getNameShortString();
-
- if (_log.isDebugEnabled())
- {
- _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(name, key);
- try
- {
- OperationStatus status = _queueDb.delete(null, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Queue " + name + " not found");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Places a message onto a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The the queue to place the message on.
- * @param messageId The message to enqueue.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
- {
- // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
-
- AMQShortString name = new AMQShortString(queue.getResourceName());
-
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
- QueueEntryKey dd = new QueueEntryKey(name, messageId);
- keyBinding.objectToEntry(dd, key);
- DatabaseEntry value = new DatabaseEntry();
- ByteBinding.byteToEntry((byte) 0, value);
-
- try
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Enqueuing message " + messageId + " on queue " + name + " [Transaction" + tx + "]");
- }
- _deliveryDb.put(tx, key, value);
- }
- catch (DatabaseException e)
- {
- _log.error("Failed to enqueue: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
- + " to database", e);
- }
- }
-
- /**
- * Extracts a message from a specified queue, in a given transaction.
- *
- * @param tx The transaction for the operation.
- * @param queue The name queue to take the message from.
- * @param messageId The message to dequeue.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
- {
- AMQShortString name = new AMQShortString(queue.getResourceName());
-
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
- QueueEntryKey dd = new QueueEntryKey(name, messageId);
-
- keyBinding.objectToEntry(dd, key);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Dequeue message id " + messageId);
- }
-
- try
- {
-
- OperationStatus status = _deliveryDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
- }
- else if (status != OperationStatus.SUCCESS)
- {
- throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Removed message " + messageId + ", " + name + " from delivery db");
-
- }
- }
- catch (DatabaseException e)
- {
-
- _log.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e);
- _log.error(tx);
-
- throw new AMQStoreException("Error accessing database while dequeuing message: " + e.getMessage(), e);
- }
- }
-
- /**
- * Commits all operations performed within a given transaction.
- *
- * @param tx The transaction to commit all operations for.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
- {
- //if (_log.isDebugEnabled())
- //{
- // _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
- //}
-
- if (tx == null)
- {
- throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
- }
-
- StoreFuture result;
- try
- {
- result = commit(tx, syncCommit);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("commitTranImpl completed for [Transaction:" + tx + "]");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
- }
-
- return result;
- }
-
- /**
- * Abandons all operations performed within a given transaction.
- *
- * @param tx The transaction to abandon.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("abortTran called for [Transaction:" + tx + "]");
- }
-
- try
- {
- tx.abort();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
- }
- }
-
- /**
- * Primarily for testing purposes.
- *
- * @param queueName
- *
- * @return a list of message ids for messages enqueued for a particular queue
- */
- List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException
- {
- Cursor cursor = null;
- try
- {
- cursor = _deliveryDb.openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
-
- QueueEntryKey dd = new QueueEntryKey(queueName, 0);
-
- EntryBinding keyBinding = new QueueEntryTB();
- keyBinding.objectToEntry(dd, key);
-
- DatabaseEntry value = new DatabaseEntry();
-
- LinkedList<Long> messageIds = new LinkedList<Long>();
-
- OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = (QueueEntryKey) keyBinding.entryToObject(key);
-
- while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
- {
-
- messageIds.add(dd.getMessageId());
- status = cursor.getNext(key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- dd = (QueueEntryKey) keyBinding.entryToObject(key);
- }
- }
-
- return messageIds;
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Database error: " + e.getMessage(), e);
- }
- finally
- {
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
- }
- }
- }
- }
-
- /**
- * Return a valid, currently unused message id.
- *
- * @return A fresh message id.
- */
- public Long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
- /**
- * Stores a chunk of message data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param offset The offset of the data chunk in the message.
- * @param contentBody The content of the data chunk.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset,
- ByteBuffer contentBody) throws AMQStoreException
- {
- DatabaseEntry key = new DatabaseEntry();
- TupleBinding<MessageContentKey> keyBinding = new MessageContentKeyTB_5();
- keyBinding.objectToEntry(new MessageContentKey_5(messageId, offset), key);
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding<ByteBuffer> messageBinding = new ContentTB();
- messageBinding.objectToEntry(contentBody, value);
- try
- {
- OperationStatus status = _messageContentDb.put(tx, key, value);
- if (status != OperationStatus.SUCCESS)
- {
- throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
- + status);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Stores message meta-data.
- *
- * @param tx The transaction for the operation.
- * @param messageId The message to store the data for.
- * @param messageMetaData The message meta data to store.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
- throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
- + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
-
- TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
- messageBinding.objectToEntry(messageMetaData, value);
- try
- {
- _messageMetaDataDb.put(tx, key, value);
- if (_log.isDebugEnabled())
- {
- _log.debug("Storing message metadata for message id " + messageId + "[Transaction" + tx + "]");
- }
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
-
- /**
- * Retrieves message meta-data.
- *
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("public MessageMetaData getMessageMetaData(Long messageId = "
- + messageId + "): called");
- }
-
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
-
- try
- {
- OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
- {
- throw new AMQStoreException("Metadata not found for message with id " + messageId);
- }
-
- StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
-
- return mdd;
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
- }
- }
-
- /**
- * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
- * from the specified offset in the message.
- *
- * @param messageId The message to get the data for.
- * @param offset The offset of the data within the message.
- * @param dst The destination of the content read back
- *
- * @return The number of bytes inserted into the destination
- *
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
- {
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
-
- //Start from 0 offset and search for the starting chunk.
- MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB();
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
- }
-
- int written = 0;
- int seenSoFar = 0;
-
- Cursor cursor = null;
- try
- {
- cursor = _messageContentDb.openCursor(null, null);
-
- OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
-
- while (status == OperationStatus.SUCCESS)
- {
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
- long id = mck.getMessageId();
-
- if(id != messageId)
- {
- //we have exhausted all chunks for this message id, break
- break;
- }
-
- int offsetInMessage = mck.getOffset();
- ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
-
- final int size = (int) buf.limit();
-
- seenSoFar += size;
-
- if(seenSoFar >= offset)
- {
- byte[] dataAsBytes = buf.array();
-
- int posInArray = offset + written - offsetInMessage;
- int count = size - posInArray;
- if(count > dst.remaining())
- {
- count = dst.remaining();
- }
- dst.put(dataAsBytes,posInArray,count);
- written+=count;
-
- if(dst.remaining() == 0)
- {
- break;
- }
- }
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
- }
-
- return written;
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- finally
- {
- if(cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
- }
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
- {
- if(metaData.isPersistent())
- {
- return new StoredBDBMessage(getNewMessageId(), metaData);
- }
- else
- {
- return new StoredMemoryMessage(getNewMessageId(), metaData);
- }
- }
-
-
- //protected getters for the TupleBindingFactories
-
- protected QueueTupleBindingFactory getQueueTupleBindingFactory()
- {
- return _queueTupleBindingFactory;
- }
-
- protected BindingTupleBindingFactory getBindingTupleBindingFactory()
- {
- return _bindingTupleBindingFactory;
- }
-
- protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
- {
- return _metaDataTupleBindingFactory;
- }
-
- //Package getters for the various databases used by the Store
-
- Database getMetaDataDb()
- {
- return _messageMetaDataDb;
- }
-
- Database getContentDb()
- {
- return _messageContentDb;
- }
-
- Database getQueuesDb()
- {
- return _queueDb;
- }
-
- Database getDeliveryDb()
- {
- return _deliveryDb;
- }
-
- Database getExchangesDb()
- {
- return _exchangeDb;
- }
-
- Database getBindingsDb()
- {
- return _queueBindingsDb;
- }
-
- void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_messageMetaDataDb, visitor);
- }
-
- void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_messageContentDb, visitor);
- }
-
- void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_queueDb, visitor);
- }
-
- void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_deliveryDb, visitor);
- }
-
- void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_exchangeDb, visitor);
- }
-
- void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_queueBindingsDb, visitor);
- }
-
- /**
- * Generic visitDatabase allows iteration through the specified database.
- *
- * @param database The database to visit
- * @param visitor The visitor to give each entry to.
- *
- * @throws DatabaseException If there is a problem with the Database structure
- * @throws AMQStoreException If there is a problem with the Database contents
- */
- void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- Cursor cursor = database.openCursor(null, null);
-
- try
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- visitor.visit(key, value);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
- }
-
- private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
- {
- // _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called");
-
- tx.commitNoSync();
-
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
- commitFuture.commit();
-
- return commitFuture;
- }
-
- public void startCommitThread()
- {
- _commitThread.start();
- }
-
- private static final class BDBCommitFuture implements StoreFuture
- {
- // private static final Logger _log = Logger.getLogger(BDBCommitFuture.class);
-
- private final CommitThread _commitThread;
- private final com.sleepycat.je.Transaction _tx;
- private DatabaseException _databaseException;
- private boolean _complete;
- private boolean _syncCommit;
-
- public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
- {
- // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
- // + "): called");
-
- _commitThread = commitThread;
- _tx = tx;
- _syncCommit = syncCommit;
- }
-
- public synchronized void complete()
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
- }
-
- _complete = true;
-
- notifyAll();
- }
-
- public synchronized void abort(DatabaseException databaseException)
- {
- // _log.debug("public synchronized void abort(DatabaseException databaseException = " + databaseException
- // + "): called");
-
- _complete = true;
- _databaseException = databaseException;
-
- notifyAll();
- }
-
- public void commit() throws DatabaseException
- {
- //_log.debug("public void commit(): called");
-
- _commitThread.addJob(this);
-
- if(!_syncCommit)
- {
- _log.debug("CommitAsync was requested, returning immediately.");
- return;
- }
-
- synchronized (BDBCommitFuture.this)
- {
- while (!_complete)
- {
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- // _log.error("Unexpected thread interruption: " + e, e);
- throw new RuntimeException(e);
- }
- }
-
- // _log.debug("Commit completed, _databaseException = " + _databaseException);
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
- }
- }
-
- public synchronized boolean isComplete()
- {
- return _complete;
- }
-
- public void waitForCompletion()
- {
- while (!isComplete())
- {
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- //TODO Should we ignore, or throw a 'StoreException'?
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- /**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
- * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
- * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collarations </table>
- */
- private class CommitThread extends Thread
- {
- // private final Logger _log = Logger.getLogger(CommitThread.class);
-
- private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
- private final CheckpointConfig _config = new CheckpointConfig();
- private final Object _lock = new Object();
-
- public CommitThread(String name)
- {
- super(name);
- _config.setForce(true);
-
- }
-
- public void run()
- {
- while (!_stopped.get())
- {
- synchronized (_lock)
- {
- while (!_stopped.get() && !hasJobs())
- {
- try
- {
- // RHM-7 Periodically wake up and check, just in case we
- // missed a notification. Don't want to lock the broker hard.
- _lock.wait(250);
- }
- catch (InterruptedException e)
- {
- // _log.info(getName() + " interrupted. ");
- }
- }
- }
- processJobs();
- }
- }
-
- private void processJobs()
- {
- // _log.debug("private void processJobs(): called");
-
- // we replace the old queue atomically with a new one and this avoids any need to
- // copy elements out of the queue
- Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
-
- try
- {
- // _environment.checkpoint(_config);
- _environment.sync();
-
- for (BDBCommitFuture commit : jobs)
- {
- commit.complete();
- }
- }
- catch (DatabaseException e)
- {
- for (BDBCommitFuture commit : jobs)
- {
- commit.abort(e);
- }
- }
-
- }
-
- private boolean hasJobs()
- {
- return !_jobQueue.get().isEmpty();
- }
-
- public void addJob(BDBCommitFuture commit)
- {
- synchronized (_lock)
- {
- _jobQueue.get().add(commit);
- _lock.notifyAll();
- }
- }
-
- public void close()
- {
- synchronized (_lock)
- {
- _stopped.set(true);
- _lock.notifyAll();
- }
- }
- }
-
-
- private class StoredBDBMessage implements StoredMessage
- {
-
- private final long _messageId;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private com.sleepycat.je.Transaction _txn;
-
- StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
- {
- this(messageId, metaData, true);
- }
-
-
- StoredBDBMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
- {
- try
- {
- _messageId = messageId;
-
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _txn = _environment.beginTransaction(null, null);
- storeMetaData(_txn, messageId, metaData);
- }
- }
- catch (DatabaseException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public StorableMessageMetaData getMetaData()
- {
- StorableMessageMetaData metaData = _metaDataRef.get();
- if(metaData == null)
- {
- try
- {
- metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- }
-
- return metaData;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
- {
- try
- {
- BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
- {
- try
- {
- return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public StoreFuture flushToStore()
- {
- try
- {
- if(_txn != null)
- {
- //if(_log.isDebugEnabled())
- //{
- // _log.debug("Flushing message " + _messageId + " to store");
- //}
- BDBMessageStore.this.commitTranImpl(_txn, true);
- }
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- _txn = null;
- }
- return IMMEDIATE_FUTURE;
- }
-
- public void remove()
- {
- flushToStore();
- try
- {
- BDBMessageStore.this.removeMessage(_messageId);
- }
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- private class BDBTransaction implements Transaction
- {
- private com.sleepycat.je.Transaction _txn;
-
- private BDBTransaction()
- {
- try
- {
- _txn = _environment.beginTransaction(null, null);
- }
- catch (DatabaseException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
- {
- BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
- }
-
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
- {
- BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
-
- }
-
- public void commitTran() throws AMQStoreException
- {
- BDBMessageStore.this.commitTranImpl(_txn, true);
- }
-
- public StoreFuture commitTranAsync() throws AMQStoreException
- {
- return BDBMessageStore.this.commitTranImpl(_txn, false);
- }
-
- public void abortTran() throws AMQStoreException
- {
- BDBMessageStore.this.abortTran(_txn);
- }
- }
-
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
deleted file mode 100644
index 211c025dcd..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
+++ /dev/null
@@ -1,1125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
-import org.apache.qpid.server.store.berkeleydb.BindingKey;
-import org.apache.qpid.server.store.berkeleydb.ContentTB;
-import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor;
-import org.apache.qpid.server.store.berkeleydb.ExchangeTB;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.NullRootMessageLogger;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.util.FileUtils;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-
-import java.io.File;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.nio.ByteBuffer;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map.Entry;
-
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.bind.tuple.TupleBinding;
-
-/**
- * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store.
- *
- * Currently upgrade is fixed from v4 -> v5
- *
- * Improvments:
- * - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added.
- * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade.
- * - Add process logging and disable all Store and Qpid logging.
- */
-public class BDBStoreUpgrade
-{
- private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class);
- /** The Store Directory that needs upgrading */
- File _fromDir;
- /** The Directory that will be made to contain the upgraded store */
- File _toDir;
- /** The Directory that will be made to backup the original store if required */
- File _backupDir;
-
- /** The Old Store */
- BDBMessageStore _oldMessageStore;
- /** The New Store */
- BDBMessageStore _newMessageStore;
- /** The file ending that is used by BDB Store Files */
- private static final String BDB_FILE_ENDING = ".jdb";
-
- static final Options _options = new Options();
- static CommandLine _commandLine;
- private boolean _interactive;
- private boolean _force;
-
- private static final String VERSION = "3.0";
- private static final String USER_ABORTED_PROCESS = "User aborted process";
- private static final int LOWEST_SUPPORTED_STORE_VERSION = 4;
- private static final String PREVIOUS_STORE_VERSION_UNSUPPORTED = "Store upgrade from version {0} is not supported."
- + " You must first run the previous store upgrade tool.";
- private static final String FOLLOWING_STORE_VERSION_UNSUPPORTED = "Store version {0} is newer than this tool supports. "
- + "You must use a newer version of the store upgrade tool";
- private static final String STORE_ALREADY_UPGRADED = "Store has already been upgraded to version {0}.";
-
- private static final String OPTION_INPUT_SHORT = "i";
- private static final String OPTION_INPUT = "input";
- private static final String OPTION_OUTPUT_SHORT = "o";
- private static final String OPTION_OUTPUT = "output";
- private static final String OPTION_BACKUP_SHORT = "b";
- private static final String OPTION_BACKUP = "backup";
- private static final String OPTION_QUIET_SHORT = "q";
- private static final String OPTION_QUIET = "quiet";
- private static final String OPTION_FORCE_SHORT = "f";
- private static final String OPTION_FORCE = "force";
- private boolean _inplace = false;
-
- public BDBStoreUpgrade(String fromDir, String toDir, String backupDir, boolean interactive, boolean force)
- {
- _interactive = interactive;
- _force = force;
-
- _fromDir = new File(fromDir);
- if (!_fromDir.exists())
- {
- throw new IllegalArgumentException("BDBStore path '" + fromDir + "' could not be read. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
-
- if (!isDirectoryAStoreDir(_fromDir))
- {
- throw new IllegalArgumentException("Specified directory '" + fromDir + "' does not contain a valid BDBMessageStore.");
- }
-
- if (toDir == null)
- {
- _inplace = true;
- _toDir = new File(fromDir+"-Inplace");
- }
- else
- {
- _toDir = new File(toDir);
- }
-
- if (_toDir.exists())
- {
- if (_interactive)
- {
- if (toDir == null)
- {
- System.out.println("Upgrading in place:" + fromDir);
- }
- else
- {
- System.out.println("Upgrade destination: '" + toDir + "'");
- }
-
- if (userInteract("Upgrade destination exists do you wish to replace it?"))
- {
- if (!FileUtils.delete(_toDir, true))
- {
- throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
- }
- }
- else
- {
- throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
- }
- }
- else
- {
- if (_force)
- {
- if (!FileUtils.delete(_toDir, true))
- {
- throw new IllegalArgumentException("Unable to remove upgrade destination '" + _toDir + "'");
- }
- }
- else
- {
- throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' already exists. ");
- }
- }
- }
-
- if (!_toDir.mkdirs())
- {
- throw new IllegalArgumentException("Upgrade destination '" + _toDir + "' could not be created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
-
- if (backupDir != null)
- {
- if (backupDir.equals(""))
- {
- _backupDir = new File(_fromDir.getAbsolutePath().toString() + "-Backup");
- }
- else
- {
- _backupDir = new File(backupDir);
- }
- }
- else
- {
- _backupDir = null;
- }
- }
-
- private static String ANSWER_OPTIONS = " Yes/No/Abort? ";
- private static String ANSWER_NO = "no";
- private static String ANSWER_N = "n";
- private static String ANSWER_YES = "yes";
- private static String ANSWER_Y = "y";
- private static String ANSWER_ABORT = "abort";
- private static String ANSWER_A = "a";
-
- /**
- * Interact with the user via System.in and System.out. If the user wishes to Abort then a RuntimeException is thrown.
- * Otherwise the method will return based on their response true=yes false=no.
- *
- * @param message Message to print out
- *
- * @return boolean response from user if they wish to proceed
- */
- private boolean userInteract(String message)
- {
- System.out.print(message + ANSWER_OPTIONS);
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
-
- String input = "";
- try
- {
- input = br.readLine();
- }
- catch (IOException e)
- {
- input = "";
- }
-
- if (input.equalsIgnoreCase(ANSWER_Y) || input.equalsIgnoreCase(ANSWER_YES))
- {
- return true;
- }
- else
- {
- if (input.equalsIgnoreCase(ANSWER_N) || input.equalsIgnoreCase(ANSWER_NO))
- {
- return false;
- }
- else
- {
- if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT))
- {
- throw new RuntimeException(USER_ABORTED_PROCESS);
- }
- }
- }
-
- return userInteract(message);
- }
-
- /**
- * Upgrade a Store of a specified version to the latest version.
- *
- * @param version the version of the current store
- *
- * @throws Exception
- */
- public void upgradeFromVersion(int version) throws Exception
- {
- upgradeFromVersion(version, _fromDir, _toDir, _backupDir, _force,
- _inplace);
- }
-
- /**
- * Upgrade a Store of a specified version to the latest version.
- *
- * @param version the version of the current store
- * @param fromDir the directory with the old Store
- * @param toDir the directrory to hold the newly Upgraded Store
- * @param backupDir the directrory to backup to if required
- * @param force suppress all questions
- * @param inplace replace the from dir with the upgraded result in toDir
- *
- * @throws Exception due to Virtualhost/MessageStore.close() being
- * rather poor at exception handling
- * @throws DatabaseException if there is a problem with the store formats
- * @throws AMQException if there is an issue creating Qpid data structures
- */
- public void upgradeFromVersion(int version, File fromDir, File toDir,
- File backupDir, boolean force,
- boolean inplace) throws Exception
- {
- _logger.info("Located store to upgrade at '" + fromDir + "'");
-
- // Verify user has created a backup, giving option to perform backup
- if (_interactive)
- {
- if (!userInteract("Have you performed a DB backup of this store."))
- {
- File backup;
- if (backupDir == null)
- {
- backup = new File(fromDir.getAbsolutePath().toString() + "-Backup");
- }
- else
- {
- backup = backupDir;
- }
-
- if (userInteract("Do you wish to perform a DB backup now? " +
- "(Store will be backed up to '" + backup.getName() + "')"))
- {
- performDBBackup(fromDir, backup, force);
- }
- else
- {
- if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
- "(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
- {
- throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
- }
- }
- }
- else
- {
- if (!inplace)
- {
- _logger.info("Upgrade will create a new store at '" + toDir + "'");
- }
-
- _logger.info("Using the contents in the Message Store '" + fromDir + "'");
-
- if (!userInteract("Do you wish to proceed?"))
- {
- throw new IllegalArgumentException("Upgrade stopped as did not wish to proceed");
- }
- }
- }
- else
- {
- if (backupDir != null)
- {
- performDBBackup(fromDir, backupDir, force);
- }
- }
-
- CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
-
- //Create a new messageStore
- _newMessageStore = new BDBMessageStore();
- _newMessageStore.configure(toDir, false);
- _newMessageStore.start();
-
- try
- {
- //Load the old MessageStore
- switch (version)
- {
- default:
- case 4:
- _oldMessageStore = new BDBMessageStore(4);
- _oldMessageStore.configure(fromDir, true);
- _oldMessageStore.start();
- upgradeFromVersion_4();
- break;
- case 3:
- case 2:
- case 1:
- throw new IllegalArgumentException(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED,
- new Object[] { Integer.toString(version) }));
- }
- }
- finally
- {
- _newMessageStore.close();
- if (_oldMessageStore != null)
- {
- _oldMessageStore.close();
- }
- // if we are running inplace then swap fromDir and toDir
- if (inplace)
- {
- // Remove original copy
- if (FileUtils.delete(fromDir, true))
- {
- // Rename upgraded store
- toDir.renameTo(fromDir);
- }
- else
- {
- throw new RuntimeException("Unable to upgrade inplace as " +
- "unable to delete source '"
- +fromDir+"', Store upgrade " +
- "successfully performed to :"
- +toDir);
- }
- }
- }
- }
-
- private void upgradeFromVersion_4() throws AMQException, DatabaseException
- {
- _logger.info("Starting store upgrade from version 4");
-
- //Migrate _exchangeDb;
- _logger.info("Exchanges");
-
- moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
-
- final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
- final TupleBinding exchangeTB = new ExchangeTB();
-
- DatabaseVisitor exchangeListVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
- AMQShortString type = exchangeRec.getType();
-
- if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
- {
- topicExchanges.add(exchangeRec.getNameShortString());
- }
- }
- };
- _oldMessageStore.visitExchanges(exchangeListVisitor);
-
-
- //Migrate _queueBindingsDb;
- _logger.info("Queue Bindings");
- moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");
-
- //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
- //which have a colon in their name and are bound to the Topic exchanges above
- final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>();
- final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
-
- DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key);
- AMQShortString queueName = bindingRec.getQueueName();
- AMQShortString exchangeName = bindingRec.getExchangeName();
-
- if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
- {
- durableSubQueues.add(queueName);
- }
- }
- };
- _oldMessageStore.visitBindings(durSubQueueListVisitor);
-
-
- //Migrate _queueDb;
- _logger.info("Queues");
-
- // hold the list of existing queue names
- final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>();
-
- final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
-
- DatabaseVisitor queueVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
- {
- QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value);
- AMQShortString queueName = queueRec.getNameShortString();
-
- //if the queue name is in the gathered list then set its exclusivity true
- if (durableSubQueues.contains(queueName))
- {
- _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
- queueRec.setExclusive(true);
- }
-
- //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
- //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
- _newMessageStore.createQueue(queueRec);
-
- _count++;
- existingQueues.add(queueName);
- }
- };
- _oldMessageStore.visitQueues(queueVisitor);
-
- logCount(queueVisitor.getVisitedCount(), "Queue");
-
-
- // Look for persistent messages stored for non-durable queues
- _logger.info("Checking for messages previously sent to non-durable queues");
-
- // track all message delivery to existing queues
- final HashSet<Long> queueMessages = new HashSet<Long>();
-
- // hold all non existing queues and their messages IDs
- final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>();
-
- // delivery DB visitor to check message delivery and identify non existing queues
- final QueueEntryTB queueEntryTB = new QueueEntryTB();
- DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
- Long messageId = entryKey.getMessageId();
- AMQShortString queueName = entryKey.getQueueName();
- if (!existingQueues.contains(queueName))
- {
- String name = queueName.asString();
- HashSet<Long> messages = phantomMessageQueues.get(name);
- if (messages == null)
- {
- messages = new HashSet<Long>();
- phantomMessageQueues.put(name, messages);
- }
- messages.add(messageId);
- _count++;
- }
- else
- {
- queueMessages.add(messageId);
- }
- }
- };
- _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor);
-
- if (phantomMessageQueues.isEmpty())
- {
- _logger.info("No such messages were found");
- }
- else
- {
- _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total");
-
- for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet())
- {
- String queueName = phantomQueue.getKey();
- HashSet<Long> messages = phantomQueue.getValue();
-
- _logger.info(MessageFormat.format("There are {0} messages which were previously delivered to non-durable queue ''{1}''",messages.size(), queueName));
-
- boolean createQueue;
- if(!_interactive)
- {
- createQueue = true;
- _logger.info("Running in batch-mode, marking queue as durable to ensure retention of the messages.");
- }
- else
- {
- createQueue = userInteract("Do you want to make this queue durable?\n"
- + "NOTE: Answering No will result in these messages being discarded!");
- }
-
- if (createQueue)
- {
- for (Long messageId : messages)
- {
- queueMessages.add(messageId);
- }
- AMQShortString name = new AMQShortString(queueName);
- existingQueues.add(name);
- QueueRecord record = new QueueRecord(name, null, false, null);
- _newMessageStore.createQueue(record);
- }
- }
- }
-
-
- //Migrate _messageMetaDataDb;
- _logger.info("Message MetaData");
-
- final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
- final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
- final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();
-
- DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- _count++;
- MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value);
-
- // get message id
- Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);
-
- // ONLY copy data if message is delivered to existing queue
- if (!queueMessages.contains(messageId))
- {
- return;
- }
- DatabaseEntry newValue = new DatabaseEntry();
- newMetaDataTupleBinding.objectToEntry(metaData, newValue);
-
- newMetaDataDB.put(null, key, newValue);
- }
- };
- _oldMessageStore.visitMetaDataDb(metaDataVisitor);
-
- logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");
-
-
- //Migrate _messageContentDb;
- _logger.info("Message Contents");
- final Database newContentDB = _newMessageStore.getContentDb();
-
- final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new MessageContentKeyTB_4();
- final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5();
- final TupleBinding contentTB = new ContentTB();
-
- DatabaseVisitor contentVisitor = new DatabaseVisitor()
- {
- long _prevMsgId = -1; //Initialise to invalid value
- int _bytesSeenSoFar = 0;
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- _count++;
-
- //determine the msgId of the current entry
- MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key);
- long msgId = contentKey.getMessageId();
-
- // ONLY copy data if message is delivered to existing queue
- if (!queueMessages.contains(msgId))
- {
- return;
- }
- //if this is a new message, restart the byte offset count.
- if(_prevMsgId != msgId)
- {
- _bytesSeenSoFar = 0;
- }
-
- //determine the content size
- ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value);
- int contentSize = content.limit();
-
- //create the new key: id + previously seen data count
- MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar);
- DatabaseEntry newKeyEntry = new DatabaseEntry();
- newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
-
- DatabaseEntry newValueEntry = new DatabaseEntry();
- contentTB.objectToEntry(content, newValueEntry);
-
- newContentDB.put(null, newKeyEntry, newValueEntry);
-
- _prevMsgId = msgId;
- _bytesSeenSoFar += contentSize;
- }
- };
- _oldMessageStore.visitContentDb(contentVisitor);
-
- logCount(contentVisitor.getVisitedCount(), "Message Content");
-
-
- //Migrate _deliveryDb;
- _logger.info("Delivery Records");
- final Database deliveryDb =_newMessageStore.getDeliveryDb();
- DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor()
- {
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- _count++;
-
- // get message id from entry key
- QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
- AMQShortString queueName = entryKey.getQueueName();
-
- // ONLY copy data if message queue exists
- if (existingQueues.contains(queueName))
- {
- deliveryDb.put(null, key, value);
- }
- }
- };
- _oldMessageStore.visitDelivery(deliveryDbVisitor);
- logCount(contentVisitor.getVisitedCount(), "Delivery Record");
- }
-
- /**
- * Log the specified count for item in a user friendly way.
- *
- * @param count of items to log
- * @param item description of what is being logged.
- */
- private void logCount(int count, String item)
- {
- _logger.info(" " + count + " " + item + " " + (count == 1 ? "entry" : "entries"));
- }
-
- /**
- * @param oldDatabase The old MessageStoreDB to perform the visit on
- * @param newDatabase The new MessageStoreDB to copy the data to.
- * @param contentName The string name of the content for display purposes.
- *
- * @throws AMQException Due to createQueue thorwing AMQException
- * @throws DatabaseException If there is a problem with the loading of the data
- */
- private void moveContents(Database oldDatabase, final Database newDatabase, String contentName) throws AMQException, DatabaseException
- {
-
- DatabaseVisitor moveVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- _count++;
- newDatabase.put(null, key, value);
- }
- };
-
- _oldMessageStore.visitDatabase(oldDatabase, moveVisitor);
-
- logCount(moveVisitor.getVisitedCount(), contentName);
- }
-
- private static void usage()
- {
- System.out.println("usage: BDBStoreUpgrade:\n [-h|--help] [-q|--quiet] [-f|--force] [-b|--backup <Path to backup-db>] " +
- "-i|--input <Path to input-db> [-o|--output <Path to upgraded-db>]");
- }
-
- private static void help()
- {
- System.out.println("usage: BDBStoreUpgrade:");
- System.out.println("Required:");
- for (Object obj : _options.getOptions())
- {
- Option option = (Option) obj;
- if (option.isRequired())
- {
- System.out.println("-" + option.getOpt() + "|--" + option.getLongOpt() + "\t\t-\t" + option.getDescription());
- }
- }
-
- System.out.println("\nOptions:");
- for (Object obj : _options.getOptions())
- {
- Option option = (Option) obj;
- if (!option.isRequired())
- {
- System.out.println("--" + option.getLongOpt() + "|-" + option.getOpt() + "\t\t-\t" + option.getDescription());
- }
- }
- }
-
- static boolean isDirectoryAStoreDir(File directory)
- {
- if (directory.isFile())
- {
- return false;
- }
-
- for (File file : directory.listFiles())
- {
- if (file.isFile())
- {
- if (file.getName().endsWith(BDB_FILE_ENDING))
- {
- return true;
- }
- }
- }
- return false;
- }
-
- static File[] discoverDBStores(File fromDir)
- {
- if (!fromDir.exists())
- {
- throw new IllegalArgumentException("'" + fromDir + "' does not exist unable to upgrade.");
- }
-
- // Ensure we are given a directory
- if (fromDir.isFile())
- {
- throw new IllegalArgumentException("'" + fromDir + "' is not a directory unable to upgrade.");
- }
-
- // Check to see if we have been given a single directory
- if (isDirectoryAStoreDir(fromDir))
- {
- return new File[]{fromDir};
- }
-
- // Check to see if we have been give a directory containing stores.
- List<File> stores = new LinkedList<File>();
-
- for (File directory : fromDir.listFiles())
- {
- if (directory.isDirectory())
- {
- if (isDirectoryAStoreDir(directory))
- {
- stores.add(directory);
- }
- }
- }
-
- return stores.toArray(new File[stores.size()]);
- }
-
- private static void performDBBackup(File source, File backup, boolean force) throws Exception
- {
- if (backup.exists())
- {
- if (force)
- {
- _logger.info("Backup location exists. Forced to remove.");
- FileUtils.delete(backup, true);
- }
- else
- {
- throw new IllegalArgumentException("Unable to perform backup a backup already exists.");
- }
- }
-
- try
- {
- _logger.info("Backing up '" + source + "' to '" + backup + "'");
- FileUtils.copyRecursive(source, backup);
- }
- catch (FileNotFoundException e)
- {
- //Throwing IAE here as this will be reported as a Backup not started
- throw new IllegalArgumentException("Unable to perform backup:" + e.getMessage());
- }
- catch (FileUtils.UnableToCopyException e)
- {
- //Throwing exception here as this will be reported as a Failed Backup
- throw new Exception("Unable to perform backup due to:" + e.getMessage());
- }
- }
-
- public static void main(String[] args) throws ParseException
- {
- setOptions(_options);
-
- final Options helpOptions = new Options();
- setHelpOptions(helpOptions);
-
- //Display help
- boolean displayHelp = false;
- try
- {
- if (new PosixParser().parse(helpOptions, args).hasOption("h"))
- {
- showHelp();
- }
- }
- catch (ParseException pe)
- {
- displayHelp = true;
- }
-
- //Parse commandline for required arguments
- try
- {
- _commandLine = new PosixParser().parse(_options, args);
- }
- catch (ParseException mae)
- {
- if (displayHelp)
- {
- showHelp();
- }
- else
- {
- fatalError(mae.getMessage());
- }
- }
-
- String fromDir = _commandLine.getOptionValue(OPTION_INPUT_SHORT);
- String toDir = _commandLine.getOptionValue(OPTION_OUTPUT_SHORT);
- String backupDir = _commandLine.getOptionValue(OPTION_BACKUP_SHORT);
-
- if (backupDir == null && _commandLine.hasOption(OPTION_BACKUP_SHORT))
- {
- backupDir = "";
- }
-
- //Attempt to locate possible Store to upgrade on input path
- File[] stores = new File[0];
- try
- {
- stores = discoverDBStores(new File(fromDir));
- }
- catch (IllegalArgumentException iae)
- {
- fatalError(iae.getMessage());
- }
-
- boolean interactive = !_commandLine.hasOption(OPTION_QUIET_SHORT);
- boolean force = _commandLine.hasOption(OPTION_FORCE_SHORT);
-
- try{
- for (File store : stores)
- {
-
- // if toDir is null then we are upgrading inplace so we don't need
- // to provide an upgraded toDir when upgrading multiple stores.
- if (toDir == null ||
- // Check to see if we are upgrading a store specified in
- // fromDir or if the directories are nested.
- (stores.length > 0
- && stores[0].toString().length() == fromDir.length()))
- {
- upgrade(store, toDir, backupDir, interactive, force);
- }
- else
- {
- // Add the extra part of path from store to the toDir
- upgrade(store, toDir + File.separator + store.toString().substring(fromDir.length()), backupDir, interactive, force);
- }
- }
- }
- catch (RuntimeException re)
- {
- if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
- {
- re.printStackTrace();
- _logger.error("Upgrade Failed: " + re.getMessage());
- }
- else
- {
- _logger.error("Upgrade stopped : User aborted");
- }
- }
-
- }
-
- @SuppressWarnings("static-access")
- private static void setOptions(Options options)
- {
- Option input =
- OptionBuilder.isRequired().hasArg().withDescription("Location (Path) of store to upgrade.").withLongOpt(OPTION_INPUT)
- .create(OPTION_INPUT_SHORT);
-
- Option output =
- OptionBuilder.hasArg().withDescription("Location (Path) for the upgraded-db to be written.").withLongOpt(OPTION_OUTPUT)
- .create(OPTION_OUTPUT_SHORT);
-
- Option quiet = new Option(OPTION_QUIET_SHORT, OPTION_QUIET, false, "Disable interactive options.");
-
- Option force = new Option(OPTION_FORCE_SHORT, OPTION_FORCE, false, "Force upgrade removing any existing upgrade target.");
- Option backup =
- OptionBuilder.hasOptionalArg().withDescription("Location (Path) for the backup-db to be written.").withLongOpt(OPTION_BACKUP)
- .create(OPTION_BACKUP_SHORT);
-
- options.addOption(input);
- options.addOption(output);
- options.addOption(quiet);
- options.addOption(force);
- options.addOption(backup);
- setHelpOptions(options);
- }
-
- private static void setHelpOptions(Options options)
- {
- options.addOption(new Option("h", "help", false, "Show this help."));
- }
-
- static void upgrade(File fromDir, String toDir, String backupDir, boolean interactive, boolean force)
- {
-
- _logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
- int version = getStoreVersion(fromDir);
- if (!isVersionUpgradable(version))
- {
- return;
- }
- try
- {
- new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(version);
-
- _logger.info("Upgrade complete.");
- }
- catch (IllegalArgumentException iae)
- {
- _logger.error("Upgrade not started due to: " + iae.getMessage());
- }
- catch (DatabaseException de)
- {
- de.printStackTrace();
- _logger.error("Upgrade Failed: " + de.getMessage());
- }
- catch (RuntimeException re)
- {
- if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
- {
- re.printStackTrace();
- _logger.error("Upgrade Failed: " + re.getMessage());
- }
- else
- {
- throw re;
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- _logger.error("Upgrade Failed: " + e.getMessage());
- }
- }
-
- /**
- * Utility method to verify if store of given version can be upgraded.
- *
- * @param version
- * store version to verify
- * @return true if store can be upgraded, false otherwise
- */
- protected static boolean isVersionUpgradable(int version)
- {
- boolean storeUpgradable = false;
- if (version == 0)
- {
- _logger.error("Existing store version is undefined!");
- }
- else if (version < LOWEST_SUPPORTED_STORE_VERSION)
- {
- _logger.error(MessageFormat.format(PREVIOUS_STORE_VERSION_UNSUPPORTED,
- new Object[] { Integer.toString(version) }));
- }
- else if (version == BDBMessageStore.DATABASE_FORMAT_VERSION)
- {
- _logger.error(MessageFormat.format(STORE_ALREADY_UPGRADED, new Object[] { Integer.toString(version) }));
- }
- else if (version > BDBMessageStore.DATABASE_FORMAT_VERSION)
- {
- _logger.error(MessageFormat.format(FOLLOWING_STORE_VERSION_UNSUPPORTED,
- new Object[] { Integer.toString(version) }));
- }
- else
- {
- _logger.info("Existing store version is " + version);
- storeUpgradable = true;
- }
- return storeUpgradable;
- }
-
- /**
- * Detects existing store version by checking list of database in store
- * environment
- *
- * @param fromDir
- * store folder
- * @return version
- */
- public static int getStoreVersion(File fromDir)
- {
- int version = 0;
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(false);
- envConfig.setTransactional(false);
- envConfig.setReadOnly(true);
- Environment environment = null;
- try
- {
-
- environment = new Environment(fromDir, envConfig);
- List<String> databases = environment.getDatabaseNames();
- for (String name : databases)
- {
- if (name.startsWith("exchangeDb"))
- {
- if (name.startsWith("exchangeDb_v"))
- {
- version = Integer.parseInt(name.substring(12));
- }
- else
- {
- version = 1;
- }
- break;
- }
- }
- }
- catch (Exception e)
- {
- _logger.error("Failure to open existing database: " + e.getMessage());
- }
- finally
- {
- if (environment != null)
- {
- try
- {
- environment.close();
- }
- catch (Exception e)
- {
- // ignoring. It should never happen.
- }
- }
- }
- return version;
- }
-
- private static void fatalError(String message)
- {
- System.out.println(message);
- usage();
- System.exit(1);
- }
-
- private static void showHelp()
- {
- help();
- System.exit(0);
- }
-
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
deleted file mode 100644
index 396f0ed817..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public class BindingKey extends Object
-{
- private final AMQShortString _exchangeName;
- private final AMQShortString _queueName;
- private final AMQShortString _routingKey;
- private final FieldTable _arguments;
-
- public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
- {
- _exchangeName = exchangeName;
- _queueName = queueName;
- _routingKey = routingKey;
- _arguments = arguments;
- }
-
-
- public AMQShortString getExchangeName()
- {
- return _exchangeName;
- }
-
- public AMQShortString getQueueName()
- {
- return _queueName;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public FieldTable getArguments()
- {
- return _arguments;
- }
-
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
deleted file mode 100644
index 5ea3e9c2e8..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.nio.ByteBuffer;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-public class ContentTB extends TupleBinding
-{
- public Object entryToObject(TupleInput tupleInput)
- {
-
- final int size = tupleInput.readInt();
- byte[] underlying = new byte[size];
- tupleInput.readFast(underlying);
- return ByteBuffer.wrap(underlying);
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- ByteBuffer src = (ByteBuffer) object;
-
- src = src.slice();
-
- byte[] chunkData = new byte[src.limit()];
- src.duplicate().get(chunkData);
-
- tupleOutput.writeInt(chunkData.length);
- tupleOutput.writeFast(chunkData);
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
deleted file mode 100644
index 9bd879025f..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.AMQStoreException;
-
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-
-/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
-public abstract class DatabaseVisitor
-{
- protected int _count;
-
- abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException;
-
- public int getVisitedCount()
- {
- return _count;
- }
-
- public void resetVisitCount()
- {
- _count = 0;
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
deleted file mode 100644
index f9c7828bef..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-
-public class ExchangeTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(ExchangeTB.class);
-
- public ExchangeTB()
- {
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
-
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
-
- boolean autoDelete = tupleInput.readBoolean();
-
- return new ExchangeRecord(name, typeName, autoDelete);
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- ExchangeRecord exchange = (ExchangeRecord) object;
-
- AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
-
- tupleOutput.writeBoolean(exchange.isAutoDelete());
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
deleted file mode 100644
index c09498cce3..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.FieldTable;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-public class FieldTableEncoding
-{
- public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException
- {
- long length = tupleInput.readLong();
- if (length <= 0)
- {
- return null;
- }
- else
- {
-
- byte[] data = new byte[(int)length];
- tupleInput.readFast(data);
-
- try
- {
- return new FieldTable(new DataInputStream(new ByteArrayInputStream(data)),length);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- }
-
- public static void writeFieldTable(FieldTable fieldTable, TupleOutput tupleOutput)
- {
-
- if (fieldTable == null)
- {
- tupleOutput.writeLong(0);
- }
- else
- {
- tupleOutput.writeLong(fieldTable.getEncodedSize());
- tupleOutput.writeFast(fieldTable.getDataAsBytes());
- }
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
deleted file mode 100644
index 005e8d4604..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-public class MessageContentKey
-{
- private long _messageId;
-
- public MessageContentKey(long messageId)
- {
- _messageId = messageId;
- }
-
-
- public long getMessageId()
- {
- return _messageId;
- }
-
- public void setMessageId(long messageId)
- {
- this._messageId = messageId;
- }
-} \ No newline at end of file
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
deleted file mode 100644
index c274fdec8c..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public class QueueEntryKey
-{
- private AMQShortString _queueName;
- private long _messageId;
-
-
- public QueueEntryKey(AMQShortString queueName, long messageId)
- {
- _queueName = queueName;
- _messageId = messageId;
- }
-
-
- public AMQShortString getQueueName()
- {
- return _queueName;
- }
-
-
- public long getMessageId()
- {
- return _messageId;
- }
-
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java
deleted file mode 100644
index 30357c97d4..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.keys;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-
-public class MessageContentKey_4 extends MessageContentKey
-{
- private int _chunkNum;
-
- public MessageContentKey_4(long messageId, int chunkNo)
- {
- super(messageId);
- _chunkNum = chunkNo;
- }
-
- public int getChunk()
- {
- return _chunkNum;
- }
-
- public void setChunk(int chunk)
- {
- this._chunkNum = chunk;
- }
-} \ No newline at end of file
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java
deleted file mode 100644
index a1a7fe80b5..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.keys;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-
-public class MessageContentKey_5 extends MessageContentKey
-{
- private int _offset;
-
- public MessageContentKey_5(long messageId, int chunkNo)
- {
- super(messageId);
- _offset = chunkNo;
- }
-
- public int getOffset()
- {
- return _offset;
- }
-
- public void setOffset(int chunk)
- {
- this._offset = chunk;
- }
-} \ No newline at end of file
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
deleted file mode 100644
index f20367e33b..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.records;
-
-import org.apache.qpid.framing.AMQShortString;
-
-public class ExchangeRecord extends Object
-{
- private final AMQShortString _exchangeName;
- private final AMQShortString _exchangeType;
- private final boolean _autoDelete;
-
- public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete)
- {
- _exchangeName = exchangeName;
- _exchangeType = exchangeType;
- _autoDelete = autoDelete;
- }
-
- public AMQShortString getNameShortString()
- {
- return _exchangeName;
- }
-
- public AMQShortString getType()
- {
- return _exchangeType;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
deleted file mode 100644
index fbe10433ca..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.records;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public class QueueRecord extends Object
-{
- private final AMQShortString _queueName;
- private final AMQShortString _owner;
- private final FieldTable _arguments;
- private boolean _exclusive;
-
- public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments)
- {
- _queueName = queueName;
- _owner = owner;
- _exclusive = exclusive;
- _arguments = arguments;
- }
-
- public AMQShortString getNameShortString()
- {
- return _queueName;
- }
-
- public AMQShortString getOwner()
- {
- return _owner;
- }
-
- public boolean isExclusive()
- {
- return _exclusive;
- }
-
- public void setExclusive(boolean exclusive)
- {
- _exclusive = exclusive;
- }
-
- public FieldTable getArguments()
- {
- return _arguments;
- }
-
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
deleted file mode 100644
index f6344b3d7d..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.testclient;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.ping.PingDurableClient;
-import org.apache.qpid.server.store.berkeleydb.BDBBackup;
-import org.apache.qpid.util.CommandLineParser;
-
-import java.util.Properties;
-
-/**
- * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable
- * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered
- * messages were in the backup, in order to check that all are re-delivered when the backup is retored.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Perform BDB Backup on configurable message count.
- * </table>
- */
-public class BackupTestClient extends PingDurableClient
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(BackupTestClient.class);
-
- /** Holds the from directory to take backups from. */
- private String fromDir;
-
- /** Holds the to directory to store backups in. */
- private String toDir;
-
- /**
- * Default constructor, passes all property overrides to the parent.
- *
- * @param overrides Any property overrides to apply to the defaults.
- *
- * @throws Exception Any underlying exception is allowed to fall through.
- */
- BackupTestClient(Properties overrides) throws Exception
- {
- super(overrides);
- }
-
- /**
- * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified
- * on the command line:
- *
- * <p/><table><caption>Command Line</caption>
- * <tr><th> Option <th> Comment
- * <tr><td> -fromdir <td> The path to the directory to back the bdb log file from.
- * <tr><td> -todir <td> The path to the directory to save the backed up bdb log files to.
- * </table>
- *
- * @param args The command line arguments.
- */
- public static void main(String[] args)
- {
- try
- {
- // Use the same command line format as BDBBackup utility, (compulsory from and to directories).
- Properties options =
- CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC),
- System.getProperties());
- BackupTestClient pingProducer = new BackupTestClient(options);
-
- // Keep the from and to directories for backups.
- pingProducer.fromDir = options.getProperty("fromdir");
- pingProducer.toDir = options.getProperty("todir");
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- // pingProducer.getConnection().setExceptionListener(pingProducer);
-
- // Run the test procedure.
- int sent = pingProducer.send();
- pingProducer.waitForUser("Press return to begin receiving the pings.");
- pingProducer.receive(sent);
-
- System.exit(0);
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- log.error("Top level handler caught execption.", e);
- System.exit(1);
- }
- }
-
- /**
- * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup.
- */
- public void takeAction()
- {
- BDBBackup backupUtil = new BDBBackup();
- backupUtil.takeBackupNoLock(fromDir, toDir);
- System.out.println("Took backup of BDB log files from directory: " + fromDir);
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
deleted file mode 100644
index 301ee417c5..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-public interface BindingTuple
-{
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
deleted file mode 100644
index 8e17f074d7..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import org.apache.qpid.server.store.berkeleydb.BindingKey;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey>
-{
- public BindingTupleBindingFactory(int version)
- {
- super(version);
- }
-
- public TupleBinding<BindingKey> getInstance()
- {
- switch (_version)
- {
- default:
- case 5:
- //no change from v4
- case 4:
- return new BindingTuple_4();
- }
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
deleted file mode 100644
index 52b131a7f2..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.BindingKey;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-
-public class BindingTuple_4 extends TupleBinding<BindingKey> implements BindingTuple
-{
- protected static final Logger _log = Logger.getLogger(BindingTuple.class);
-
- public BindingTuple_4()
- {
- super();
- }
-
- public BindingKey entryToObject(TupleInput tupleInput)
- {
- AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
-
- FieldTable arguments;
-
- // Addition for Version 2 of this table
- try
- {
- arguments = FieldTableEncoding.readFieldTable(tupleInput);
- }
- catch (DatabaseException e)
- {
- _log.error("Unable to create binding: " + e, e);
- return null;
- }
-
- return new BindingKey(exchangeName, queueName, routingKey, arguments);
- }
-
- public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
-
- // Addition for Version 2 of this table
- FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
- }
-
-} \ No newline at end of file
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java
deleted file mode 100644
index f5ba6bbce3..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-public class MessageContentKeyTB_4 extends TupleBinding<MessageContentKey>
-{
-
- public MessageContentKey entryToObject(TupleInput tupleInput)
- {
- long messageId = tupleInput.readLong();
- int chunk = tupleInput.readInt();
- return new MessageContentKey_4(messageId, chunk);
- }
-
- public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
- {
- final MessageContentKey_4 mk = (MessageContentKey_4) object;
- tupleOutput.writeLong(mk.getMessageId());
- tupleOutput.writeInt(mk.getChunk());
- }
-
-} \ No newline at end of file
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java
deleted file mode 100644
index e6a2fd23a8..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-public class MessageContentKeyTB_5 extends TupleBinding<MessageContentKey>
-{
- public MessageContentKey entryToObject(TupleInput tupleInput)
- {
- long messageId = tupleInput.readLong();
- int offset = tupleInput.readInt();
- return new MessageContentKey_5(messageId, offset);
- }
-
- public void objectToEntry(MessageContentKey object, TupleOutput tupleOutput)
- {
- final MessageContentKey_5 mk = (MessageContentKey_5) object;
- tupleOutput.writeLong(mk.getMessageId());
- tupleOutput.writeInt(mk.getOffset());
- }
-
-} \ No newline at end of file
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
deleted file mode 100644
index 76ee4f66e4..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory<MessageContentKey>
-{
- public MessageContentKeyTupleBindingFactory(int version)
- {
- super(version);
- }
-
- public TupleBinding<MessageContentKey> getInstance()
- {
- switch (_version)
- {
- default:
- case 5:
- return new MessageContentKeyTB_5();
- case 4:
- return new MessageContentKeyTB_4();
- }
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
deleted file mode 100644
index e26b544e38..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-
-import java.io.*;
-
-/**
- * Handles the mapping to and from 0-8/0-9 message meta data
- */
-public class MessageMetaDataTB_4 extends TupleBinding<Object>
-{
- private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class);
-
- public MessageMetaDataTB_4()
- {
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
- final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
- final int contentChunkCount = tupleInput.readInt();
-
- return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
- }
- catch (Exception e)
- {
- _log.error("Error converting entry to object: " + e, e);
- // annoyingly just have to return null since we cannot throw
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- MessageMetaData message = (MessageMetaData) object;
- try
- {
- writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
- }
- catch (AMQException e)
- {
- // can't do anything else since the BDB interface precludes throwing any exceptions
- // in practice we should never get an exception
- throw new RuntimeException("Error converting object to entry: " + e, e);
- }
- writeContentHeader(message.getContentHeaderBody(), tupleOutput);
- tupleOutput.writeInt(message.getContentChunkCount());
- }
-
- private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
- {
-
- final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
- final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- final boolean mandatory = tupleInput.readBoolean();
- final boolean immediate = tupleInput.readBoolean();
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
-
- }
-
- private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- int bodySize = tupleInput.readInt();
- byte[] underlying = new byte[bodySize];
- tupleInput.readFast(underlying);
-
- try
- {
- return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)), bodySize);
- }
- catch (IOException e)
- {
- throw new AMQFrameDecodingException(null, e.getMessage(), e);
- }
- }
-
- private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
- {
-
- AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
- AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
- tupleOutput.writeBoolean(publishBody.isMandatory());
- tupleOutput.writeBoolean(publishBody.isImmediate());
- }
-
- private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
- {
- // write out the content header body
- final int bodySize = headerBody.getSize();
- ByteArrayOutputStream baos = new ByteArrayOutputStream(bodySize);
- try
- {
- headerBody.writePayload(new DataOutputStream(baos));
- tupleOutput.writeInt(bodySize);
- tupleOutput.writeFast(baos.toByteArray());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
deleted file mode 100644
index 6dc041cb23..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.MessageMetaDataType;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-
-/**
- * Handles the mapping to and from message meta data
- */
-public class MessageMetaDataTB_5 extends MessageMetaDataTB_4
-{
- private static final Logger _log = Logger.getLogger(MessageMetaDataTB_5.class);
-
- @Override
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- final int bodySize = tupleInput.readInt();
- byte[] dataAsBytes = new byte[bodySize];
- tupleInput.readFast(dataAsBytes);
-
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
- StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
-
- return metaData;
- }
- catch (Exception e)
- {
- _log.error("Error converting entry to object: " + e, e);
- // annoyingly just have to return null since we cannot throw
- return null;
- }
- }
-
- @Override
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- StorableMessageMetaData metaData = (StorableMessageMetaData) object;
-
- final int bodySize = 1 + metaData.getStorableSize();
- byte[] underlying = new byte[bodySize];
- underlying[0] = (byte) metaData.getType().ordinal();
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
- buf.position(1);
- buf = buf.slice();
-
- metaData.writeToBuffer(0, buf);
- tupleOutput.writeInt(bodySize);
- tupleOutput.writeFast(underlying);
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
deleted file mode 100644
index 40153c13ea..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Object>
-{
- public MessageMetaDataTupleBindingFactory(int version)
- {
- super(version);
- }
-
- public TupleBinding<Object> getInstance()
- {
- switch (_version)
- {
- default:
- case 5:
- return new MessageMetaDataTB_5();
- case 4:
- return new MessageMetaDataTB_4();
- }
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
deleted file mode 100644
index 975e558874..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-public class QueueEntryTB extends TupleBinding<QueueEntryKey>
-{
- public QueueEntryKey entryToObject(TupleInput tupleInput)
- {
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- Long messageId = tupleInput.readLong();
-
- return new QueueEntryKey(queueName, messageId);
- }
-
- public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput);
- tupleOutput.writeLong(mk.getMessageId());
- }
-} \ No newline at end of file
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
deleted file mode 100644
index affa9a271d..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-public interface QueueTuple
-{
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
deleted file mode 100644
index 512e319f96..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord>
-{
-
- public QueueTupleBindingFactory(int version)
- {
- super(version);
- }
-
- public TupleBinding<QueueRecord> getInstance()
- {
- switch (_version)
- {
- default:
- case 5:
- return new QueueTuple_5();
- case 4:
- return new QueueTuple_4();
- }
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
deleted file mode 100644
index 347eecf08e..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple
-{
- protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class);
-
- protected FieldTable _arguments;
-
- public QueueTuple_4()
- {
- super();
- }
-
- public QueueRecord entryToObject(TupleInput tupleInput)
- {
- try
- {
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- // Addition for Version 2 of this table, read the queue arguments
- FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
- return new QueueRecord(name, owner, false, arguments);
- }
- catch (DatabaseException e)
- {
- _logger.error("Unable to create binding: " + e, e);
- return null;
- }
-
- }
-
- public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
- // Addition for Version 2 of this table, store the queue arguments
- FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
deleted file mode 100644
index 0f293b79b7..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-public class QueueTuple_5 extends QueueTuple_4
-{
- protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
-
- protected FieldTable _arguments;
-
- public QueueTuple_5()
- {
- super();
- }
-
- public QueueRecord entryToObject(TupleInput tupleInput)
- {
- try
- {
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- // Addition for Version 2 of this table, read the queue arguments
- FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
- // Addition for Version 3 of this table, read the queue exclusivity
- boolean exclusive = tupleInput.readBoolean();
-
- return new QueueRecord(name, owner, exclusive, arguments);
- }
- catch (DatabaseException e)
- {
- _logger.error("Unable to create binding: " + e, e);
- return null;
- }
-
- }
-
- public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
- // Addition for Version 2 of this table, store the queue arguments
- FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
- // Addition for Version 3 of this table, store the queue exclusivity
- tupleOutput.writeBoolean(queue.isExclusive());
- }
-}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
deleted file mode 100644
index 2adac1f9a3..0000000000
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.tuples;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-
-public abstract class TupleBindingFactory<E>
-{
- protected int _version;
-
- public TupleBindingFactory(int version)
- {
- _version = version;
- }
-
- public abstract TupleBinding<E> getInstance();
-}
diff --git a/java/bdbstore/src/resources/backup-log4j.xml b/java/bdbstore/src/resources/backup-log4j.xml
deleted file mode 100644
index 6b0619f0b6..0000000000
--- a/java/bdbstore/src/resources/backup-log4j.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- -
- - Licensed to the Apache Software Foundation (ASF) under one
- - or more contributor license agreements. See the NOTICE file
- - distributed with this work for additional information
- - regarding copyright ownership. The ASF licenses this file
- - to you under the Apache License, Version 2.0 (the
- - "License"); you may not use this file except in compliance
- - with the License. You may obtain a copy of the License at
- -
- - http://www.apache.org/licenses/LICENSE-2.0
- -
- - Unless required by applicable law or agreed to in writing,
- - software distributed under the License is distributed on an
- - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- - KIND, either express or implied. See the License for the
- - specific language governing permissions and limitations
- - under the License.
- -
- -->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-
-<!-- =============================================================================== -->
-<!-- This is a Log4j configuration specially created for the BDB Backup utility, -->
-<!-- it outputs logging to the console for specifically designated console loggers -->
-<!-- at info level or above only. This avoids spamming the user with any internals -->
-<!-- of the Qpid code. -->
-<!-- Use a different logging set up to capture debugging output to diagnose errors. -->
-<!-- =============================================================================== -->
-
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
-
- <!-- ====================================================== -->
- <!-- Append messages to the console at info level or above. -->
- <!-- ====================================================== -->
-
- <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
- <param name="Target" value="System.out"/>
- <param name="Threshold" value="info"/>
-
- <layout class="org.apache.log4j.PatternLayout">
- <!-- The default pattern: Date Priority [Category] Message\n -->
- <param name="ConversionPattern" value="%m%n"/>
- </layout>
-
- </appender>
-
- <!-- ================ -->
- <!-- Limit categories -->
- <!-- ================ -->
-
- <category name="org.apache.qpid.server.store.berkeleydb.BDBBackup">
- <priority value="info"/>
- </category>
-
- <!-- ======================= -->
- <!-- Setup the Root category -->
- <!-- ======================= -->
-
- <root>
- <appender-ref ref="CONSOLE"/>
- </root>
-
-</log4j:configuration>
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
deleted file mode 100644
index d076babc61..0000000000
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import junit.framework.TestCase;
-
-/**
- * Tests for {@code AMQShortStringEncoding} including corner cases when string
- * is null or over 127 characters in length
- */
-public class AMQShortStringEncodingTest extends TestCase
-{
-
- public void testWriteReadNullValues()
- {
- // write into tuple output
- TupleOutput tupleOutput = new TupleOutput();
- AMQShortStringEncoding.writeShortString(null, tupleOutput);
- byte[] data = tupleOutput.getBufferBytes();
-
- // read from tuple input
- TupleInput tupleInput = new TupleInput(data);
- AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
- assertNull("Expected null but got " + result, result);
- }
-
- public void testWriteReadShortStringWithLengthOver127()
- {
- AMQShortString value = createString('a', 128);
-
- // write into tuple output
- TupleOutput tupleOutput = new TupleOutput();
- AMQShortStringEncoding.writeShortString(value, tupleOutput);
- byte[] data = tupleOutput.getBufferBytes();
-
- // read from tuple input
- TupleInput tupleInput = new TupleInput(data);
- AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
- assertEquals("Expected " + value + " but got " + result, value, result);
- }
-
- public void testWriteReadShortStringWithLengthLess127()
- {
- AMQShortString value = new AMQShortString("test");
-
- // write into tuple output
- TupleOutput tupleOutput = new TupleOutput();
- AMQShortStringEncoding.writeShortString(value, tupleOutput);
- byte[] data = tupleOutput.getBufferBytes();
-
- // read from tuple input
- TupleInput tupleInput = new TupleInput(data);
- AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
- assertEquals("Expected " + value + " but got " + result, value, result);
- }
-
- private AMQShortString createString(char ch, int length)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++)
- {
- sb.append(ch);
- }
- return new AMQShortString(sb.toString());
- }
-
-}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
deleted file mode 100644
index ef31b78cfe..0000000000
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
-import org.apache.qpid.server.store.MessageMetaDataType;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-
-/**
- * Subclass of MessageStoreTest which runs the standard tests from the superclass against
- * the BDB Store as well as additional tests specific to the DBB store-implementation.
- */
-public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
-{
- /**
- * Tests that message metadata and content are successfully read back from a
- * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
- * verify their ability to co-exist within the store and be successful retrieved.
- */
- public void testBDBMessagePersistence() throws Exception
- {
- MessageStore store = getVirtualHost().getMessageStore();
-
- BDBMessageStore bdbStore = assertBDBStore(store);
-
- // Create content ByteBuffers.
- // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
- // Use a single chunk for the 0-10 message as per broker behaviour.
- String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
-
- ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
- ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
-
- ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
- int bodySize = completeContentBody_0_10.limit();
-
- /*
- * Create and insert a 0-8 message (metadata and multi-chunk content)
- */
- MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
- BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
-
- ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
-
- MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
- StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
-
- long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-
- storedMessage_0_8.addContent(0, firstContentBytes_0_8);
- storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- storedMessage_0_8.flushToStore();
-
- /*
- * Create and insert a 0-10 message (metadata and content)
- */
- MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
- DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
- Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
-
- MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
-
- MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
- StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
-
- long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
- long messageid_0_10 = storedMessage_0_10.getMessageNumber();
-
- storedMessage_0_10.addContent(0, completeContentBody_0_10);
- storedMessage_0_10.flushToStore();
-
- /*
- * reload the store only (read-only)
- */
- bdbStore = reloadStoreReadOnly(bdbStore);
-
- /*
- * Read back and validate the 0-8 message metadata and content
- */
- StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8);
-
- assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType());
- assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
- MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
-
- assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
-
- MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
- assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
- assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
- assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
- assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
-
- ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
- assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId);
- assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight);
- assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize);
-
- BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
- assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
- assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
-
- ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ;
- long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
- assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8);
- String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
- assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
-
- /*
- * Read back and validate the 0-10 message metadata and content
- */
- StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10);
-
- assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType());
- assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
- MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
-
- assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
-
- DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
- assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
- assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
- assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
- assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
- assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
- assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
-
- MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
- assertNotNull("MessageProperties were not returned", returnedMsgProps);
- assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
- assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
- assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
-
- ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
- long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent);
- assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
-
- String returnedPayloadString_0_10 = new String(recoveredContent.array());
- assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
- }
-
- private DeliveryProperties createDeliveryProperties_0_10()
- {
- DeliveryProperties delProps_0_10 = new DeliveryProperties();
-
- delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- delProps_0_10.setImmediate(true);
- delProps_0_10.setExchange("exchange12345");
- delProps_0_10.setRoutingKey("routingKey12345");
- delProps_0_10.setExpiration(5);
- delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
-
- return delProps_0_10;
- }
-
- private MessageProperties createMessageProperties_0_10(int bodySize)
- {
- MessageProperties msgProps_0_10 = new MessageProperties();
- msgProps_0_10.setContentLength(bodySize);
- msgProps_0_10.setCorrelationId("qwerty".getBytes());
- msgProps_0_10.setContentType("text/html");
-
- return msgProps_0_10;
- }
-
- /**
- * Close the provided store and create a new (read-only) store to read back the data.
- *
- * Use this method instead of reloading the virtual host like other tests in order
- * to avoid the recovery handler deleting the message for not being on a queue.
- */
- private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
- {
- messageStore.close();
- File storePath = new File(String.valueOf(_config.getProperty("store.environment-path")));
-
- BDBMessageStore newStore = new BDBMessageStore();
- newStore.configure(storePath, false);
- newStore.start();
-
- return newStore;
- }
-
- private MessagePublishInfo createPublishInfoBody_0_8()
- {
- return new MessagePublishInfo()
- {
- public AMQShortString getExchange()
- {
- return new AMQShortString("exchange12345");
- }
-
- public void setExchange(AMQShortString exchange)
- {
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return true;
- }
-
- public AMQShortString getRoutingKey()
- {
- return new AMQShortString("routingKey12345");
- }
- };
-
- }
-
- private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
- {
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
- return new ContentHeaderBody(classForBasic, 1, props, length);
- }
-
- private BasicContentHeaderProperties createContentHeaderProperties_0_8()
- {
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
- props.setContentType("text/html");
- props.getHeaders().setString("Test", "MST");
- return props;
- }
-
- /**
- * Tests that messages which are added to the store and then removed using the
- * public MessageStore interfaces are actually removed from the store by then
- * interrogating the store with its own implementation methods and verifying
- * expected exceptions are thrown to indicate the message is not present.
- */
- public void testMessageCreationAndRemoval() throws Exception
- {
- MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
-
- StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
-
- //remove the message in the fashion the broker normally would
- storedMessage_0_8.remove();
-
- //verify the removal using the BDB store implementation methods directly
- try
- {
- // the next line should throw since the message id should not be found
- bdbStore.getMessageMetaData(messageid_0_8);
- fail("No exception thrown when message id not found getting metadata");
- }
- catch (AMQStoreException e)
- {
- // pass since exception expected
- }
-
- //expecting no content, allocate a 1 byte
- ByteBuffer dst = ByteBuffer.allocate(1);
-
- assertEquals("Retrieved content when none was expected",
- 0, bdbStore.getContent(messageid_0_8, 0, dst));
- }
-
- private BDBMessageStore assertBDBStore(Object store)
- {
- if(!(store instanceof BDBMessageStore))
- {
- fail("Test requires an instance of BDBMessageStore to proceed");
- }
-
- return (BDBMessageStore) store;
- }
-
- private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
- {
- byte[] body10Bytes = "0123456789".getBytes();
- byte[] body5Bytes = "01234".getBytes();
-
- ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
- ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
-
- int bodySize = body10Bytes.length + body5Bytes.length;
-
- //create and store the message using the MessageStore interface
- MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
- BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
-
- ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
-
- MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
- StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
-
- storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.addContent(chunk1.limit(), chunk2);
- storedMessage_0_8.flushToStore();
-
- return storedMessage_0_8;
- }
-
- /**
- * Tests transaction commit by utilising the enqueue and dequeue methods available
- * in the TransactionLog interface implemented by the store, and verifying the
- * behaviour using BDB implementation methods.
- */
- public void testTranCommit() throws Exception
- {
- TransactionLog log = getVirtualHost().getTransactionLog();
-
- BDBMessageStore bdbStore = assertBDBStore(log);
-
- final AMQShortString mockQueueName = new AMQShortString("queueName");
-
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- public String getResourceName()
- {
- return mockQueueName.asString();
- }
- };
-
- TransactionLog.Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, 1L);
- txn.enqueueMessage(mockQueue, 5L);
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 1L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 5L, val.longValue());
- }
-
-
- /**
- * Tests transaction rollback before a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
- * implementation methods.
- */
- public void testTranRollbackBeforeCommit() throws Exception
- {
- TransactionLog log = getVirtualHost().getTransactionLog();
-
- BDBMessageStore bdbStore = assertBDBStore(log);
-
- final AMQShortString mockQueueName = new AMQShortString("queueName");
-
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- public String getResourceName()
- {
- return mockQueueName.asString();
- }
- };
-
- TransactionLog.Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, 21L);
- txn.abortTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 22L);
- txn.enqueueMessage(mockQueue, 23L);
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 22L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 23L, val.longValue());
- }
-
- /**
- * Tests transaction rollback after a commit has occurred by utilising the
- * enqueue and dequeue methods available in the TransactionLog interface
- * implemented by the store, and verifying the behaviour using BDB
- * implementation methods.
- */
- public void testTranRollbackAfterCommit() throws Exception
- {
- TransactionLog log = getVirtualHost().getTransactionLog();
-
- BDBMessageStore bdbStore = assertBDBStore(log);
-
- final AMQShortString mockQueueName = new AMQShortString("queueName");
-
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- public String getResourceName()
- {
- return mockQueueName.asString();
- }
- };
-
- TransactionLog.Transaction txn = log.newTransaction();
-
- txn.enqueueMessage(mockQueue, 30L);
- txn.commitTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 31L);
- txn.abortTran();
-
- txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, 32L);
- txn.commitTran();
-
- List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
-
- assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 30L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 32L, val.longValue());
- }
-
-}
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
deleted file mode 100644
index cc19bcf5d8..0000000000
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.url.URLSyntaxException;
-
-/**
- * Prepares an older version brokers BDB store with the required
- * contents for use in the BDBStoreUpgradeTest.
- *
- * The store will then be used to verify that the upgraded is
- * completed properly and that once upgraded it functions as
- * expected with the new broker.
- */
-public class BDBStoreUpgradeTestPreparer extends TestCase
-{
- public static final String TOPIC_NAME="myUpgradeTopic";
- public static final String SUB_NAME="myDurSubName";
- public static final String QUEUE_NAME="myUpgradeQueue";
-
- private static AMQConnectionFactory _connFac;
- private static final String CONN_URL =
- "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
-
- /**
- * Create a BDBStoreUpgradeTestPreparer instance
- */
- public BDBStoreUpgradeTestPreparer () throws URLSyntaxException
- {
- _connFac = new AMQConnectionFactory(CONN_URL);
- }
-
- /**
- * Utility test method to allow running the preparation tool
- * using the test framework
- */
- public void testPrepareBroker() throws Exception
- {
- prepareBroker();
- }
-
- private void prepareBroker() throws Exception
- {
- prepareQueues();
- prepareDurableSubscription();
- }
-
- /**
- * Prepare a queue for use in testing message and binding recovery
- * after the upgrade is performed.
- *
- * - Create a transacted session on the connection.
- * - Use a consumer to create the (durable by default) queue.
- * - Send 5 large messages to test (multi-frame) content recovery.
- * - Send 1 small message to test (single-frame) content recovery.
- * - Commit the session.
- * - Send 5 small messages to test that uncommitted messages are not recovered.
- * following the upgrade.
- * - Close the session.
- */
- private void prepareQueues() throws Exception
- {
- // Create a connection
- Connection connection = _connFac.createConnection();
- connection.start();
- connection.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException e)
- {
- e.printStackTrace();
- }
- });
- // Create a session on the connection, transacted to confirm delivery
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(QUEUE_NAME);
- // Create a consumer to ensure the queue gets created
- // (and enter it into the store, as queues are made durable by default)
- MessageConsumer messageConsumer = session.createConsumer(queue);
- messageConsumer.close();
-
- // Create a Message producer
- MessageProducer messageProducer = session.createProducer(queue);
-
- // Publish 5 persistent messages, 256k chars to ensure they are multi-frame
- sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5);
- // Publish 5 persistent messages, 1k chars to ensure they are single-frame
- sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
-
- session.commit();
-
- // Publish 5 persistent messages which will NOT be committed and so should be 'lost'
- sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
-
- session.close();
- connection.close();
- }
-
- /**
- * Prepare a DurableSubscription backing queue for use in testing selector
- * recovery and queue exclusivity marking during the upgrade process.
- *
- * - Create a transacted session on the connection.
- * - Open and close a DurableSubscription with selector to create the backing queue.
- * - Send a message which matches the selector.
- * - Send a message which does not match the selector.
- * - Send a message which matches the selector but will remain uncommitted.
- * - Close the session.
- */
- private void prepareDurableSubscription() throws Exception
- {
-
- // Create a connection
- TopicConnection connection = _connFac.createTopicConnection();
- connection.start();
- connection.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException e)
- {
- e.printStackTrace();
- }
- });
- // Create a session on the connection, transacted to confirm delivery
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic = session.createTopic(TOPIC_NAME);
-
- // Create and register a durable subscriber with selector and then close it
- TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
- durSub1.close();
-
- // Create a publisher and send a persistent message which matches the selector
- // followed by one that does not match, and another which matches but is not
- // committed and so should be 'lost'
- TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
- TopicPublisher publisher = pubSession.createPublisher(topic);
-
- publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
- publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
- pubSession.commit();
- publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
-
- publisher.close();
- pubSession.close();
-
- }
-
- public static void sendMessages(Session session, MessageProducer messageProducer,
- Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
- {
- for (int i = 1; i <= numMesages; i++)
- {
- Message message = session.createTextMessage(generateString(length));
- message.setIntProperty("ID", i);
- messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- }
-
- public static void publishMessages(Session session, TopicPublisher publisher,
- Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
- {
- for (int i = 1; i <= numMesages; i++)
- {
- Message message = session.createTextMessage(generateString(length));
- message.setIntProperty("ID", i);
- message.setStringProperty("testprop", selectorProperty);
- publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- }
-
- /**
- * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
- *
- * @param length number of characters in the string
- * @return string sequence of the given length
- */
- public static String generateString(int length)
- {
- char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'};
- char[] chars = new char[length];
- for (int i = 0; i < (length); i++)
- {
- chars[i] = base_chars[i % 10];
- }
- return new String(chars);
- }
-
- /**
- * Run the preparation tool.
- * @param args Command line arguments.
- */
- public static void main(String[] args) throws Exception
- {
- BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
- producer.prepareBroker();
- }
-} \ No newline at end of file
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
deleted file mode 100644
index 4861e007af..0000000000
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
-import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.DatabaseEntry;
-
-/**
- * Tests upgrading a BDB store and using it with the new broker
- * after the required contents are entered into the store using
- * an old broker with the BDBStoreUpgradeTestPreparer. The store
- * will then be used to verify that the upgraded is completed
- * properly and that once upgraded it functions as expected with
- * the new broker.
- */
-public class BDBUpgradeTest extends QpidBrokerTestCase
-{
- protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
-
- private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
- private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
- private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
- private static final String QPID_HOME = System.getProperty("QPID_HOME");
- private static final int VERSION_4 = 4;
-
- private String _fromDir;
- private String _toDir;
- private String _toDirTwice;
-
- @Override
- public void setUp() throws Exception
- {
- assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
- assertNotNull("QPID_HOME must be set", QPID_HOME);
-
- if(! isExternalBroker())
- {
- //override QPID_WORK to add the InVM port used so the store
- //output from the upgrade tool can be found by the broker
- setSystemProperty("QPID_WORK", QPID_WORK_ORIG + "/" + getPort());
- }
-
- _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
- _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
- _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
-
- //Clear the two target directories if they exist.
- File directory = new File(_toDir);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
- directory = new File(_toDirTwice);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
-
- //Upgrade the test store.
- upgradeBrokerStore(_fromDir, _toDir);
-
- //override the broker config used and then start the broker with the updated store
- _configFile = new File("build/etc/config-systests-bdb.xml");
- setConfigurationProperty("management.enabled", "true");
-
- super.setUp();
- }
-
- private String getWorkDirBaseDir()
- {
- return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
- }
-
- /**
- * Tests that the core upgrade method of the store upgrade tool passes through the exception
- * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
- * version because it has already been upgraded.
- * @throws Exception
- */
- public void testMultipleUpgrades() throws Exception
- {
- //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
- stopBroker();
-
- try
- {
- new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
- fail("Second Upgrade Succeeded");
- }
- catch (Exception e)
- {
- System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
- e.printStackTrace();
- assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
- e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
- }
- }
-
- /**
- * Test that the selector applied to the DurableSubscription was successfully
- * transfered to the new store, and functions as expected with continued use
- * by monitoring message count while sending new messages to the topic.
- */
- public void testSelectorDurability() throws Exception
- {
- JMXTestUtils jmxUtils = null;
- try
- {
- jmxUtils = new JMXTestUtils(this, "guest", "guest");
- jmxUtils.open();
- }
- catch (Exception e)
- {
- fail("Unable to establish JMX connection, test cannot proceed");
- }
-
- try
- {
- ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
- assertEquals("DurableSubscription backing queue should have 1 message on it initially",
- new Integer(1), dursubQueue.getMessageCount());
-
- // Create a connection and start it
- TopicConnection connection = (TopicConnection) getConnection();
- connection.start();
-
- // Send messages which don't match and do match the selector, checking message count
- TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
- Topic topic = pubSession.createTopic(TOPIC_NAME);
- TopicPublisher publisher = pubSession.createPublisher(topic);
-
- BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
- pubSession.commit();
- assertEquals("DurableSubscription backing queue should still have 1 message on it",
- new Integer(1), dursubQueue.getMessageCount());
-
- BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
- pubSession.commit();
- assertEquals("DurableSubscription backing queue should now have 2 messages on it",
- new Integer(2), dursubQueue.getMessageCount());
-
- dursubQueue.clearQueue();
- pubSession.close();
- }
- finally
- {
- jmxUtils.close();
- }
- }
-
- /**
- * Test that the backing queue for the durable subscription created was successfully
- * detected and set as being exclusive during the upgrade process, and that the
- * regular queue was not.
- */
- public void testQueueExclusivity() throws Exception
- {
- JMXTestUtils jmxUtils = null;
- try
- {
- jmxUtils = new JMXTestUtils(this, "guest", "guest");
- jmxUtils.open();
- }
- catch (Exception e)
- {
- fail("Unable to establish JMX connection, test cannot proceed");
- }
-
- try
- {
- ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME);
- assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive());
-
- ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
- assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive());
- }
- finally
- {
- jmxUtils.close();
- }
- }
-
- /**
- * Test that the upgraded queue continues to function properly when used
- * for persistent messaging and restarting the broker.
- *
- * Sends the new messages to the queue BEFORE consuming those which were
- * sent before the upgrade. In doing so, this also serves to test that
- * the queue bindings were successfully transitioned during the upgrade.
- */
- public void testBindingAndMessageDurabability() throws Exception
- {
- // Create a connection and start it
- TopicConnection connection = (TopicConnection) getConnection();
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(QUEUE_NAME);
- MessageProducer messageProducer = session.createProducer(queue);
-
- // Send a new message
- BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1);
-
- session.close();
-
- // Restart the broker
- restartBroker();
-
- // Drain the queue of all messages
- connection = (TopicConnection) getConnection();
- connection.start();
- consumeQueueMessages(connection, true);
- }
-
- /**
- * Test that all of the committed persistent messages previously sent to
- * the broker are properly received following update of the MetaData and
- * Content entries during the store upgrade process.
- */
- public void testConsumptionOfUpgradedMessages() throws Exception
- {
- // Create a connection and start it
- Connection connection = getConnection();
- connection.start();
-
- consumeDurableSubscriptionMessages(connection);
- consumeQueueMessages(connection, false);
- }
-
- /**
- * Tests store migration containing messages for non-existing queue.
- *
- * @throws Exception
- */
- public void testMigrationOfMessagesForNonExistingQueues() throws Exception
- {
- stopBroker();
-
- // copy store data into a new location for adding of phantom message
- File storeLocation = new File(_fromDir);
- File target = new File(_toDirTwice);
- if (!target.exists())
- {
- target.mkdirs();
- }
- FileUtils.copyRecursive(storeLocation, target);
-
- // delete migrated data
- File directory = new File(_toDir);
- if (directory.exists() && directory.isDirectory())
- {
- FileUtils.delete(directory, true);
- }
-
- // test data
- String nonExistingQueueName = getTestQueueName();
- String messageText = "Test Phantom Message";
-
- // add message
- addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
-
- String[] inputs = { "Yes", "Yes", "Yes" };
- upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
-
- // start broker
- startBroker();
-
- // Create a connection and start it
- Connection connection = getConnection();
- connection.start();
-
- // consume a message for non-existing store
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(nonExistingQueueName);
- MessageConsumer messageConsumer = session.createConsumer(queue);
- Message message = messageConsumer.receive(1000);
-
- // assert consumed message
- assertNotNull("Message was not migrated!", message);
- assertTrue("Unexpected message received!", message instanceof TextMessage);
- String text = ((TextMessage) message).getText();
- assertEquals("Message migration failed!", messageText, text);
- }
-
- /**
- * An utility method to upgrade broker with simulation user interactions
- *
- * @param fromDir
- * location of the store to migrate
- * @param toDir
- * location of where migrated data will be stored
- * @param inputs
- * user answers on upgrade tool questions
- * @throws Exception
- */
- private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
- throws Exception
- {
- // save to restore system.in after data migration
- InputStream stdin = System.in;
-
- // set fake system in to simulate user interactions
- // FIXME: it is a quite dirty simulator of system input but it does the job
- System.setIn(new InputStream()
- {
-
- int counter = 0;
-
- public synchronized int read(byte b[], int off, int len)
- {
- byte[] src = (inputs[counter] + "\n").getBytes();
- System.arraycopy(src, 0, b, off, src.length);
- counter++;
- return src.length;
- }
-
- @Override
- public int read() throws IOException
- {
- return -1;
- }
- });
-
- try
- {
- // Upgrade the test store.
- new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
- }
- finally
- {
- // restore system in
- System.setIn(stdin);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
- String messageText) throws Exception
- {
- final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
- BDBMessageStore store = new BDBMessageStore(storeVersion);
- store.configure(storeLocation, false);
- try
- {
- store.start();
-
- // store message objects
- ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
- long bodySize = completeContentBody.limit();
- MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
- false, queueName);
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
- props.setContentType("text/plain");
- props.setType("text/plain");
- props.setMessageId("whatever");
- props.setEncoding("UTF-8");
- props.getHeaders().setString("Test", "MST");
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
-
- // add content entry to database
- long messageId = store.getNewMessageId();
- TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
- MessageContentKey contentKey = null;
- if (storeVersion == VERSION_4)
- {
- contentKey = new MessageContentKey_4(messageId, 0);
- }
- else
- {
- throw new Exception(storeVersion + " is not supported");
- }
- DatabaseEntry key = new DatabaseEntry();
- contentKeyTB.objectToEntry(contentKey, key);
- DatabaseEntry data = new DatabaseEntry();
- ContentTB contentTB = new ContentTB();
- contentTB.objectToEntry(completeContentBody, data);
- store.getContentDb().put(null, key, data);
-
- // add meta data entry to database
- TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
- TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
- key = new DatabaseEntry();
- data = new DatabaseEntry();
- longTB.objectToEntry(new Long(messageId), key);
- MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
- metaDataTB.objectToEntry(metaData, data);
- store.getMetaDataDb().put(null, key, data);
-
- // add delivery entry to database
- TransactionLogResource mockQueue = new TransactionLogResource()
- {
- public String getResourceName()
- {
- return queueName.asString();
- }
- };
- TransactionLog log = (TransactionLog) store;
- TransactionLog.Transaction txn = log.newTransaction();
- txn.enqueueMessage(mockQueue, messageId);
- txn.commitTran();
- }
- finally
- {
- // close store
- store.close();
- }
- }
-
- private void consumeDurableSubscriptionMessages(Connection connection) throws Exception
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(TOPIC_NAME);
-
- TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
-
- // Retrieve the matching message
- Message m = durSub.receive(2000);
- assertNotNull("Failed to receive an expected message", m);
- assertEquals("Selector property did not match", "true", m.getStringProperty("testprop"));
- assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected",BDBStoreUpgradeTestPreparer.generateString(1024) , ((TextMessage)m).getText());
-
- // Verify that neither the non-matching or uncommitted message are received
- m = durSub.receive(1000);
- assertNull("No more messages should have been recieved", m);
-
- durSub.close();
- session.close();
- }
-
- private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(QUEUE_NAME);
-
- MessageConsumer consumer = session.createConsumer(queue);
- Message m;
-
- // Retrieve the initial pre-upgrade messages
- for (int i=1; i <= 5 ; i++)
- {
- m = consumer.receive(2000);
- assertNotNull("Failed to receive an expected message", m);
- assertEquals("ID property did not match", i, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
- }
- for (int i=1; i <= 5 ; i++)
- {
- m = consumer.receive(2000);
- assertNotNull("Failed to receive an expected message", m);
- assertEquals("ID property did not match", i, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText());
- }
-
- if(extraMessage)
- {
- //verify that the extra message is received
- m = consumer.receive(2000);
- assertNotNull("Failed to receive an expected message", m);
- assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
- assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
- }
-
- // Verify that no more messages are received
- m = consumer.receive(1000);
- assertNull("No more messages should have been recieved", m);
-
- consumer.close();
- session.close();
- }
-
- private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
- {
- new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4);
- }
-}
diff --git a/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb b/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
deleted file mode 100644
index c4e4e6c306..0000000000
--- a/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
+++ /dev/null
Binary files differ